xinyuiscool commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1717425287


##########
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java:
##########
@@ -49,34 +50,59 @@ class WatermarkStates {
   private final static class WatermarkState {
     private final int expectedTotal;
     private final Map<String, Long> timestamps = new HashMap<>();
+    private final Map<String, Long> lastUpdateTime = new HashMap<>();
+    private final long watermarkIdleTime;
+    private final long createTime;
+    private final LongSupplier systemTimeFunc;
     private volatile long watermarkTime = WATERMARK_NOT_EXIST;
 
-    WatermarkState(int expectedTotal) {
+    WatermarkState(int expectedTotal, long watermarkIdleTime, LongSupplier systemTimeFunc) {
       this.expectedTotal = expectedTotal;
+      this.watermarkIdleTime = watermarkIdleTime;
+      this.systemTimeFunc = systemTimeFunc;
+      this.createTime = systemTimeFunc.getAsLong();
     }
 
     synchronized void update(long timestamp, String taskName) {
+      long currentTime = systemTimeFunc.getAsLong();
       if (taskName != null) {
         Long ts = timestamps.get(taskName);
         if (ts != null && ts > timestamp) {
           LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s",
               timestamp, ts, taskName));
         } else {
           timestamps.put(taskName, timestamp);
+          lastUpdateTime.put(taskName, currentTime);
         }
       }
 
       if (taskName == null) {
         // we get watermark either from the source or from the aggregator task
         watermarkTime = Math.max(watermarkTime, timestamp);
-      } else if (timestamps.size() == expectedTotal) {
-        // For any intermediate streams, the expectedTotal is the upstream task count.
-        // Check whether we got all the watermarks, and set the watermark to be the min.
-        Optional<Long> min = timestamps.values().stream().min(Long::compare);
-        watermarkTime = min.orElse(timestamp);
+      } else if (canUpdateWatermark(currentTime)) {
+        Optional<Long> min;
+        if (watermarkIdleTime <= 0) {
+          // All upstream tasks are required in the computation
+          min = timestamps.values().stream().min(Long::compare);
+        } else {
+          // Exclude the tasks that have been idle in watermark emission.
+          min = timestamps.entrySet().stream()
+                  .filter(t -> currentTime - lastUpdateTime.get(t.getKey()) < watermarkIdleTime)

Review Comment:
   Makes sense. Changed to use a single loop.



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -147,6 +147,9 @@ public class TaskConfig extends MapConfig {
       "task.transactional.state.retain.existing.state";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
 
+  public static final String WATERMARK_IDLE_MS = "task.watermark.idle.ms";

Review Comment:
   Changed to WATERMARK_IDLE_TIMEOUT_MS. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to