pnowojski commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r879507376


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -112,6 +115,20 @@ public class PipelinedSubpartition extends 
ResultSubpartition
 
     private int bufferSize = Integer.MAX_VALUE;
 
+    /**
+     * The channelState Future of unaligned checkpoint. Access to the 
channelStateFutures is
+     * synchronized on buffers.

Review Comment:
   I would drop:
   > Access to the channelStateFutures is synchronized on buffers.
   
   It duplicates the `@GuardedBy` annotation. 



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void processTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                createChannelStateFuture(barrier.getId()));
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        if (channelStateFutureIsAvailable(barrier.getId())) {
+            completeChannelStateFuture(Collections.emptyList(), null);
+        }
+    }
+
+    private CompletableFuture<List<Buffer>> createChannelStateFuture(long 
checkpointId) {
+        assert Thread.holdsLock(buffers);
+        if (channelStateFuture != null) {
+            completeChannelStateFuture(
+                    null,
+                    new IllegalStateException(
+                            String.format(
+                                    "%s has uncompleted channelStateFuture of 
checkpointId=%s, but it received "
+                                            + "a new timeoutable checkpoint 
barrier of checkpointId=%s, it maybe "
+                                            + "a bug due to currently does not 
support concurrent unaligned checkpoints.",

Review Comment:
   nit:
   > a bug due to currently not supported concurrent unaligned checkpoint



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void processTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                createChannelStateFuture(barrier.getId()));
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        if (channelStateFutureIsAvailable(barrier.getId())) {
+            completeChannelStateFuture(Collections.emptyList(), null);
+        }
+    }

Review Comment:
   Shouldn't we actually `checkState` here that `channelStateFutureIsAvailable(barrier.getId())` is true? Is there a valid scenario where `channelStateFutureIsAvailable` should return false?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void processTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                createChannelStateFuture(barrier.getId()));
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        if (channelStateFutureIsAvailable(barrier.getId())) {
+            completeChannelStateFuture(Collections.emptyList(), null);
+        }
+    }

Review Comment:
   nit: move `completeTimeoutableCheckpointBarrier()` below `pollBuffer()` (where it's being used)?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
                             checkBufferIsBuffer(buffer);
                             bufferConsumer.accept(writer, buffer);
                         }
-                    } catch (Throwable e) {
+                    } catch (ExecutionException e) {
                         writer.fail(e);
                     }
                 },
                 throwable -> {
-                    List<Buffer> buffers = dataFuture.get();
-                    if (buffers == null || buffers.isEmpty()) {
-                        return;
+                    try {
+                        List<Buffer> buffers = dataFuture.get();
+                        if (buffers == null || buffers.isEmpty()) {
+                            return;
+                        }
+                        CloseableIterator<Buffer> iterator =
+                                CloseableIterator.fromList(buffers, 
Buffer::recycleBuffer);
+                        iterator.close();
+                    } catch (ExecutionException ignored) {

Review Comment:
   Ohhh, ok. Thanks for the detailed explanation. I think either option B or option C should be fine.
   
   Maybe just add a comment 
   ```
       } catch (ExecutionException e) {
           // If dataFuture fails, fail only the single related writer 
           writer.fail(e);
       }
   ```
   ?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -112,6 +115,20 @@ public class PipelinedSubpartition extends 
ResultSubpartition
 
     private int bufferSize = Integer.MAX_VALUE;
 
+    /**
+     * The channelState Future of unaligned checkpoint. Access to the 
channelStateFutures is
+     * synchronized on buffers.
+     */
+    @GuardedBy("buffers")
+    private CompletableFuture<List<Buffer>> channelStateFuture;
+
+    /**
+     * It is the checkpointId corresponding to channelStateFuture. And It 
should be always update
+     * with channelStateFuture.

Review Comment:
   nitty nit:
   `{@link #channelStateFuture}`
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any 
notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void processTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                createChannelStateFuture(barrier.getId()));
+    }
+
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+        CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        if (channelStateFutureIsAvailable(barrier.getId())) {
+            completeChannelStateFuture(Collections.emptyList(), null);
+        }
+    }
+
+    private CompletableFuture<List<Buffer>> createChannelStateFuture(long 
checkpointId) {
+        assert Thread.holdsLock(buffers);
+        if (channelStateFuture != null) {
+            completeChannelStateFuture(
+                    null,
+                    new IllegalStateException(
+                            String.format(
+                                    "%s has uncompleted channelStateFuture of 
checkpointId=%s, but it received "
+                                            + "a new timeoutable checkpoint 
barrier of checkpointId=%s, it maybe "
+                                            + "a bug due to currently does not 
support concurrent unaligned checkpoints.",
+                                    this, channelStateCheckpointId, 
checkpointId)));
+        }
+        channelStateFuture = new CompletableFuture<>();
+        channelStateCheckpointId = checkpointId;
+        return channelStateFuture;
+    }
+
+    private void completeChannelStateFuture(List<Buffer> channelResult, 
Throwable e) {
+        assert Thread.holdsLock(buffers);
+        if (e != null) {
+            channelStateFuture.completeExceptionally(e);
+        } else {
+            channelStateFuture.complete(channelResult);
+        }
+        channelStateFuture = null;
+    }
+
+    private boolean channelStateFutureIsAvailable(long checkpointId) {

Review Comment:
   nit: `isChannelStateFutureAvailable`



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -335,6 +380,34 @@ public void checkpointState(
         }
     }
 
+    private void registerAlignmentTimer(
+            long checkpointId,
+            OperatorChain<?, ?> operatorChain,
+            CheckpointBarrier checkpointBarrier) {
+        if (alignmentTimer != null) {

Review Comment:
   shouldn't this be `checkState(alignmentTimer == null)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to