xinyuiscool commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1717425287
##########
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java:
##########
@@ -49,34 +50,59 @@ class WatermarkStates {
private final static class WatermarkState {
private final int expectedTotal;
private final Map<String, Long> timestamps = new HashMap<>();
+ private final Map<String, Long> lastUpdateTime = new HashMap<>();
+ private final long watermarkIdleTime;
+ private final long createTime;
+ private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;
- WatermarkState(int expectedTotal) {
+ WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) {
this.expectedTotal = expectedTotal;
+ this.watermarkIdleTime = watermarkIdleTime;
+ this.systemTimeFunc = systemTimeFunc;
+ this.createTime = systemTimeFunc.getAsLong();
}
synchronized void update(long timestamp, String taskName) {
+ long currentTime = systemTimeFunc.getAsLong();
if (taskName != null) {
Long ts = timestamps.get(taskName);
if (ts != null && ts > timestamp) {
LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s",
timestamp, ts, taskName));
} else {
timestamps.put(taskName, timestamp);
+ lastUpdateTime.put(taskName, currentTime);
}
}
if (taskName == null) {
// we get watermark either from the source or from the aggregator task
watermarkTime = Math.max(watermarkTime, timestamp);
- } else if (timestamps.size() == expectedTotal) {
- // For any intermediate streams, the expectedTotal is the upstream task count.
- // Check whether we got all the watermarks, and set the watermark to be the min.
- Optional<Long> min = timestamps.values().stream().min(Long::compare);
- watermarkTime = min.orElse(timestamp);
+ } else if (canUpdateWatermark(currentTime)) {
+ Optional<Long> min;
+ if (watermarkIdleTime <= 0) {
+ // All upstream tasks are required in the computation
+ min = timestamps.values().stream().min(Long::compare);
+ } else {
+ // Exclude the tasks that have been idle in watermark emission.
+ min = timestamps.entrySet().stream()
+ .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)
Review Comment:
Makes sense. Changed to use a single loop.
##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -147,6 +147,9 @@ public class TaskConfig extends MapConfig {
"task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
+ public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms";
Review Comment:
Changed to WATERMARK_IDLE_TIMEOUT_MS. Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]