pnowojski commented on a change in pull request #9471: [FLINK-13754][task] 
Decouple OperatorChain from StreamStatusMaintainer
URL: https://github.com/apache/flink/pull/9471#discussion_r315116599
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ##########
 @@ -49,18 +48,17 @@
 
        private SerializationDelegate<StreamElement> serializationDelegate;
 
-       private final StreamStatusProvider streamStatusProvider;
-
        private final OutputTag outputTag;
 
        private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
+       private StreamStatus currentStatus;
+
        @SuppressWarnings("unchecked")
        public RecordWriterOutput(
                        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
                        TypeSerializer<OUT> outSerializer,
-                       OutputTag outputTag,
-                       StreamStatusProvider streamStatusProvider) {
 
 Review comment:
   I'm a bit torn on this change. On one hand it does cut a dependency; on the 
other it duplicates the `StreamStatus` state in a second field. After this 
change, a bug could leave the value of `StreamStatus` in `RecordWriterOutput` 
different from the one in `OperatorChain`.
   
   Maybe the question should be: why does `RecordWriterOutput` need 
`StreamStatus` in the first place?
   
   I asked @tzulitai about this, and the `if (currentStatus.isActive())` check 
inside `emitWatermark` is probably redundant. The contract is that nothing 
should be emitted while the stream is not active, so this would have to be 
checked everywhere, not only in `emitWatermark`. Such a check shouldn't be 
performed here but at some upper level, so if anything, this should/could be a 
`checkState` here. However, adding an extra dependency just for a `checkState` 
seems like overkill, so long story short:
   
   We can probably just remove this code altogether; at the very least we can 
remove the `StreamStatusProvider` dependency/usage from `RecordWriterOutput`.
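   
   A rough sketch of the idea, not the actual Flink classes: `Chain` and 
`Output` below are hypothetical stand-ins for `OperatorChain` and 
`RecordWriterOutput`, and the `boolean active` flag stands in for 
`StreamStatus`. It only illustrates keeping the status in one place and 
guarding emission at the upper level, so the output needs neither a status 
field nor a provider.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; none of these types are Flink APIs.
class StatusCheckSketch {

    /** Stand-in for the output: it emits whatever it is given and holds no status. */
    static class Output {
        final List<Long> emitted = new ArrayList<>();

        void emitWatermark(long timestamp) {
            // No isActive() check here; the caller honours the contract.
            emitted.add(timestamp);
        }
    }

    /** Stand-in for the upper level, the single owner of the status. */
    static class Chain {
        final Output output = new Output();
        boolean active = true;

        void setActive(boolean active) {
            this.active = active;
        }

        void emitWatermark(long timestamp) {
            // The guard lives in one place, next to the only copy of the status.
            if (active) {
                output.emitWatermark(timestamp);
            }
        }
    }

    public static void main(String[] args) {
        Chain chain = new Chain();
        chain.emitWatermark(1L);   // forwarded: the stream is active
        chain.setActive(false);
        chain.emitWatermark(2L);   // dropped: the stream is idle
        System.out.println(chain.output.emitted); // prints [1]
    }
}
```

   With this shape there is only one copy of the status, so the two classes 
cannot drift apart.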
