taegeonum commented on a change in pull request #151: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231362810
########## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ##########
@@ -139,20 +145,52 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
       }

       // Trigger timers
-      final long minOutputTimestamp =
-        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
-      minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);

       // Remove values
       values.clear();
     }

     // Emit watermark to downstream operators
-    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
-      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
-      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
-      getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+    emitOutputWatermark(inputWatermark);
+  }
+
+  /**
+   * Output watermark
+   * = max(prev output watermark,
+   *       min(input watermark, watermark holds)).
+   * @param inputWatermark input watermark
+   */
+  private void emitOutputWatermark(final Watermark inputWatermark) {
+

Review comment:
Why do we check Long.MIN_VALUE for inputWatermark?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services