becketqin commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1719086060


##########
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java:
##########
@@ -49,34 +52,75 @@ class WatermarkStates {
   private final static class WatermarkState {
     private final int expectedTotal;
     private final Map<String, Long> timestamps = new HashMap<>();
+    private final Map<String, Long> lastUpdateTime = new HashMap<>();
+    private final long watermarkIdleTimeout;
+    private final int quorumSize;
+    private final long createTime;
+    private final LongSupplier systemTimeFunc;
     private volatile long watermarkTime = WATERMARK_NOT_EXIST;
 
-    WatermarkState(int expectedTotal) {
+    WatermarkState(
+            int expectedTotal,
+            long watermarkIdleTimeout,
+            double watermarkQuorumSizePercentage,
+            LongSupplier systemTimeFunc) {
       this.expectedTotal = expectedTotal;
+      this.watermarkIdleTimeout = watermarkIdleTimeout;
+      this.quorumSize = (int) (expectedTotal * watermarkQuorumSizePercentage);
+      this.systemTimeFunc = systemTimeFunc;
+      this.createTime = systemTimeFunc.getAsLong();
     }
 
     synchronized void update(long timestamp, String taskName) {
+      long currentTime = systemTimeFunc.getAsLong();
       if (taskName != null) {
         Long ts = timestamps.get(taskName);
         if (ts != null && ts > timestamp) {
           LOG.warn(String.format("Incoming watermark %s is smaller than 
existing watermark %s for upstream task %s",
               timestamp, ts, taskName));
         } else {
           timestamps.put(taskName, timestamp);
+          lastUpdateTime.put(taskName, currentTime);
         }
       }
 
       if (taskName == null) {
         // we get watermark either from the source or from the aggregator task
         watermarkTime = Math.max(watermarkTime, timestamp);
-      } else if (timestamps.size() == expectedTotal) {
-        // For any intermediate streams, the expectedTotal is the upstream 
task count.
-        // Check whether we got all the watermarks, and set the watermark to 
be the min.
-        Optional<Long> min = timestamps.values().stream().min(Long::compare);
-        watermarkTime = min.orElse(timestamp);
+      } else if (canUpdateWatermark(currentTime)) {
+        final long minWatermark;
+        if (watermarkIdleTimeout <= 0) {
+          // All upstream tasks are required in the computation
+          minWatermark = 
timestamps.values().stream().min(Long::compare).orElse(timestamp);
+        } else {
+          // Exclude the tasks that have been idle in watermark emission.
+          long min = Long.MAX_VALUE;
+          long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
+          int updateCount = 0;
+          for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
+            // Check the update happens before the idle timeout
+            if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) {
+              min = Math.min(min, entry.getValue());
+              updateCount++;
+            }
+          }
+
+          // Active tasks must exceed the quorum size
+          minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) 
? min : WATERMARK_NOT_EXIST;

Review Comment:
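   Would it make sense to either expose a metric for `quorumSize` or log a warning 
here? When fewer active (non-idle) upstream tasks than `quorumSize` remain, this 
branch returns `WATERMARK_NOT_EXIST`, so the watermark silently stops advancing and 
that would be hard to diagnose without some signal.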



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to