dawidwys commented on a change in pull request #15771:
URL: https://github.com/apache/flink/pull/15771#discussion_r620945781
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
##########
@@ -158,6 +160,32 @@ public
AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int nu
environment.getExternalResourceInfoProvider());
}
+ private WatermarkOutputMultiplexer setupWatermarkMultiplexer(
+ StreamStatusMaintainer streamStatusMaintainer, int inputCount) {
+ WatermarkOutputMultiplexer multiplexer =
+ new WatermarkOutputMultiplexer(
+ new WatermarkOutput() {
+ @Override
+ public void emitWatermark(
+
org.apache.flink.api.common.eventtime.Watermark watermark) {
+ try {
+ processWatermark(new
Watermark(watermark.getTimestamp()));
+ } catch (Exception e) {
+ throw new WrappingRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void markIdle() {
+
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
Review comment:
Everything that is in the `StreamStatusMaintainer(s)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]