pnowojski commented on a change in pull request #9471: [FLINK-13754][task]
Decouple OperatorChain from StreamStatusMaintainer
URL: https://github.com/apache/flink/pull/9471#discussion_r315116599
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -49,18 +48,17 @@
private SerializationDelegate<StreamElement> serializationDelegate;
- private final StreamStatusProvider streamStatusProvider;
-
private final OutputTag outputTag;
private final WatermarkGauge watermarkGauge = new WatermarkGauge();
+ private StreamStatus currentStatus;
+
@SuppressWarnings("unchecked")
public RecordWriterOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>>
recordWriter,
TypeSerializer<OUT> outSerializer,
- OutputTag outputTag,
- StreamStatusProvider streamStatusProvider) {
Review comment:
I'm a little bit torn on this change. On one hand it indeed cuts a
dependency; on the other it duplicates the `StreamStatus` state in a second
field. After this change, a bug could cause the value of `StreamStatus` in
`RecordWriterOutput` to differ from the one in `OperatorChain`.
Maybe the question should be: why does `RecordWriterOutput` need
`StreamStatus` in the first place?
I asked @tzulitai about this, and the `if (currentStatus.isActive())` check
inside `emitWatermark` is probably redundant. The contract is that nothing
should be emitted if the stream is not active, so this would have to be
checked not only in `emitWatermark` but everywhere. However, such a check
shouldn't be performed here but somewhere at a higher level, so if anything,
this should/could be a `checkState` here. Adding an extra dependency for
just a `checkState` seems like overkill, though, so long story short:
we can probably just remove this code altogether, or at least remove the
`StreamStatusProvider` dependency/usage from `RecordWriterOutput`.
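
To illustrate the suggestion above, here is a minimal sketch of the status check living at a higher level while the output simply forwards. All types here (`Status`, `SketchOutput`, `SketchChain`) are hypothetical stand-ins, not the actual Flink classes, and silently dropping watermarks while idle is an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamStatus; not the Flink class.
enum Status { ACTIVE, IDLE }

// Hypothetical stand-in for RecordWriterOutput: it holds no status at all
// and forwards every watermark it is given.
final class SketchOutput {
    final List<Long> emitted = new ArrayList<>();

    void emitWatermark(long timestamp) {
        emitted.add(timestamp);
    }
}

// Hypothetical stand-in for the higher-level owner (e.g. the operator chain).
// It keeps the only copy of the status, so there is nothing to get out of sync.
final class SketchChain {
    private Status status = Status.ACTIVE;
    private final SketchOutput output;

    SketchChain(SketchOutput output) {
        this.output = output;
    }

    void toggleStatus(Status newStatus) {
        status = newStatus;
    }

    void emitWatermark(long timestamp) {
        // Assumption: while idle, watermarks are dropped rather than rejected.
        if (status == Status.ACTIVE) {
            output.emitWatermark(timestamp);
        }
    }
}
```

With the status held in a single place, the output needs neither a `StreamStatusProvider` nor its own `currentStatus` field.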