This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new cea2d6cc Resolve bug of concurrent modification (#201)
cea2d6cc is described below

commit cea2d6cc6e5d070a9cd2c8159d60a113684e3117
Author: Ni Ze <[email protected]>
AuthorDate: Wed Aug 31 17:01:16 2022 +0800

    Resolve bug of concurrent modification (#201)
    
    * feat(nest join) support nest join
    
    * maintain(example) remove MqttSourceExample
    
    * add synchronized
---
 .../streams/window/fire/SplitEventTimeManager.java | 24 +++++++++++++---------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
index b4daf916..35bf2c63 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.streams.window.operator.AbstractWindow;
 
 public class SplitEventTimeManager {
     protected static final Log LOG = LogFactory.getLog(SplitEventTimeManager.class);
-    protected static Map<String, Long> messageSplitId2MaxTime = new HashMap<>();
+    protected static final Map<String, Long> messageSplitId2MaxTime = new HashMap<>();
     private AtomicInteger queueIdCount = new AtomicInteger(0);
     protected Long lastUpdateTime;
 
@@ -86,19 +86,23 @@ public class SplitEventTimeManager {
             return null;
         }
         Long min = null;
-        Set<Long> eventTimes = new HashSet<>(messageSplitId2MaxTime.values());
-        for (Long eventTime : eventTimes) {
-            if (eventTime == null) {
-                return null;
-            }
-            if (min == null) {
-                min = eventTime;
-            } else {
-                if (eventTime < min) {
+
+        synchronized (messageSplitId2MaxTime) {
+            Set<Long> eventTimes = new HashSet<>(messageSplitId2MaxTime.values());
+            for (Long eventTime : eventTimes) {
+                if (eventTime == null) {
+                    return null;
+                }
+                if (min == null) {
                     min = eventTime;
+                } else {
+                    if (eventTime < min) {
+                        min = eventTime;
+                    }
                 }
             }
         }
+
         return min;
 
     }

Reply via email to