1996fanrui commented on code in PR #22761:
URL: https://github.com/apache/flink/pull/22761#discussion_r1244886670


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java:
##########
@@ -31,6 +31,9 @@
 /** A single subpartition of a {@link ResultPartition} instance. */
 public abstract class ResultSubpartition {
 
+    // The error code when adding a buffer fails.
+    public static final int ADD_ERROR_CODE = -1;

Review Comment:
   Updated to `ADD_BUFFER_ERROR_CODE`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java:
##########
@@ -65,7 +65,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
 
     private TimerGauge hardBackPressuredTimeMsPerSecond = new TimerGauge();
 
-    private long totalWrittenBytes;
+    protected long totalWrittenBytes;

Review Comment:
   Do you mean that subclasses should update `totalWrittenBytes` through a method of `BufferWritingResultPartition`? Or that subclasses shouldn't update `totalWrittenBytes` at all?
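   
   To make the first option concrete, here is a minimal sketch. The class names are simplified stand-ins and `addWrittenBytes` is a hypothetical helper, not existing Flink code:

```java
// Simplified stand-ins for illustration only; not the actual Flink classes.
abstract class WritingPartitionSketch {
    // Stays private so subclasses cannot overwrite the counter arbitrarily.
    private long totalWrittenBytes;

    // Hypothetical helper: the only way for subclasses to update the counter.
    protected final void addWrittenBytes(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be non-negative: " + bytes);
        }
        totalWrittenBytes += bytes;
    }

    public long getTotalWrittenBytes() {
        return totalWrittenBytes;
    }
}

class PipelinedPartitionSketch extends WritingPartitionSketch {
    // Hypothetical subclass method that records bytes while adding recovered data.
    void addRecovered(int bufferSize) {
        addWrittenBytes(bufferSize);
    }
}
```

   With this shape the field keeps its `private` modifier, and the subclass can only increase the counter through the helper.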



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -179,8 +178,14 @@ class ResultSubpartitionRecoveredStateHandler
     public BufferWithContext<BufferBuilder> getBuffer(ResultSubpartitionInfo subpartitionInfo)
             throws IOException, InterruptedException {
         // request the buffer from any mapped subpartition as they all will receive the same buffer
-        final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
-        BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
+        ResultPartitionWriter writer = writers[subpartitionInfo.getPartitionIdx()];
+        if (!(writer instanceof CheckpointedResultPartition)) {

Review Comment:
   First of all, I realized that I should call `getCheckpointedResultPartition` here, since it already checks that the writer is a `CheckpointedResultPartition`.
   
   > I see a lot of implementations of ResultPartitionWriter and only two implementations of CheckpointedResultPartition.
   
   I found only one implementation of `CheckpointedResultPartition`: `PipelinedResultPartition`.
   
   > I understand that all writers we have here always implement both ResultPartitionWriter and CheckpointedResultPartition but it still looks fragile since there is no explicit contract why one of the implementations of ResultPartitionWriter without CheckpointedResultPartition can't be passed here.
   
   I found that all writers are passed to `ResultSubpartitionRecoveredStateHandler`, but `getBuffer` is only called for a `CheckpointedResultPartition`. That's why:
   
   - We cannot pass only `CheckpointedResultPartition`s to `ResultSubpartitionRecoveredStateHandler`.
   - And `all writers we have here always implement both ResultPartitionWriter and CheckpointedResultPartition`.
   
   If every `CheckpointedResultPartition` is obtained through `getCheckpointedResultPartition`, do you think that's OK? If the writer isn't a `CheckpointedResultPartition`, `getCheckpointedResultPartition` throws `new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer);`.
   
   Or do you have a suggestion for a more suitable interface?
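   
   For reference, a minimal sketch of the centralized check described above. The `*Sketch` types are simplified stand-ins, not the actual Flink interfaces; only the exception message is taken from the code quoted in this comment:

```java
// Simplified stand-ins for illustration only; not the actual Flink interfaces.
interface ResultPartitionWriterSketch {}

interface CheckpointedResultPartitionSketch {}

// A writer that supports restoring channel state.
class PipelinedSketch implements ResultPartitionWriterSketch, CheckpointedResultPartitionSketch {}

// A writer that does not support restoring channel state.
class BlockingSketch implements ResultPartitionWriterSketch {}

class RecoveredStateHandlerSketch {
    private final ResultPartitionWriterSketch[] writers;

    RecoveredStateHandlerSketch(ResultPartitionWriterSketch[] writers) {
        this.writers = writers;
    }

    // Single place where the type check happens; every caller goes through it.
    CheckpointedResultPartitionSketch getCheckpointedResultPartition(int partitionIndex) {
        ResultPartitionWriterSketch writer = writers[partitionIndex];
        if (!(writer instanceof CheckpointedResultPartitionSketch)) {
            throw new IllegalStateException(
                    "Cannot restore state to a non-checkpointable partition type: " + writer);
        }
        return (CheckpointedResultPartitionSketch) writer;
    }
}
```

   All writers can still be passed to the handler. A writer that is not checkpointable only fails when state is actually restored to it.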



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java:
##########
@@ -104,9 +102,8 @@ private void releaseView() {
     }
 
     @Override
-    public void finishReadRecoveredState(boolean notifyAndBlockOnCompletion) throws IOException {
-        // The Approximate Local Recovery can not work with unaligned checkpoint for now, so no need
-        // to recover channel state
+    public boolean isSupportUnalignedCheckpoint() {

Review Comment:
   Updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -190,51 +195,60 @@ public void recover(
             int oldSubtaskIndex,
             BufferWithContext<BufferBuilder> bufferWithContext)
             throws IOException {
-        try (BufferBuilder bufferBuilder = bufferWithContext.context) {
-            try (BufferConsumer bufferConsumer =
-                    bufferBuilder.createBufferConsumerFromBeginning()) {
-                bufferBuilder.finish();
-                if (bufferConsumer.isDataAvailable()) {
-                    final List<CheckpointedResultSubpartition> channels =
-                            getMappedChannels(subpartitionInfo);
-                    for (final CheckpointedResultSubpartition channel : channels) {
-                        // channel selector is created from the downstream's point of view: the
-                        // subtask of downstream = subpartition index of recovered buffer
-                        final SubtaskConnectionDescriptor channelSelector =
-                                new SubtaskConnectionDescriptor(
-                                        subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
-                        channel.addRecovered(
-                                EventSerializer.toBufferConsumer(channelSelector, false));
-                        channel.addRecovered(bufferConsumer.copy());
-                    }
-                }
+        try (BufferBuilder bufferBuilder = bufferWithContext.context;
+                BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+            bufferBuilder.finish();
+            if (!bufferConsumer.isDataAvailable()) {
+                return;
+            }
+            final List<ResultSubpartitionInfo> mappedSubpartitions =
+                    getMappedSubpartitions(subpartitionInfo);
+            for (final ResultSubpartitionInfo mappedSubpartition : mappedSubpartitions) {
+                // channel selector is created from the downstream's point of view: the
+                // subtask of downstream = subpartition index of recovered buffer
+                final SubtaskConnectionDescriptor channelSelector =
+                        new SubtaskConnectionDescriptor(
+                                subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
+                CheckpointedResultPartition checkpointedResultPartition =
+                        getCheckpointedResultPartition(mappedSubpartition.getPartitionIdx());
+                checkpointedResultPartition.addRecovered(

Review Comment:
   Do you mean we should call `getCheckpointedResultPartition` based on `subpartitionInfo` instead of `mappedSubpartition`?
   
   If so, that makes sense. I updated it in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
