zhijiangW commented on a change in pull request #9471: [FLINK-13754][task] 
Decouple OperatorChain from StreamStatusMaintainer
URL: https://github.com/apache/flink/pull/9471#discussion_r315286950
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 ##########
 @@ -49,18 +48,17 @@
 
        private SerializationDelegate<StreamElement> serializationDelegate;
 
-       private final StreamStatusProvider streamStatusProvider;
-
        private final OutputTag outputTag;
 
        private final WatermarkGauge watermarkGauge = new WatermarkGauge();
 
+       private StreamStatus currentStatus;
+
        @SuppressWarnings("unchecked")
        public RecordWriterOutput(
                        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
recordWriter,
                        TypeSerializer<OUT> outSerializer,
-                       OutputTag outputTag,
-                       StreamStatusProvider streamStatusProvider) {
 
 Review comment:
   Thanks for this investigation with @tzulitai .
   
   1. After further reviewing the whole related process, the current idle 
`StreamStatus` only blocks the emitting of watermark, not works for 
`StreamRecord/LatencyMarker`.
   
   2. The mechanism is different for source and non-source tasks to control 
emitting watermark based on stream status:
   
   - For source task, the source context references the 
`StreamStatusMaintainer` to mark idle status if needed. Then the 
`RecordWriterOutput` also references this `StreamStatusMaintainer` to judge the 
status before really emitting watermark.
   
   - For non-source task, we have a high-level `StatusWatermarkValve` component 
to handle watermark with `StreamStatus` together. In this case if the 
maintained status in some channel is idle, it would not really call the 
`Operator#processWatermark` while handling the watermark. So the existing judge 
on `RecordWriterOutput` side seems redundant for non-source case.
   
   In conclusion, the current judge of status logic is still necessary for the 
case of source. I agree that it seems more reasonable to make this judge logic 
in upper layer like the current way for non-source task. If to do so, we need 
to define a similar `StatusWatermarkValve` component to be referenced by source 
context, and the watermark emitting must firstly go through 
`StatusWatermarkValve` to judge whether the status is idle. It would involve in 
more refactoring work on legacy source stack.
   
   So the only feasible way currently might be done in PR. It seems redundant 
for non-source task, but we do not change the behavior and the redundant issue 
still exists before this refactoring.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to