AHeise commented on a change in pull request #15771:
URL: https://github.com/apache/flink/pull/15771#discussion_r636010165
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -243,6 +240,13 @@ public void setup(
stateKeySelector2 = config.getStatePartitioner(1,
getUserCodeClassloader());
}
+ private CombinedWatermark setupCombinedWatermark() {
+ CombinedWatermark combinedWatermark = new CombinedWatermark();
+ combinedWatermark.add(new CombinedWatermark.PartialWatermark());
+ combinedWatermark.add(new CombinedWatermark.PartialWatermark());
Review comment:
Hm, haven't thought about the dynamic cases (actually Kafka is not
really dynamic but I can see that other sources are). Still the comment stands
that I wouldn't expose `PartialWatermark`.
You could have a similar method to `register` in `CombinedWatermark` that
returns the index of the newly added partial. The index is then used in
`private final Map<String, Integer> watermarkIndexPerOutputId;` This index can
then be used to update the `CombinedWatermark` via index.
This approach makes more sense if you can easily have all side-effects
inside the `CombinedWatermark#updateWatermark(int index, Watermark watermark)`.
So it avoids the case, where you update the partial and then tell the combined
watermark: "hey something changed, please check".
However, the current way may still be valid, if you explicitly want to have
the information drift and only update at specific times for performance
reasons. I'm currently not in the position to assess that properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]