xinyuiscool commented on code in PR #1705:
URL: https://github.com/apache/samza/pull/1705#discussion_r1717424079


##########
samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java:
##########
@@ -100,4 +120,74 @@ public void testUpdate() {
     // verify we got a watermark 6 (min) for int stream
     assertEquals(watermarkStates.getWatermark(intermediate), 6L);
   }
+
+  @Test
+  public void testIdle() {
+    WatermarkStates watermarkStates = new WatermarkStates(ssps, 
producerCounts, new MetricsRegistryMap(),
+            TASK_WATERMARK_IDLE_MS, new MockSystemTime());
+
+    // First watermark is marked before the idle time, so it's not counted
+    WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 
WATERMARK_NOT_EXIST);
+    assertEquals(watermarkStates.getWatermark(intermediate), 
WATERMARK_NOT_EXIST);
+
+    // Watermark is computed based on "task 1" since "task 0" has passed the idle 
time
+    watermarkMessage = new WatermarkMessage(5L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
+    assertEquals(watermarkStates.getWatermark(intermediate), 
WATERMARK_NOT_EXIST);
+
+    // Watermark is computed based on "task 0" since the time has already passed 
the idle threshold
+    watermarkMessage = new WatermarkMessage(6L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
+    assertEquals(watermarkStates.getWatermark(intermediate), 5L);
+
+    // Watermark from "task 1" is less than current watermark, ignore
+    watermarkMessage = new WatermarkMessage(2L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 6L);
+    // verify we got a watermark (min) for int stream
+    assertEquals(watermarkStates.getWatermark(intermediate), 5L);
+
+    // Watermark from "task 0" is updated, but less than current watermark
+    watermarkMessage = new WatermarkMessage(3L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
+    assertEquals(watermarkStates.getWatermark(intermediate), 5L);
+
+    // Watermark is computed this time due to advance in "task 0"
+    watermarkMessage = new WatermarkMessage(7L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L);
+    assertEquals(watermarkStates.getWatermark(intermediate), 5L);
+
+    // Watermark is computed this time due to advance in "task 1"
+    watermarkMessage = new WatermarkMessage(10L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 7L);
+    assertEquals(watermarkStates.getWatermark(intermediate), 6L);
+  }
+
+  class MockSystemTime implements LongSupplier {
+    int createTime = 0;
+    boolean firstWatermark = true;
+
+    @Override
+    public long getAsLong() {
+      if (createTime < ssps.size()) {
+        createTime++;
+        return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
+      }
+
+      if (firstWatermark) {
+        firstWatermark = false;
+        // Make the first task idle
+        return System.currentTimeMillis() - TASK_WATERMARK_IDLE_MS;
+      } else {
+        return System.currentTimeMillis();
+      }
+    }
+  }

Review Comment:
   Good suggestion! I modified the code accordingly. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to