ajothomas commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1717530146


##########
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java:
##########
@@ -49,34 +50,59 @@ class WatermarkStates {
   private final static class WatermarkState {
     private final int expectedTotal;
     private final Map<String, Long> timestamps = new HashMap<>();
+    private final Map<String, Long> lastUpdateTime = new HashMap<>();
+    private final long watermarkIdleTime;
+    private final long createTime;
+    private final LongSupplier systemTimeFunc;
     private volatile long watermarkTime = WATERMARK_NOT_EXIST;
 
-    WatermarkState(int expectedTotal) {
+    WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier 
systemTimeFunc) {
       this.expectedTotal = expectedTotal;
+      this.watermarkIdleTime = watermarkIdleTime;
+      this.systemTimeFunc = systemTimeFunc;
+      this.createTime = systemTimeFunc.getAsLong();
     }
 
     synchronized void update(long timestamp, String taskName) {
+      long currentTime = systemTimeFunc.getAsLong();
       if (taskName != null) {
         Long ts = timestamps.get(taskName);
         if (ts != null && ts > timestamp) {
           LOG.warn(String.format("Incoming watermark %s is smaller than 
existing watermark %s for upstream task %s",
               timestamp, ts, taskName));
         } else {
           timestamps.put(taskName, timestamp);
+          lastUpdateTime.put(taskName, currentTime);
         }
       }
 
       if (taskName == null) {
         // we get watermark either from the source or from the aggregator task
         watermarkTime = Math.max(watermarkTime, timestamp);
-      } else if (timestamps.size() == expectedTotal) {
-        // For any intermediate streams, the expectedTotal is the upstream 
task count.
-        // Check whether we got all the watermarks, and set the watermark to 
be the min.
-        Optional<Long> min = timestamps.values().stream().min(Long::compare);
-        watermarkTime = min.orElse(timestamp);
+      } else if (canUpdateWatermark(currentTime)) {
+        Optional<Long> min;
+        if (watermarkIdleTime <= 0) {
+          // All upstream tasks are required in the computation
+          min = timestamps.values().stream().min(Long::compare);
+        } else {
+          // Exclude the tasks that have been idle in watermark emission.
+          min = timestamps.entrySet().stream()
+                  .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < 
watermarkIdleTime)
+                  .map(Map.Entry::getValue)
+                  .min(Long::compare);
+        }
+        watermarkTime = Math.max(watermarkTime, min.orElse(timestamp));
       }
     }
 
+    private boolean canUpdateWatermark(long currentTime) {
+      // The watermark can be updated if
+      // 1. we received watermarks from all upstream tasks, or
+      // 2. we allow task idle in emitting watermarks and the idle time has 
passed.
+      return (timestamps.size() == expectedTotal)
+              || (watermarkIdleTime > 0 && currentTime - createTime > 
watermarkIdleTime);

Review Comment:
   Thanks for the explanation and for adding the comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to