fapaul commented on code in PR #25292:
URL: https://github.com/apache/flink/pull/25292#discussion_r1758462817


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java:
##########
@@ -138,15 +136,15 @@ public void processElement(StreamRecord<CompactorRequest> element) throws Except
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(Long.MAX_VALUE, collectingRequests);
+        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(Long.MAX_VALUE);
+        submitUntil(CommittableMessage.EOI);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(null);

Review Comment:
   Is it safe to change this? 
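   
   For context, a minimal self-contained sketch (plain Java, not the operator code) of the sentinel-keyed flush that `endInput()` relies on here. `PendingRequests`, the `String` request type, and the assumption that `CommittableMessage.EOI` is a reserved value larger than any real checkpoint id are mine, not taken from the PR:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   
   class PendingRequests {
       // Assumption: EOI is a reserved sentinel larger than any real checkpoint id.
       static final long EOI = Long.MAX_VALUE;
   
       private final NavigableMap<Long, List<String>> checkpointRequests = new TreeMap<>();
       private List<String> collectingRequests = new ArrayList<>();
   
       void collect(String request) {
           collectingRequests.add(request);
       }
   
       void snapshot(long checkpointId) {
           checkpointRequests.put(checkpointId, collectingRequests);
           collectingRequests = new ArrayList<>();
       }
   
       void endInput() {
           // Same shape as the diff: park the remaining requests under the EOI key ...
           checkpointRequests.put(EOI, collectingRequests);
           collectingRequests = new ArrayList<>();
           // ... and flush everything up to and including the sentinel.
           submitUntil(EOI);
           assert checkpointRequests.isEmpty();
       }
   
       private void submitUntil(long checkpointId) {
           // Submit (here: simply drain) all requests keyed at or below checkpointId.
           checkpointRequests.headMap(checkpointId, true).clear();
       }
   }
   ```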



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java:
##########
@@ -91,13 +90,13 @@ public CommittableMessage<CommT> deserialize(int version, byte[] serialized)
                 return new CommittableWithLineage<>(
                         SimpleVersionedSerialization.readVersionAndDeSerialize(
                                 committableSerializer, in),
-                        readCheckpointId(in),
+                        in.readLong(),

Review Comment:
   Do we need to consider migration cases from SinkV1 where, afaik, the checkpointId is always `null`?
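   
   To make the migration concern concrete, here is a hypothetical, version-gated read. The version cut-off, the boolean presence flag, and mapping a missing id to an EOI sentinel are all assumptions for illustration; this is not the actual `CommittableMessageSerializer` wire format:
   
   ```java
   import java.io.DataInputStream;
   import java.io.IOException;
   
   final class CheckpointIdReader {
       // Assumed sentinel standing in for CommittableMessage.EOI.
       static final long EOI = Long.MAX_VALUE;
   
       static long readCheckpointId(int version, DataInputStream in) throws IOException {
           if (version >= 2) {
               // Assumed "new" format: the checkpoint id is always written.
               return in.readLong();
           }
           // Assumed legacy (SinkV1-style) format: a flag records whether an id was written.
           boolean hasCheckpointId = in.readBoolean();
           return hasCheckpointId ? in.readLong() : EOI;
       }
   }
   ```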



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java:
##########
@@ -160,13 +171,38 @@ public void initializeState(StateInitializationContext context) throws Exception
                 legacyCommitterState.clear();
             }
         }
-        sinkWriter = writerStateHandler.createWriter(initContext, context);
+
+        endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+        // We can have the following cases:
+        // 1. state is empty:
+        //   - First time initialization
+        //   - Restoring from a previous version of Flink
+        //   - Upscaled from a final or regular checkpoint
+        // In all cases, we properly initialize the data and handle EOI, potentially resulting in
+        // duplicate summaries that the CommitterOperator needs to handle.
+        // 2. state is not empty:
+        //   - This implies Flink restores from a version that handles EOI
+        //   - If there is one entry, no rescaling happened (for this subtask), so if it's true, we
+        //     recover from a final checkpoint (for this subtask) and can ignore another EOI else we
+        //     have a regular checkpoint.
+        //   - If there are multiple entries, Flink downscaled, and we need to check if all are true
+        //     and do the same as above. As soon as one entry is EOI, we regularly start the writer
+        //     and potentially emit duplicate summaries if we indeed recovered from a final
+        //     checkpoint.
+        ArrayList<Boolean> previousState = Lists.newArrayList(endOfInputState.get());
+        this.endOfInput = !previousState.isEmpty() && !previousState.contains(false);
+        sinkWriter =
+                this.endOfInput
+                        ? new ClosedWriter<>()

Review Comment:
   As discussed offline, this probably leaves behind unclean state from the SinkWriter if it previously crashed.
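   
   One possible shape of that cleanup, as a self-contained sketch rather than the actual operator change: restore the real writer first, let it flush/abort leftovers, and only then retire it. `Writer`, `WriterRecovery`, and `NoOpWriter` are hypothetical stand-ins for `SinkWriter`, `writerStateHandler.createWriter(...)`, and `ClosedWriter`:
   
   ```java
   import java.io.IOException;
   
   interface Writer extends AutoCloseable {
       void write(String element) throws IOException;
   
       void flush(boolean endOfInput) throws IOException;
   }
   
   final class WriterRecovery {
       // Hypothetical helper: decide what to do with a writer restored after EOI was seen.
       static Writer initialize(boolean endOfInputAlreadySeen, Writer restored) throws Exception {
           if (!endOfInputAlreadySeen) {
               return restored;
           }
           // End of input was already processed before the failure: let the restored
           // writer flush/abort whatever the crashed attempt left behind, then retire it.
           restored.flush(true);
           restored.close();
           return new NoOpWriter();
       }
   
       static final class NoOpWriter implements Writer {
           @Override
           public void write(String element) {
               // Input already ended; nothing should reach this writer.
           }
   
           @Override
           public void flush(boolean endOfInput) {}
   
           @Override
           public void close() {}
       }
   }
   ```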



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -156,23 +156,16 @@ public void endInput() throws Exception {
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        if (endInput) {
-            // This is the final checkpoint, all committables should be committed
-            lastCompletedCheckpointId = Long.MAX_VALUE;
-        } else {
-            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        }
+        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
         commitAndEmitCheckpoints();
     }
 
     private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
+        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
         do {
             for (CheckpointCommittableManager<CommT> manager :
-                    committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
-                // wait for all committables of the current manager before submission
-                boolean fullyReceived =
-                        !endInput && manager.getCheckpointId() == lastCompletedCheckpointId;
-                commitAndEmit(manager, fullyReceived);
+                    committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {

Review Comment:
   At first glance, this change doesn't look correct.
   
   By removing `fullyReceived`, can we now commit committables from the "current" checkpoint when a delayed notifyCheckpointComplete for a previous checkpoint arrives?
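   
   To make the question easier to discuss, a standalone reconstruction of the removed guard; the class and method names are illustrative, but the predicate itself is taken from the deleted lines:
   
   ```java
   final class CommitGate {
       /**
        * The removed guard: a manager counted as fully received only when the checkpoint
        * we were just notified about is exactly that manager's checkpoint and input has
        * not ended.
        */
       static boolean fullyReceived(
               long managerCheckpointId, long lastCompletedCheckpointId, boolean endInput) {
           return !endInput && managerCheckpointId == lastCompletedCheckpointId;
       }
   
       public static void main(String[] args) {
           System.out.println(fullyReceived(3, 4, false)); // false: manager of an older checkpoint
           System.out.println(fullyReceived(4, 4, false)); // true: manager of the just-completed checkpoint
           System.out.println(fullyReceived(4, 4, true));  // false: endInput overrides the flag
       }
   }
   ```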



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java:
##########
@@ -178,7 +214,7 @@ public void processElement(StreamRecord<InputT> element) throws Exception {
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         super.prepareSnapshotPreBarrier(checkpointId);
-        if (!endOfInput) {
+        if (!this.endOfInput) {

Review Comment:
   Nit: Why `this` here? We don't use it inside the other methods.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java:
##########
@@ -147,15 +146,16 @@ Collection<CommittableWithLineage<CommT>> drainFinished() {
     }
 
     CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) {
-        checkArgument(Objects.equals(other.checkpointId, checkpointId));
+        checkArgument(other.checkpointId == checkpointId);

Review Comment:
   Nit: This change should probably go into the commit that changes the type of the checkpointId fields.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java:
##########
@@ -160,13 +171,38 @@ public void initializeState(StateInitializationContext context) throws Exception
                 legacyCommitterState.clear();
             }
         }
-        sinkWriter = writerStateHandler.createWriter(initContext, context);
+
+        endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+        // We can have the following cases:
+        // 1. state is empty:
+        //   - First time initialization
+        //   - Restoring from a previous version of Flink
+        //   - Upscaled from a final or regular checkpoint
+        // In all cases, we properly initialize the data and handle EOI, potentially resulting in
+        // duplicate summaries that the CommitterOperator needs to handle.
+        // 2. state is not empty:
+        //   - This implies Flink restores from a version that handles EOI
+        //   - If there is one entry, no rescaling happened (for this subtask), so if it's true, we
+        //     recover from a final checkpoint (for this subtask) and can ignore another EOI else we
+        //     have a regular checkpoint.
+        //   - If there are multiple entries, Flink downscaled, and we need to check if all are true
+        //     and do the same as above. As soon as one entry is EOI, we regularly start the writer
+        //     and potentially emit duplicate summaries if we indeed recovered from a final
+        //     checkpoint.

Review Comment:
   Very nice 👍 
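   
   For anyone skimming, the case analysis boils down to the small merge rule sketched below (standalone Java mirroring the `previousState` check in this PR; the example inputs are illustrative):
   
   ```java
   import java.util.List;
   
   final class EndOfInputMerge {
       /** EOI counts as already seen only if every restored entry is true. */
       static boolean endOfInputAlreadySeen(List<Boolean> restoredEntries) {
           return !restoredEntries.isEmpty() && !restoredEntries.contains(false);
       }
   
       public static void main(String[] args) {
           System.out.println(endOfInputAlreadySeen(List.of()));            // false: fresh start, older Flink, or upscaled
           System.out.println(endOfInputAlreadySeen(List.of(true)));        // true: final checkpoint, no rescaling
           System.out.println(endOfInputAlreadySeen(List.of(true, false))); // false: downscaled onto a regular checkpoint
       }
   }
   ```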



##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -208,8 +209,8 @@ public void processElement(StreamRecord<CommittableMessage<CommT>> element) thro
 
         // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the
         // committables
-        OptionalLong checkpointId = element.getValue().getCheckpointId();
-        if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) {
+        long checkpointId = element.getValue().getCheckpointIdOrEOI();
+        if (checkpointId <= lastCompletedCheckpointId) {

Review Comment:
   I do not fully understand the comment.
   
   - Why is it a change in semantics? The condition looks the same; the only case that is removed is a committable without a checkpoint id. In which scenario did that occur before the change?
   - I thought that for the final checkpoint, we would first receive EOI and then do a final checkpoint. This would mean the committer receives data after EOI.
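   
   Trying to reason about the second bullet with the new sentinel in mind, this is how the changed condition behaves in isolation. Standalone sketch; treating EOI as `Long.MAX_VALUE` is an assumption based on the `getCheckpointIdOrEOI` name:
   
   ```java
   final class LateCommittableCheck {
       // Assumption: EOI is a reserved value larger than every real checkpoint id.
       static final long EOI = Long.MAX_VALUE;
   
       /** True if the committable's checkpoint has already been notified as complete. */
       static boolean alreadyCompleted(long committableCheckpointId, long lastCompletedCheckpointId) {
           return committableCheckpointId <= lastCompletedCheckpointId;
       }
   
       public static void main(String[] args) {
           System.out.println(alreadyCompleted(3, 4));   // true: late arrival after notifyCheckpointComplete(4)
           System.out.println(alreadyCompleted(5, 4));   // false: committable for a still-pending checkpoint
           System.out.println(alreadyCompleted(EOI, 4)); // false: EOI-tagged data is not treated as late here
       }
   }
   ```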


