AHeise commented on code in PR #25292:
URL: https://github.com/apache/flink/pull/25292#discussion_r1756488654


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java:
##########
@@ -195,12 +216,18 @@ public void processWatermark(Watermark mark) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        endOfInput = true;
-        sinkWriter.flush(true);
-        emitCommittables(Long.MAX_VALUE);
+        if (!this.endOfInput) {
+            this.endOfInput = true;
+            // endOfInputState is union state, so it's enough if one task adds something to it
+            if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
+                endOfInputState.add(true);

Review Comment:
   I changed the logic entirely. We now drop and re-add the state in
`snapshotState`. This means that if downscaling happens and EOI_state = [true,
false], then on init endOfInput = false, and on the next checkpoint
EOI_state = [false]. I discussed the different cases in the code.
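
To make the downscaling case concrete, here is a minimal plain-Java sketch of the drop-and-re-add behavior described above. It does not use the Flink API. The class and method names are hypothetical, and the restore rule (input counts as ended only if the restored list is non-empty and contains no `false`) is an assumption inferred from the example in this comment, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the union-state handling; not the code from the PR.
public class EndOfInputStateSketch {

    // Assumed restore rule: with union state every subtask sees all entries,
    // so treat input as ended only if the list is non-empty and has no false entry.
    static boolean restoreEndOfInput(List<Boolean> unionState) {
        return !unionState.isEmpty() && !unionState.contains(false);
    }

    // Snapshot: drop whatever was restored and re-add only this subtask's own flag.
    static List<Boolean> snapshotState(boolean endOfInput) {
        List<Boolean> fresh = new ArrayList<>();
        fresh.add(endOfInput);
        return fresh;
    }

    public static void main(String[] args) {
        // Two old subtasks wrote [true, false]; after downscaling one subtask restores both.
        List<Boolean> restored = List.of(true, false);
        boolean endOfInput = restoreEndOfInput(restored);
        List<Boolean> nextCheckpoint = snapshotState(endOfInput);
        System.out.println("endOfInput=" + endOfInput + ", EOI_state=" + nextCheckpoint);
        // prints "endOfInput=false, EOI_state=[false]"
    }
}
```

Because the state is rewritten on every snapshot, stale entries from the previous parallelism do not survive past the first checkpoint after rescaling.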


