johnyangk commented on a change in pull request #151: [NEMO-267] Consider watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231348032
##########
File path:
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##########
@@ -139,20 +145,52 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
}
// Trigger timers
- final long minOutputTimestamp =
- triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
- minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+ triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
// Remove values
values.clear();
}
// Emit watermark to downstream operators
- if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
- && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
- currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
- getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+ emitOutputWatermark(inputWatermark);
+ }
+
+ /**
+ * Output watermark
+ * = max(prev output watermark,
+ * min(input watermark, watermark holds)).
+ * @param inputWatermark input watermark
+ */
+ private void emitOutputWatermark(final Watermark inputWatermark) {
+
+ if (keyAndWatermarkHoldMap.isEmpty()) {
+ return;
+ }
+
+ // Find min watermark hold
+ final Watermark watermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
Review comment:
Suggested name for the `watermarkHold` variable above: `minOfWatermarkHolds`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services