AHeise commented on code in PR #25292: URL: https://github.com/apache/flink/pull/25292#discussion_r1756488654
########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java: ########## @@ -195,12 +216,18 @@ public void processWatermark(Watermark mark) throws Exception { @Override public void endInput() throws Exception { - endOfInput = true; - sinkWriter.flush(true); - emitCommittables(Long.MAX_VALUE); + if (!this.endOfInput) { + this.endOfInput = true; + // endOfInputState is union state, so it's enough if one task adds something to it + if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) { + endOfInputState.add(true); Review Comment: I changed the logic entirely. We now drop and readd the state in `snapshotState`. This means if a downscaling happens and EOI_state = [true, false], on init endOfInput = false, and on next checkpoint EOI_state=[false]. I discussed the different cases in code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org