This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new cea2d6cc Resolve bug of concurrent modification (#201)
cea2d6cc is described below
commit cea2d6cc6e5d070a9cd2c8159d60a113684e3117
Author: Ni Ze <[email protected]>
AuthorDate: Wed Aug 31 17:01:16 2022 +0800
Resolve bug of concurrent modification (#201)
* feat(nest join) support nest join
* maintain(example) remove MqttSourceExample
* add synchronized
---
.../streams/window/fire/SplitEventTimeManager.java | 24 +++++++++++++---------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index b4daf916..35bf2c63 100644
---
a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -35,7 +35,7 @@ import
org.apache.rocketmq.streams.window.operator.AbstractWindow;
public class SplitEventTimeManager {
protected static final Log LOG =
LogFactory.getLog(SplitEventTimeManager.class);
- protected static Map<String, Long> messageSplitId2MaxTime = new
HashMap<>();
+ protected static final Map<String, Long> messageSplitId2MaxTime = new
HashMap<>();
private AtomicInteger queueIdCount = new AtomicInteger(0);
protected Long lastUpdateTime;
@@ -86,19 +86,23 @@ public class SplitEventTimeManager {
return null;
}
Long min = null;
- Set<Long> eventTimes = new HashSet<>(messageSplitId2MaxTime.values());
- for (Long eventTime : eventTimes) {
- if (eventTime == null) {
- return null;
- }
- if (min == null) {
- min = eventTime;
- } else {
- if (eventTime < min) {
+
+ synchronized (messageSplitId2MaxTime) {
+ Set<Long> eventTimes = new
HashSet<>(messageSplitId2MaxTime.values());
+ for (Long eventTime : eventTimes) {
+ if (eventTime == null) {
+ return null;
+ }
+ if (min == null) {
min = eventTime;
+ } else {
+ if (eventTime < min) {
+ min = eventTime;
+ }
}
}
}
+
return min;
}