Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5092#discussion_r153748916
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
---
@@ -313,6 +325,7 @@ protected void processAndCollectWithTimestamp(T
element, long timestamp) {
@Override
protected void processAndEmitWatermark(Watermark mark) {
output.emitWatermark(mark);
+
watermarkGauge.setCurrentLowWatermark(mark.getTimestamp());
--- End diff --
I've thought about doing that, but when i noticed that it wouldn't work for
the `AutomaticWatermarkContext` which emits watermarks in `processAndCollect`,
and that contexts may emit watermarks anywhere (meaning we couldn't reliably
cover all cases in the base class) i figured that we may just let each context
do it separately.
---