fapaul commented on code in PR #25292:
URL: https://github.com/apache/flink/pull/25292#discussion_r1763062367


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java:
##########
@@ -123,6 +142,27 @@ public void bucketInactive(Bucket<IN, String> bucket) {
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
+
+        // Figure out if we have seen end of input before and if we should emit anything downstream. We
+        // have the following
+        // cases:
+        // 1. state is empty:
+        //   - First time initialization
+        //   - Restoring from a previous version of Flink that didn't handle EOI
+        //   - Upscaled from a final or regular checkpoint
+        // In all cases, we regularly handle EOI, potentially resulting in unnecessary .
+        // 2. state is not empty:
+        //   - This implies Flink restores from a version that handles EOI.
+        //   - If there is one entry, no rescaling happened (for this subtask), so if it's true,
+        //     we recover from a final checkpoint (for this subtask) and can ignore another EOI
+        //     else we have a regular checkpoint.
+        //   - If there are multiple entries, Flink downscaled, and we need to check if all are
+        //     true and do the same as above. As soon as one entry is false, we regularly start
+        //     the writer and potentially emit duplicate summaries if we indeed recovered from a
+        //     final checkpoint.
+        endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+        List<Boolean> previousState = Lists.newArrayList(endOfInputState.get());

Review Comment:
   Nit: Can we use `List.of` here and avoid using the shaded Guava dependency?
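   A minimal sketch of the restore decision described in the code comment above, using only the JDK. One caveat on the nit: `List.of` takes individual elements (or an array), not an `Iterable`, so it cannot copy `endOfInputState.get()` directly; the hypothetical `toList` helper below does the copy instead. `EndOfInputRestore`, `toList`, and `restoredAfterEndOfInput` are illustrative names, not Flink APIs, and the operator state is modeled as a plain `Iterable<Boolean>` rather than Flink's `ListState`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not Flink code): decides whether a restored subtask
 * already processed end of input, following the cases in the review's code comment.
 */
public class EndOfInputRestore {

    /** Copies an Iterable into a mutable List using only the JDK (no shaded Guava). */
    static <T> List<T> toList(Iterable<T> iterable) {
        List<T> list = new ArrayList<>();
        iterable.forEach(list::add);
        return list;
    }

    /**
     * Returns true only if the restored state is non-empty and every entry is true,
     * i.e. every merged subtask recovered from a final checkpoint. Empty state
     * (first start, older Flink version, upscaling) or any false entry means
     * EOI is handled regularly.
     */
    static boolean restoredAfterEndOfInput(Iterable<Boolean> state) {
        List<Boolean> previous = toList(state);
        return !previous.isEmpty() && previous.stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        System.out.println(restoredAfterEndOfInput(List.of()));            // empty state
        System.out.println(restoredAfterEndOfInput(List.of(true)));        // final checkpoint
        System.out.println(restoredAfterEndOfInput(List.of(true, false))); // downscaled, mixed
    }
}
```

   Requiring every entry to be true keeps the check conservative: a false negative only risks the duplicate summaries the comment already accepts, while a false positive would silently skip EOI handling.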



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java:
##########
@@ -149,15 +190,20 @@ public void processElement(StreamRecord<IN> element) throws Exception {
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        commitUpToCheckpoint(checkpointId);
+        if (!this.endOfInput) {
+            commitUpToCheckpoint(checkpointId);
+        }
     }
 
     @Override
     public void endInput() throws Exception {
-        buckets.onProcessingTime(Long.MAX_VALUE);
-        helper.snapshotState(Long.MAX_VALUE);
-        output.emitWatermark(new Watermark(Long.MAX_VALUE));
-        commitUpToCheckpoint(Long.MAX_VALUE);
+        if (!this.endOfInput) {
+            this.endOfInput = true;
+            buckets.onProcessingTime(Long.MAX_VALUE);
+            helper.snapshotState(Long.MAX_VALUE);
+            output.emitWatermark(new Watermark(Long.MAX_VALUE));
+            commitUpToCheckpoint(Long.MAX_VALUE);

Review Comment:
   Can we use the `EOI` variable instead of `Long.MAX_VALUE`?
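   A minimal, Flink-free sketch of the end-of-input guard in this hunk, assuming a named `EOI` constant equal to `Long.MAX_VALUE` as the comment suggests. The `EndOfInputGuard` class, its `actions` log, and the string labels are hypothetical stand-ins for the real bucket flush and commit calls; the point is that a repeated `endInput()` is a no-op and checkpoints completing after EOI no longer trigger commits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the streaming writer; records actions instead of calling Flink. */
public class EndOfInputGuard {
    // Hypothetical named constant replacing the bare Long.MAX_VALUE literals.
    static final long EOI = Long.MAX_VALUE;

    private boolean endOfInput;
    final List<String> actions = new ArrayList<>();

    void endInput() {
        if (endOfInput) {
            return; // already flushed and committed up to EOI; a repeated EOI is a no-op
        }
        endOfInput = true;
        actions.add("flush@" + EOI);
        actions.add("commit@" + EOI);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Everything was committed up to EOI in endInput(), so later checkpoints skip commits.
        if (!endOfInput) {
            actions.add("commit@" + checkpointId);
        }
    }

    public static void main(String[] args) {
        EndOfInputGuard writer = new EndOfInputGuard();
        writer.notifyCheckpointComplete(1);
        writer.endInput();
        writer.endInput();
        writer.notifyCheckpointComplete(2);
        System.out.println(writer.actions);
    }
}
```

   Running `main` records one regular commit for checkpoint 1, then a single flush and commit at `EOI`; the second `endInput()` call and checkpoint 2 add nothing.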



-- 
This is an automated message from the Apache Git Service.