dawidwys commented on a change in pull request #15771:
URL: https://github.com/apache/flink/pull/15771#discussion_r636879275



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -101,32 +97,57 @@ public void collect(StreamRecord<OUT> record) {
     }
 
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
-        serializationDelegate.setInstance(record);
+        // StreamStatus.IDLE requires that no records nor watermarks travel 
through the branch
+        // in order to keep the older behaviour that records could've been 
generated down the
+        // pipeline even though the sources were idle we go through a short 
ACTIVE/IDLE loop
+        if (announcedStatus.isIdle()) {
+            writeStreamStatus(StreamStatus.ACTIVE);
+        }
 
+        serializationDelegate.setInstance(record);
         try {
             recordWriter.emit(serializationDelegate);
         } catch (Exception e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+
+        if (announcedStatus.isIdle()) {
+            writeStreamStatus(StreamStatus.IDLE);
+        }

Review comment:
       Nice one! Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to