This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/develop by this push:
new ef30dd1c add watermark in IdleWindowScaner
ef30dd1c is described below
commit ef30dd1cbaa91fb0bbb80bf2e8d164db49d4366d
Author: starmilkxin <[email protected]>
AuthorDate: Mon May 22 17:08:49 2023 +0800
add watermark in IdleWindowScaner
---
.../supplier/JoinWindowAggregateSupplier.java | 8 +--
.../supplier/WindowAccumulatorSupplier.java | 7 ++-
.../function/supplier/WindowAggregateSupplier.java | 7 ++-
.../streams/core/window/fire/IdleWindowScaner.java | 57 +++++++++++++---------
4 files changed, 43 insertions(+), 36 deletions(-)
diff --git
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
index a7856397..3d31c473 100644
---
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
+++
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.java
@@ -121,7 +121,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT>
implements Supplier<Pro
throw new IllegalStateException(format);
}
- store(key, data, time, streamType);
+ store(key, data, time, watermark, streamType);
List<WindowKey> fire = this.joinWindowFire.fire(this.name,
watermark, streamType);
for (WindowKey windowKey : fire) {
@@ -130,7 +130,7 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT>
implements Supplier<Pro
}
- private void store(Object key, Object data, long time, StreamType
streamType) throws Throwable {
+ private void store(Object key, Object data, long time, long watermark,
StreamType streamType) throws Throwable {
String name = Utils.buildKey(this.name, streamType.name());
List<Window> windows = super.calculateWindow(windowInfo, time);
for (Window window : windows) {
@@ -142,12 +142,12 @@ public class JoinWindowAggregateSupplier<K, V1, V2, OUT>
implements Supplier<Pro
case LEFT_STREAM:
WindowState<K, V1> leftState = new WindowState<>((K)
key, (V1) data, time);
this.leftWindowStore.put(stateTopicMessageQueue,
windowKey, leftState);
- this.idleWindowScaner.putJoinWindowCallback(windowKey,
joinWindowFire);
+ this.idleWindowScaner.putJoinWindowCallback(windowKey,
watermark, joinWindowFire);
break;
case RIGHT_STREAM:
WindowState<K, V2> rightState = new WindowState<>((K)
key, (V2) data, time);
this.rightWindowStore.put(stateTopicMessageQueue,
windowKey, rightState);
- this.idleWindowScaner.putJoinWindowCallback(windowKey,
joinWindowFire);
+ this.idleWindowScaner.putJoinWindowCallback(windowKey,
watermark, joinWindowFire);
break;
}
}
diff --git
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index b542923d..436f2b82 100644
---
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -151,7 +151,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
//f(Window + key, newValue, store)
WindowState<K, Accumulator<R, OV>> state = new
WindowState<>(key, storeAccumulator, time);
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
- this.idleWindowScaner.putAccumulatorWindowCallback(windowKey,
this.accumulatorWindowFire);
+ this.idleWindowScaner.putAccumulatorWindowCallback(windowKey,
watermark, this.accumulatorWindowFire);
}
try {
@@ -188,7 +188,6 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
WindowState::windowState2Byte);
this.idleWindowScaner = context.getDefaultWindowScaner();
-
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
String stateTopicName = context.getSourceTopic() +
Constant.STATE_TOPIC_SUFFIX;
this.stateTopicMessageQueue = new MessageQueue(stateTopicName,
context.getSourceBrokerName(), context.getSourceQueueId());
@@ -227,7 +226,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
logger.info("new session window, with key={}, valueTime={},
sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
Utils.format(newSessionWindowTime.getKey()),
Utils.format(newSessionWindowTime.getValue()));
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey,
this.accumulatorSessionWindowFire);
+
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark,
this.accumulatorSessionWindowFire);
}
}
@@ -324,7 +323,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV>
implements Supplier<Processo
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey,
this.accumulatorSessionWindowFire);
+
this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, watermark,
this.accumulatorSessionWindowFire);
this.idleWindowScaner.removeOldAccumulatorSession(needToDelete);
this.windowStore.deleteByKey(needToDelete);
diff --git
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index e0e55c8b..fb20eb5d 100644
---
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -153,7 +153,7 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
//f(Window + key, newValue, store)
WindowState<K, OV> state = new WindowState<>(key, newValue,
time);
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
- this.idleWindowScaner.putAggregateWindowCallback(windowKey,
this.aggregateWindowFire);
+ this.idleWindowScaner.putAggregateWindowCallback(windowKey,
watermark, this.aggregateWindowFire);
}
try {
@@ -192,7 +192,6 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
WindowState::windowState2Byte);
this.idleWindowScaner = context.getDefaultWindowScaner();
-
this.idleWindowScaner.initSessionTimeOut(windowInfo.getSessionTimeout().toMilliseconds());
String stateTopicName = context.getSourceTopic() +
Constant.STATE_TOPIC_SUFFIX;
this.stateTopicMessageQueue = new MessageQueue(stateTopicName,
context.getSourceBrokerName(), context.getSourceQueueId());
@@ -231,7 +230,7 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
logger.info("new session window, with key={}, valueTime={},
sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(time),
Utils.format(newSessionWindowTime.getKey()),
Utils.format(newSessionWindowTime.getValue()));
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey,
this.aggregateSessionWindowFire);
+
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark,
this.aggregateSessionWindowFire);
}
}
@@ -323,7 +322,7 @@ public class WindowAggregateSupplier<K, V, OV> implements
Supplier<Processor<V>>
this.windowStore.put(stateTopicMessageQueue, windowKey, state);
-
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey,
this.aggregateSessionWindowFire);
+
this.idleWindowScaner.putAggregateSessionWindowCallback(windowKey, watermark,
this.aggregateSessionWindowFire);
this.idleWindowScaner.removeOldAggregateSession(needToDelete);
this.windowStore.deleteByKey(needToDelete);
diff --git
a/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
b/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
index 09d3da88..ddf24188 100644
---
a/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
+++
b/core/src/main/java/org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.java
@@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -34,7 +33,6 @@ public class IdleWindowScaner implements AutoCloseable {
private static final Logger logger =
LoggerFactory.getLogger(IdleWindowScaner.class.getName());
private final Integer maxIdleTime;
- private long sessionTimeOut = 0;
private final ScheduledExecutorService executor;
private final ConcurrentHashMap<WindowKey, TimeType>
lastUpdateTime2WindowKey = new ConcurrentHashMap<>(16);
@@ -59,65 +57,66 @@ public class IdleWindowScaner implements AutoCloseable {
}, 0, 1000, TimeUnit.MILLISECONDS);
}
- public void initSessionTimeOut(long sessionTimeOut) {
- this.sessionTimeOut = sessionTimeOut;
- }
-
- public void putAccumulatorWindowCallback(WindowKey windowKey,
AccumulatorWindowFire<?, ?, ?, ?> function) {
+ public void putAccumulatorWindowCallback(WindowKey windowKey, long
watermark, AccumulatorWindowFire<?, ?, ?, ?> function) {
this.fireWindowCallBack.putIfAbsent(windowKey, function);
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
if (timeType == null) {
- timeType = new TimeType(Type.AccumulatorWindow,
System.currentTimeMillis());
+ timeType = new TimeType(Type.AccumulatorWindow,
System.currentTimeMillis(), watermark);
} else {
timeType.setUpdateTime(System.currentTimeMillis());
+ timeType.setWatermark(watermark);
}
return timeType;
});
}
- public void putAccumulatorSessionWindowCallback(WindowKey windowKey,
AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
+ public void putAccumulatorSessionWindowCallback(WindowKey windowKey, long
watermark, AccumulatorSessionWindowFire<?, ?, ?, ?> function) {
this.fireSessionWindowCallback.putIfAbsent(windowKey, function);
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
if (timeType == null) {
- timeType = new TimeType(Type.AccumulatorSessionWindow,
System.currentTimeMillis());
+ timeType = new TimeType(Type.AccumulatorSessionWindow,
System.currentTimeMillis(), watermark);
} else {
timeType.setUpdateTime(System.currentTimeMillis());
+ timeType.setWatermark(watermark);
}
return timeType;
});
}
- public void putAggregateWindowCallback(WindowKey windowKey,
AggregateWindowFire<?, ?, ?> function) {
+ public void putAggregateWindowCallback(WindowKey windowKey, long
watermark, AggregateWindowFire<?, ?, ?> function) {
this.windowKeyAggregate.putIfAbsent(windowKey, function);
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
if (timeType == null) {
- timeType = new TimeType(Type.AggregateWindow,
System.currentTimeMillis());
+ timeType = new TimeType(Type.AggregateWindow,
System.currentTimeMillis(), watermark);
} else {
timeType.setUpdateTime(System.currentTimeMillis());
+ timeType.setWatermark(watermark);
}
return timeType;
});
}
- public void putAggregateSessionWindowCallback(WindowKey windowKey,
AggregateSessionWindowFire<?, ?, ?> function) {
+ public void putAggregateSessionWindowCallback(WindowKey windowKey, long
watermark, AggregateSessionWindowFire<?, ?, ?> function) {
this.windowKeyAggregateSession.putIfAbsent(windowKey, function);
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
if (timeType == null) {
- timeType = new TimeType(Type.AggregateSessionWindow,
System.currentTimeMillis());
+ timeType = new TimeType(Type.AggregateSessionWindow,
System.currentTimeMillis(), watermark);
} else {
timeType.setUpdateTime(System.currentTimeMillis());
+ timeType.setWatermark(watermark);
}
return timeType;
});
}
- public void putJoinWindowCallback(WindowKey windowKey, JoinWindowFire<?,
?, ?, ?> function) {
+ public void putJoinWindowCallback(WindowKey windowKey, long watermark,
JoinWindowFire<?, ?, ?, ?> function) {
this.fireJoinWindowCallback.putIfAbsent(windowKey, function);
this.lastUpdateTime2WindowKey.compute(windowKey, (key, timeType) -> {
if (timeType == null) {
- timeType = new TimeType(Type.JoinWindow,
System.currentTimeMillis());
+ timeType = new TimeType(Type.JoinWindow,
System.currentTimeMillis(), watermark);
} else {
timeType.setUpdateTime(System.currentTimeMillis());
+ timeType.setWatermark(watermark);
}
return timeType;
});
@@ -169,9 +168,10 @@ public class IdleWindowScaner implements AutoCloseable {
switch (type) {
case AggregateSessionWindow:
case AccumulatorSessionWindow: {
- if (idleTime >= sessionTimeOut) {
+ long watermark = timeType.getWatermark() + idleTime;
+ if (watermark > windowKey.getWindowEnd()) {
try {
- doFire(windowKey, type);
+ doFire(windowKey, type, watermark);
} finally {
iterator.remove();
}
@@ -181,10 +181,10 @@ public class IdleWindowScaner implements AutoCloseable {
case AccumulatorWindow:
case JoinWindow:
case AggregateWindow: {
- long windowSize = windowKey.getWindowEnd() -
windowKey.getWindowStart();
- if (idleTime > this.maxIdleTime && idleTime > windowSize) {
+ long watermark = timeType.getWatermark() + idleTime;
+ if (idleTime > this.maxIdleTime && watermark >
windowKey.getWindowEnd()) {
try {
- doFire(windowKey, type);
+ doFire(windowKey, type, watermark);
} finally {
iterator.remove();
}
@@ -197,8 +197,7 @@ public class IdleWindowScaner implements AutoCloseable {
}
}
- private void doFire(WindowKey windowKey, Type type) throws Throwable {
- long watermark = windowKey.getWindowEnd() + 1;
+ private void doFire(WindowKey windowKey, Type type, long watermark) throws
Throwable {
String operatorName = windowKey.getOperatorName();
switch (type) {
@@ -258,10 +257,12 @@ public class IdleWindowScaner implements AutoCloseable {
static class TimeType {
private Type type;
private long updateTime;
+ private long watermark;
- public TimeType(Type type, long updateTime) {
+ public TimeType(Type type, long updateTime, long watermark) {
this.type = type;
this.updateTime = updateTime;
+ this.watermark = watermark;
}
public Type getType() {
@@ -279,6 +280,14 @@ public class IdleWindowScaner implements AutoCloseable {
public void setUpdateTime(long updateTime) {
this.updateTime = updateTime;
}
+
+ public long getWatermark() {
+ return watermark;
+ }
+
+ public void setWatermark(long watermark) {
+ this.watermark = watermark;
+ }
}
enum Type {