This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/develop by this push:
     new ef30dd1c add watermark in IdleWindowScaner
ef30dd1c is described below

commit ef30dd1cbaa91fb0bbb80bf2e8d164db49d4366d
Author: starmilkxin <[email protected]>
AuthorDate: Mon May 22 17:08:49 2023 +0800

    add watermark in IdleWindowScaner
---
 .../supplier/JoinWindowAggregateSupplier.java      |  8 +--
 .../supplier/WindowAccumulatorSupplier.java        |  7 ++-
 .../function/supplier/WindowAggregateSupplier.java |  7 ++-
 .../streams/core/window/fire/IdleWindowScaner.java | 57 +++++++++++++---------
 4 files changed, 43 insertions(+), 36 deletions(-)

diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
index a7856397..3d31c473 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
@@ -121,7 +121,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT> 
implements Supplier<Pro
                 throw new IllegalStateException(format);
             }
 
-            store(key, data, time, streamType);
+            store(key, data, time, watermark, streamType);
 
             List<WindowKey> fire = this.joinWindowFire.fire(this.name, 
watermark, streamType);
             for (WindowKey windowKey : fire) {
@@ -130,7 +130,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT> 
implements Supplier<Pro
         }
 
 
-        private void store(Object key, Object data, long time, StreamType 
streamType) throws Throwable {
+        private void store(Object key, Object data, long time, long watermark, 
StreamType streamType) throws Throwable {
             String name = Utils.buildKey(this.name, streamType.name());
             List<Window> windows = super.calculateWindow(windowInfo, time);
             for (Window window : windows) {
@@ -142,12 +142,12 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT> 
implements Supplier<Pro
                     case LEFT_STREAM:
                         WindowState<K, V1> leftState = new WindowState<>((K) 
key, (V1) data, time);
                         this.leftWindowStore.put(stateTopicMessageQueue, 
windowKey, leftState);
-                        this.idleWindowScaner.putJoinWindowCallback(windowKey, 
joinWindowFire);
+                        this.idleWindowScaner.putJoinWindowCallback(windowKey, 
watermark, joinWindowFire);
                         break;
                     case RIGHT_STREAM:
                         WindowState<K, V2> rightState = new WindowState<>((K) 
key, (V2) data, time);
                         this.rightWindowStore.put(stateTopicMessageQueue, 
windowKey, rightState);
-                        this.idleWindowScaner.putJoinWindowCallback(windowKey, 
joinWindowFire);
+                        this.idleWindowScaner.putJoinWindowCallback(windowKey, 
watermark, joinWindowFire);
                         break;
                 }
             }
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index b542923d..436f2b82 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -151,7 +151,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
                 //f(Window + key, newValue, store)
                 WindowState<K, Accumulator<R, OV>> state = new 
WindowState<>(key, storeAccumulator, time);
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-                this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, 
this.accumulatorWindowFire);
+                this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, 
watermark, this.accumulatorWindowFire);
             }
 
             try {
@@ -188,7 +188,6 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
                     WindowState::windowState2Byte);
 
             this.idleWindowScaner = context.getDefaultWindowScaner();
-            
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
 
             String stateTopicName = context.getSourceTopic() + 
Constant.STATE_TOPIC_SUFFIX;
             this.stateTopicMessageQueue = new MessageQueue(stateTopicName, 
context.getSourceBrokerName(), context.getSourceQueueId());
@@ -227,7 +226,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
                 logger.info("new session window, with key={}, valueTime={}, 
sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
                         Utils.format(newSessionWindowTime.getKey()), 
Utils.format(newSessionWindowTime.getValue()));
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-                
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, 
this.accumulatorSessionWindowFire);
+                
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark, 
this.accumulatorSessionWindowFire);
             }
         }
 
@@ -324,7 +323,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
 
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
 
-                
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, 
this.accumulatorSessionWindowFire);
+                
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark, 
this.accumulatorSessionWindowFire);
                 
this.idleWindowScaner.removeOldAccumulatorSession(needToDelete);
 
                 this.windowStore.deleteByKey(needToDelete);
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index e0e55c8b..fb20eb5d 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -153,7 +153,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
                 //f(Window + key, newValue, store)
                 WindowState<K, OV> state = new WindowState<>(key, newValue, 
time);
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-                this.idleWindowScaner.putAggregateWindowCallback(windowKey, 
this.aggregateWindowFire);
+                this.idleWindowScaner.putAggregateWindowCallback(windowKey, 
watermark, this.aggregateWindowFire);
             }
 
             try {
@@ -192,7 +192,6 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
                     WindowState::windowState2Byte);
 
             this.idleWindowScaner = context.getDefaultWindowScaner();
-            
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
 
             String stateTopicName = context.getSourceTopic() + 
Constant.STATE_TOPIC_SUFFIX;
             this.stateTopicMessageQueue = new MessageQueue(stateTopicName, 
context.getSourceBrokerName(), context.getSourceQueueId());
@@ -231,7 +230,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
                 logger.info("new session window, with key={}, valueTime={}, 
sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
                         Utils.format(newSessionWindowTime.getKey()), 
Utils.format(newSessionWindowTime.getValue()));
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-                
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, 
this.aggregateSessionWindowFire);
+                
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark, 
this.aggregateSessionWindowFire);
             }
         }
 
@@ -323,7 +322,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
 
                 this.windowStore.put(stateTopicMessageQueue, windowKey, state);
 
-                
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, 
this.aggregateSessionWindowFire);
+                
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark, 
this.aggregateSessionWindowFire);
                 this.idleWindowScaner.removeOldAggregateSession(needToDelete);
 
                 this.windowStore.deleteByKey(needToDelete);
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
index 09d3da88..ddf24188 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -34,7 +33,6 @@ public class IdleWindowScaner implements AutoCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(IdleWindowScaner.class.getName());
 
     private final Integer maxIdleTime;
-    private long sessionTimeOut = 0;
     private final ScheduledExecutorService executor;
 
     private final ConcurrentHashMap<WindowKey, TimeType> 
lastUpdateTime2WindowKey = new ConcurrentHashMap<>(16);
@@ -59,65 +57,66 @@ public class IdleWindowScaner implements AutoCloseable {
         }, 0, 1000, TimeUnit.MILLISECONDS);
     }
 
-    public void initSessionTimeOut(long sessionTimeOut) {
-        this.sessionTimeOut = sessionTimeOut;
-    }
-
-    public void putAccumulatorWindowCallback(WindowKey windowKey, 
AccumulatorWindowFire<?, ?, ?, ?> function) {
+    public void putAccumulatorWindowCallback(WindowKey windowKey, long 
watermark, AccumulatorWindowFire<?, ?, ?, ?> function) {
         this.fireWindowCallBack.putIfAbsent(windowKey, function);
         this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
             if (timeType == null) {
-                timeType = new TimeType(Type.AccumulatorWindow, 
System.currentTimeMillis());
+                timeType = new TimeType(Type.AccumulatorWindow, 
System.currentTimeMillis(), watermark);
             } else {
                 timeType.setUpdateTime(System.currentTimeMillis());
+                timeType.setWatermark(watermark);
             }
             return timeType;
         });
     }
 
-    public void putAccumulatorSessionWindowCallback(WindowKey windowKey, 
AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
+    public void putAccumulatorSessionWindowCallback(WindowKey windowKey, long 
watermark, AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
         this.fireSessionWindowCallback.putIfAbsent(windowKey, function);
         this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
             if (timeType == null) {
-                timeType = new TimeType(Type.AccumulatorSessionWindow, 
System.currentTimeMillis());
+                timeType = new TimeType(Type.AccumulatorSessionWindow, 
System.currentTimeMillis(), watermark);
             } else {
                 timeType.setUpdateTime(System.currentTimeMillis());
+                timeType.setWatermark(watermark);
             }
             return timeType;
         });
     }
 
-    public void putAggregateWindowCallback(WindowKey windowKey, 
AggregateWindowFire<?, ?, ?> function) {
+    public void putAggregateWindowCallback(WindowKey windowKey, long 
watermark, AggregateWindowFire<?, ?, ?> function) {
         this.windowKeyAggregate.putIfAbsent(windowKey, function);
         this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
             if (timeType == null) {
-                timeType = new TimeType(Type.AggregateWindow, 
System.currentTimeMillis());
+                timeType = new TimeType(Type.AggregateWindow, 
System.currentTimeMillis(), watermark);
             } else {
                 timeType.setUpdateTime(System.currentTimeMillis());
+                timeType.setWatermark(watermark);
             }
             return timeType;
         });
     }
 
-    public void putAggregateSessionWindowCallback(WindowKey windowKey, 
AggregateSessionWindowFire<?, ?, ?> function) {
+    public void putAggregateSessionWindowCallback(WindowKey windowKey, long 
watermark, AggregateSessionWindowFire<?, ?, ?> function) {
         this.windowKeyAggregateSession.putIfAbsent(windowKey, function);
         this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
             if (timeType == null) {
-                timeType = new TimeType(Type.AggregateSessionWindow, 
System.currentTimeMillis());
+                timeType = new TimeType(Type.AggregateSessionWindow, 
System.currentTimeMillis(), watermark);
             } else {
                 timeType.setUpdateTime(System.currentTimeMillis());
+                timeType.setWatermark(watermark);
             }
             return timeType;
         });
     }
 
-    public void putJoinWindowCallback(WindowKey windowKey, JoinWindowFire<?, 
?, ?, ?> function) {
+    public void putJoinWindowCallback(WindowKey windowKey, long watermark, 
JoinWindowFire<?, ?, ?, ?> function) {
         this.fireJoinWindowCallback.putIfAbsent(windowKey, function);
         this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
             if (timeType == null) {
-                timeType = new TimeType(Type.JoinWindow, 
System.currentTimeMillis());
+                timeType = new TimeType(Type.JoinWindow, 
System.currentTimeMillis(), watermark);
             } else {
                 timeType.setUpdateTime(System.currentTimeMillis());
+                timeType.setWatermark(watermark);
             }
             return timeType;
         });
@@ -169,9 +168,10 @@ public class IdleWindowScaner implements AutoCloseable {
             switch (type) {
                 case AggregateSessionWindow:
                 case AccumulatorSessionWindow: {
-                    if (idleTime >= sessionTimeOut) {
+                    long watermark = timeType.getWatermark() + idleTime;
+                    if (watermark > windowKey.getWindowEnd()) {
                         try {
-                            doFire(windowKey, type);
+                            doFire(windowKey, type, watermark);
                         } finally {
                             iterator.remove();
                         }
@@ -181,10 +181,10 @@ public class IdleWindowScaner implements AutoCloseable {
                 case AccumulatorWindow:
                 case JoinWindow:
                 case AggregateWindow: {
-                    long windowSize = windowKey.getWindowEnd() - 
windowKey.getWindowStart();
-                    if (idleTime > this.maxIdleTime && idleTime > windowSize) {
+                    long watermark = timeType.getWatermark() + idleTime;
+                    if (idleTime > this.maxIdleTime && watermark > 
windowKey.getWindowEnd()) {
                         try {
-                            doFire(windowKey, type);
+                            doFire(windowKey, type, watermark);
                         } finally {
                             iterator.remove();
                         }
@@ -197,8 +197,7 @@ public class IdleWindowScaner implements AutoCloseable {
         }
     }
 
-    private void doFire(WindowKey windowKey, Type type) throws Throwable {
-        long watermark = windowKey.getWindowEnd() + 1;
+    private void doFire(WindowKey windowKey, Type type, long watermark) throws 
Throwable {
         String operatorName = windowKey.getOperatorName();
 
         switch (type) {
@@ -258,10 +257,12 @@ public class IdleWindowScaner implements AutoCloseable {
     static class TimeType {
         private Type type;
         private long updateTime;
+        private long watermark;
 
-        public TimeType(Type type, long updateTime) {
+        public TimeType(Type type, long updateTime, long watermark) {
             this.type = type;
             this.updateTime = updateTime;
+            this.watermark = watermark;
         }
 
         public Type getType() {
@@ -279,6 +280,14 @@ public class IdleWindowScaner implements AutoCloseable {
         public void setUpdateTime(long updateTime) {
             this.updateTime = updateTime;
         }
+
+        public long getWatermark() {
+            return watermark;
+        }
+
+        public void setWatermark(long watermark) {
+            this.watermark = watermark;
+        }
     }
 
     enum Type {

Reply via email to