dawidwys commented on a change in pull request #16095:
URL: https://github.com/apache/flink/pull/16095#discussion_r649763606



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -110,12 +108,14 @@ public void collect(StreamRecord<OUT> record) {
 
     @Override
     public void emitWatermark(Watermark mark) {
-        // watermark could've been generated somewhere in the pipeline even though an IDLE status
-        // was emitted. It might've originated from a periodic watermark generator or just a wrong
-        // behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
-            watermarkGauge.setCurrentWatermark(mark.getTimestamp());
-            serializationDelegate.setInstance(mark);
+        if (announcedStatus.isIdle()) {
+            return;
+        }

Review comment:
       Yes, in 1.13 we ignore/swallow watermarks emitted in the `IDLE` state. 
There we check the status via `StreamStatusProvider` though.
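
To make the swallowing behaviour concrete, here is a minimal, self-contained sketch of an output that drops watermarks while its announced status is idle. It does not use Flink's classes: `IdleAwareOutputSketch`, `IdleAwareOutput`, `Status`, `emitStatus` and `emittedWatermarks` are hypothetical names for illustration only, and the real `RecordWriterOutput` serializes and broadcasts the watermark rather than collecting it in a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in model; none of these types are Flink's actual classes.
public class IdleAwareOutputSketch {

    /** Simplified stream status (assumption: only two states matter here). */
    enum Status { ACTIVE, IDLE }

    /** Output that forwards watermarks only while the announced status is ACTIVE. */
    static final class IdleAwareOutput {
        private Status announcedStatus = Status.ACTIVE;
        private final List<Long> emitted = new ArrayList<>();

        void emitStatus(Status status) {
            announcedStatus = status;
        }

        void emitWatermark(long timestamp) {
            if (announcedStatus == Status.IDLE) {
                // Swallow the watermark instead of switching back to ACTIVE.
                return;
            }
            emitted.add(timestamp);
        }

        List<Long> emittedWatermarks() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        IdleAwareOutput output = new IdleAwareOutput();
        output.emitWatermark(1L);           // forwarded
        output.emitStatus(Status.IDLE);
        output.emitWatermark(2L);           // swallowed while idle
        output.emitStatus(Status.ACTIVE);
        output.emitWatermark(3L);           // forwarded again
        System.out.println(output.emittedWatermarks()); // prints [1, 3]
    }
}
```

The removed `ensureActive(...)` path would instead have flipped the status back to active before forwarding the watermark, so the watermark with timestamp 2 would have reached downstream operators.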

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -110,12 +108,14 @@ public void collect(StreamRecord<OUT> record) {
 
     @Override
     public void emitWatermark(Watermark mark) {
-        // watermark could've been generated somewhere in the pipeline even though an IDLE status
-        // was emitted. It might've originated from a periodic watermark generator or just a wrong
-        // behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
-            watermarkGauge.setCurrentWatermark(mark.getTimestamp());
-            serializationDelegate.setInstance(mark);
+        if (announcedStatus.isIdle()) {
+            return;
+        }

Review comment:
       The comment about tests reminded me that I dropped a test for it along 
with the behaviour in outputs.



