taegeonum commented on a change in pull request #151: [NEMO-267] Consider 
watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231362810
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##########
 @@ -139,20 +145,52 @@ private void processElementsAndTriggerTimers(final 
Watermark inputWatermark,
       }
 
       // Trigger timers
-      final long minOutputTimestamp =
-        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
-      minOutputTimestampsOfEmittedWindows = 
Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
 
       // Remove values
       values.clear();
     }
 
     // Emit watermark to downstream operators
-    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
-      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
-      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
-      getOutputCollector().emitWatermark(new 
Watermark(minOutputTimestampsOfEmittedWindows));
+    emitOutputWatermark(inputWatermark);
+  }
+
+  /**
+   * Output watermark
+   * = max(prev output watermark,
+   *          min(input watermark, watermark holds)).
+   * @param inputWatermark input watermark
+   */
+  private void emitOutputWatermark(final Watermark inputWatermark) {
+
 
 Review comment:
   Why do we check Long.MIN_VALUE for inputWatermark? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to