dawidwys commented on a change in pull request #15771:
URL: https://github.com/apache/flink/pull/15771#discussion_r620391722



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
##########
@@ -158,6 +160,32 @@ public 
AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int nu
                         environment.getExternalResourceInfoProvider());
     }
 
+    private WatermarkOutputMultiplexer setupWatermarkMultiplexer(
+            StreamStatusMaintainer streamStatusMaintainer, int inputCount) {
+        WatermarkOutputMultiplexer multiplexer =
+                new WatermarkOutputMultiplexer(
+                        new WatermarkOutput() {
+                            @Override
+                            public void emitWatermark(
+                                    
org.apache.flink.api.common.eventtime.Watermark watermark) {
+                                try {
+                                    processWatermark(new 
Watermark(watermark.getTimestamp()));
+                                } catch (Exception e) {
+                                    throw new WrappingRuntimeException(e);
+                                }
+                            }
+
+                            @Override
+                            public void markIdle() {
+                                
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);

Review comment:
       This is the part of the whole solution that I am unsure the most. With 
this approach the bookkeeping happens both on the operator and stream task 
level. As far as I can tell it would work with this method empty.
   
   Theoretically we could also move the whole bookkeeping logic from the 
StreamTask level to the operator level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to