johnyangk commented on a change in pull request #151: [NEMO-267] Consider
watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231352412
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##########
@@ -139,20 +145,52 @@ private void processElementsAndTriggerTimers(final
Watermark inputWatermark,
}
// Trigger timers
- final long minOutputTimestamp =
- triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
- minOutputTimestampsOfEmittedWindows =
Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+ triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
// Remove values
values.clear();
}
// Emit watermark to downstream operators
- if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
- && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
- currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
- getOutputCollector().emitWatermark(new
Watermark(minOutputTimestampsOfEmittedWindows));
+ emitOutputWatermark(inputWatermark);
+ }
+
+ /**
+ * Output watermark
+ * = max(prev output watermark,
+ * min(input watermark, watermark holds)).
+ * @param inputWatermark input watermark
+ */
+ private void emitOutputWatermark(final Watermark inputWatermark) {
+
Review comment:
For completeness, can you check all of the three watermarks that make up the
output watermark?
// prev output watermark - No need to check (?)
// inputWatermark - check for Long.MIN_VALUE
// keyAndWatermarkHoldMap - check emptiness
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services