zhijiangW commented on a change in pull request #9471: [FLINK-13754][task]
Decouple OperatorChain from StreamStatusMaintainer
URL: https://github.com/apache/flink/pull/9471#discussion_r315286950
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -49,18 +48,17 @@
private SerializationDelegate<StreamElement> serializationDelegate;
- private final StreamStatusProvider streamStatusProvider;
-
private final OutputTag outputTag;
private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+ private StreamStatus currentStatus;
+
@SuppressWarnings("unchecked")
public RecordWriterOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>>
recordWriter,
TypeSerializer<OUT> outSerializer,
- OutputTag outputTag,
- StreamStatusProvider streamStatusProvider) {
Review comment:
Thanks for this investigation with @tzulitai.
1. After reviewing the whole related process further, I found that an idle
`StreamStatus` currently only blocks the emission of watermarks; it has no
effect on `StreamRecord` or `LatencyMarker`.
2. Source and non-source tasks use different mechanisms to control watermark
emission based on the stream status:
- For a source task, the source context references the
`StreamStatusMaintainer` to mark the status as idle when needed. The
`RecordWriterOutput` references the same `StreamStatusMaintainer` to check the
status before actually emitting a watermark.
- For a non-source task, the higher-level `StatusWatermarkValve` component
handles watermarks and `StreamStatus` together. If the status maintained for a
channel is idle, the valve does not actually call
`Operator#processWatermark` while handling the watermark. So the existing check
on the `RecordWriterOutput` side seems redundant for the non-source case.
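To make the two paths above concrete, here is a minimal toy model in Java. It is not Flink code: `StatusCheckingOutput`, `InputValve`, and `Status` are hypothetical stand-ins for `RecordWriterOutput`, `StatusWatermarkValve`, and `StreamStatus`, and the valve is simplified to a single channel. It only illustrates that records pass regardless of status, and that a watermark dropped by the valve never reaches the check in the output.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: every name here is a hypothetical stand-in. */
class IdleStatusSketch {

    enum Status { ACTIVE, IDLE }

    /** Stand-in for an output that checks the status itself (source-task path). */
    static class StatusCheckingOutput {
        Status currentStatus = Status.ACTIVE;
        final List<String> emitted = new ArrayList<>();

        void emitRecord(String record) {
            // Records are not gated by the status.
            emitted.add("record:" + record);
        }

        void emitWatermark(long timestamp) {
            // Only watermarks are suppressed while the status is idle.
            if (currentStatus == Status.ACTIVE) {
                emitted.add("watermark:" + timestamp);
            }
        }
    }

    /** Stand-in for a single-channel valve in front of the operator (non-source path). */
    static class InputValve {
        Status channelStatus = Status.ACTIVE;
        private final StatusCheckingOutput output;

        InputValve(StatusCheckingOutput output) {
            this.output = output;
        }

        void inputWatermark(long timestamp) {
            if (channelStatus == Status.IDLE) {
                return; // the operator, and hence the output, never sees it
            }
            // Assumption: the operator simply forwards the watermark downstream.
            output.emitWatermark(timestamp);
        }
    }

    /** Runs both paths once and returns everything that reached the output. */
    static List<String> demo() {
        StatusCheckingOutput output = new StatusCheckingOutput();

        // Source path: the output itself decides.
        output.emitWatermark(1L);             // forwarded
        output.currentStatus = Status.IDLE;
        output.emitRecord("a");               // forwarded despite idle status
        output.emitWatermark(2L);             // dropped by the output
        output.currentStatus = Status.ACTIVE;

        // Non-source path: the valve decides before the output is involved.
        InputValve valve = new InputValve(output);
        valve.channelStatus = Status.IDLE;
        valve.inputWatermark(3L);             // dropped by the valve
        valve.channelStatus = Status.ACTIVE;
        valve.inputWatermark(4L);             // forwarded
        return output.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the output is still active when the valve drops watermark 3, which is the sense in which the output-side check adds nothing on the non-source path.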
Overall, the current status check is still necessary for the source case. I
agree that it seems more reasonable to do this check in an upper layer, as is
already done for non-source tasks. To do so, we would need to define a
component similar to `StatusWatermarkValve` for the source context to
reference, and every watermark emission would first have to go through it to
check whether the status is idle. That would involve more refactoring work on
the legacy source stack.
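For illustration only, the upper-layer alternative described above might look roughly like the following sketch. `SourceWatermarkGate` and its methods are hypothetical names and do not exist in Flink; the sketch assumes the source context owns the gate and that the output becomes a plain sink with no status check of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical sketch, not Flink code: a status gate owned by the source context. */
class SourceValveSketch {

    /** Hypothetical gate: drops watermarks while idle, forwards them otherwise. */
    static class SourceWatermarkGate {
        private boolean idle;
        private final LongConsumer downstream;

        SourceWatermarkGate(LongConsumer downstream) {
            this.downstream = downstream;
        }

        void markIdle() {
            idle = true;
        }

        void markActive() {
            idle = false;
        }

        void emitWatermark(long timestamp) {
            // The idle check lives here instead of in the output.
            if (!idle) {
                downstream.accept(timestamp);
            }
        }
    }

    /** Runs a short active/idle/active sequence and returns what reached the output. */
    static List<Long> demo() {
        List<Long> forwarded = new ArrayList<>();
        // The output is modeled as a plain sink without any status check.
        SourceWatermarkGate gate = new SourceWatermarkGate(ts -> forwarded.add(ts));
        gate.emitWatermark(1L); // forwarded
        gate.markIdle();
        gate.emitWatermark(2L); // dropped while idle
        gate.markActive();
        gate.emitWatermark(3L); // forwarded
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sketch leaves out what makes the real change expensive: every watermark path in the legacy source contexts would have to be routed through such a gate.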
So the only feasible approach for now might be the one taken in this PR. The
check seems redundant for non-source tasks, but we do not change the behavior,
and the redundancy already existed before this refactoring.