pnowojski commented on code in PR #20337:
URL: https://github.com/apache/flink/pull/20337#discussion_r932200661
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java:
##########
@@ -235,7 +252,12 @@ public void emitRecord(StreamRecord<IN> record) throws Exception {
@Override
public void emitWatermark(Watermark watermark) throws Exception {
- watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
+ outputList.stream()
+ .filter(output -> output.jobVertexID.equals(jobVertexID))
+ .forEach(
+ output ->
+ output.watermarkGauge.setCurrentWatermark(
+ watermark.getTimestamp()));
Review Comment:
This looks wrong:
1. Accessing a static field like that rings alarm bells.
2. Why should emitting a watermark in one subtask bump the watermark gauges of
other subtasks? That sounds incorrect. `WatermarkGauge` is not supposed to
present the max watermark across all subtasks, but the watermark of its own
subtask. If something needs to aggregate and get a "max" value, that should
happen at a higher level, such as the REST API or the WebUI.
3. Are you sure that job vertex IDs are unique across multiple jobs running
on the same cluster?
4. It looks like you are creating a memory leak, since nothing is ever removed
from `outputList`.
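
To illustrate points 2 and 4, here is a rough sketch of the shape I would expect. It does not use the real Flink classes: `SubtaskWatermarkGauge`, `SubtaskOutput`, and `WatermarkAggregator` are hypothetical stand-ins invented for this example. Each output owns its gauge as an instance field, so there is no static list to leak or to filter by vertex ID, and any "max" is computed by whoever reads the metrics.

```java
import java.util.List;

/** Hypothetical stand-in for a per-subtask watermark gauge; not Flink's WatermarkGauge. */
final class SubtaskWatermarkGauge {
    private volatile long currentWatermark = Long.MIN_VALUE;

    void setCurrentWatermark(long watermark) {
        this.currentWatermark = watermark;
    }

    long getValue() {
        return currentWatermark;
    }
}

/** Hypothetical subtask output: the gauge is an instance field, never shared or static. */
final class SubtaskOutput {
    private final SubtaskWatermarkGauge watermarkGauge = new SubtaskWatermarkGauge();

    void emitWatermark(long timestamp) {
        // Only this subtask's gauge is updated.
        watermarkGauge.setCurrentWatermark(timestamp);
    }

    SubtaskWatermarkGauge gauge() {
        return watermarkGauge;
    }
}

/** Hypothetical reporting-side helper: aggregation lives outside the task code. */
final class WatermarkAggregator {
    static long max(List<SubtaskWatermarkGauge> gauges) {
        long max = Long.MIN_VALUE;
        for (SubtaskWatermarkGauge gauge : gauges) {
            max = Math.max(max, gauge.getValue());
        }
        return max;
    }
}
```

With this layout, a gauge's lifetime is tied to the output that owns it, and emitting a watermark in one subtask leaves every other subtask's gauge untouched.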