ajothomas commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1717112686
##########
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java:
##########
@@ -49,34 +50,59 @@ class WatermarkStates {
private final static class WatermarkState {
private final int expectedTotal;
private final Map<String, Long> timestamps = new HashMap<>();
+ private final Map<String, Long> lastUpdateTime = new HashMap<>();
+ private final long watermarkIdleTime;
+ private final long createTime;
+ private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;
- WatermarkState(int expectedTotal) {
+ WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier
systemTimeFunc) {
this.expectedTotal = expectedTotal;
+ this.watermarkIdleTime = watermarkIdleTime;
+ this.systemTimeFunc = systemTimeFunc;
+ this.createTime = systemTimeFunc.getAsLong();
}
synchronized void update(long timestamp, String taskName) {
+ long currentTime = systemTimeFunc.getAsLong();
if (taskName != null) {
Long ts = timestamps.get(taskName);
if (ts != null && ts > timestamp) {
LOG.warn(String.format("Incoming watermark %s is smaller than
existing watermark %s for upstream task %s",
timestamp, ts, taskName));
} else {
timestamps.put(taskName, timestamp);
+ lastUpdateTime.put(taskName, currentTime);
}
}
if (taskName == null) {
// we get watermark either from the source or from the aggregator task
watermarkTime = Math.max(watermarkTime, timestamp);
- } else if (timestamps.size() == expectedTotal) {
- // For any intermediate streams, the expectedTotal is the upstream
task count.
- // Check whether we got all the watermarks, and set the watermark to
be the min.
- Optional<Long> min = timestamps.values().stream().min(Long::compare);
- watermarkTime = min.orElse(timestamp);
+ } else if (canUpdateWatermark(currentTime)) {
+ Optional<Long> min;
+ if (watermarkIdleTime <= 0) {
+ // All upstream tasks are required in the computation
+ min = timestamps.values().stream().min(Long::compare);
+ } else {
+ // Exclude the tasks that have been idle in watermark emission.
+ min = timestamps.entrySet().stream()
+ .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) <
watermarkIdleTime)
+ .map(Map.Entry::getValue)
+ .min(Long::compare);
+ }
+ watermarkTime = Math.max(watermarkTime, min.orElse(timestamp));
}
}
+ private boolean canUpdateWatermark(long currentTime) {
+ // The watermark can be updated if
+ // 1. we received watermarks from all upstream tasks, or
+ // 2. we allow task idle in emitting watermarks and the idle time has
passed.
+ return (timestamps.size() == expectedTotal)
+ || (watermarkIdleTime > 0 && currentTime - createTime >
watermarkIdleTime);
Review Comment:
Is the second part needed?
We would have received at least one watermark for a lagging partition, so
even a lagging partition will have an entry in timestamps. The only scenario I
can think of is if some partition is stuck on container startup.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]