akalash commented on code in PR #22761:
URL: https://github.com/apache/flink/pull/22761#discussion_r1243911024


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -179,8 +178,14 @@ class ResultSubpartitionRecoveredStateHandler
     public BufferWithContext<BufferBuilder> getBuffer(ResultSubpartitionInfo subpartitionInfo)
             throws IOException, InterruptedException {
         // request the buffer from any mapped subpartition as they all will receive the same buffer
-        final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
-        BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
+        ResultPartitionWriter writer = writers[subpartitionInfo.getPartitionIdx()];
+        if (!(writer instanceof CheckpointedResultPartition)) {

Review Comment:
   Something looks wrong here. If we know that we accept only
   CheckpointedResultPartition, why don't we use the proper interface? I
   understand the answer is that it has historically been this way, but right
   now I see many implementations of ResultPartitionWriter and only two
   implementations of CheckpointedResultPartition. I understand that all the
   writers we have here always implement both ResultPartitionWriter and
   CheckpointedResultPartition, but it still looks fragile, since there is no
   explicit contract that prevents an implementation of ResultPartitionWriter
   without CheckpointedResultPartition from being passed here.
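
   One way to make such a contract explicit, sketched with simplified stand-in
   types (the interfaces and the `requireCheckpointed` helper below are
   hypothetical, not the actual Flink classes): validate the writers once up
   front and keep them typed as `CheckpointedResultPartition`, so the recovery
   path itself needs no `instanceof` check.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointedWriterContractSketch {

    // Hypothetical stand-ins for the interfaces named in the comment above.
    interface ResultPartitionWriter {}

    interface CheckpointedResultPartition {
        void addRecovered(int subpartitionIdx, String buffer);
    }

    // A writer that implements both interfaces, as the recovery path expects.
    static class PipelinedWriter implements ResultPartitionWriter, CheckpointedResultPartition {
        final List<String> recovered = new ArrayList<>();

        @Override
        public void addRecovered(int subpartitionIdx, String buffer) {
            recovered.add(subpartitionIdx + ":" + buffer);
        }
    }

    // A writer without recovery support; it must be rejected early.
    static class PlainWriter implements ResultPartitionWriter {}

    // Validates once, at construction time, and returns the narrower type
    // so that later code cannot receive an unsupported writer.
    static CheckpointedResultPartition[] requireCheckpointed(ResultPartitionWriter[] writers) {
        CheckpointedResultPartition[] result = new CheckpointedResultPartition[writers.length];
        for (int i = 0; i < writers.length; i++) {
            if (!(writers[i] instanceof CheckpointedResultPartition)) {
                throw new IllegalStateException(
                        "Writer " + i + " does not support recovery: "
                                + writers[i].getClass().getSimpleName());
            }
            result[i] = (CheckpointedResultPartition) writers[i];
        }
        return result;
    }
}
```

   With this shape, a writer that lacks the interface fails when the handler
   is built rather than in the middle of recovery.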



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java:
##########
@@ -104,9 +102,8 @@ private void releaseView() {
     }
 
     @Override
-    public void finishReadRecoveredState(boolean notifyAndBlockOnCompletion) throws IOException {
-        // The Approximate Local Recovery can not work with unaligned checkpoint for now, so no need
-        // to recover channel state
+    public boolean isSupportUnalignedCheckpoint() {

Review Comment:
   We need to think about whether it should refer to UnalignedCheckpoint.
   According to the logic, it only recovers the channel state, so maybe it
   should be something like `isSupportChannelStateRecover` or similar.
   I understand that we have channel state only for unaligned checkpoints
   right now, but if we call it `isSupportUnalignedCheckpoint`, it implies
   that we can use it for anything, not only for recovery. If that's not a
   problem, we can leave this name.
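
   To illustrate the narrower name, here is a minimal sketch in which the
   flag is consulted only on the recovery path. The classes are simplified,
   hypothetical stand-ins (not the real subpartition classes), and
   `isSupportChannelStateRecover` is just the name suggested above.

```java
public class ChannelStateRecoverySketch {

    // Hypothetical stand-in for a subpartition.
    abstract static class Subpartition {
        int recoveredBuffers = 0;

        // Named after the only question the caller asks:
        // can recovered channel state be added to this subpartition?
        abstract boolean isSupportChannelStateRecover();

        void addRecovered(String buffer) {
            recoveredBuffers++;
        }
    }

    static class PipelinedSubpartition extends Subpartition {
        @Override
        boolean isSupportChannelStateRecover() {
            return true;
        }
    }

    // Stand-in for the approximate-local-recovery variant, which skips channel state.
    static class ApproximateSubpartition extends Subpartition {
        @Override
        boolean isSupportChannelStateRecover() {
            return false;
        }
    }

    // Returns true if the buffer was restored, false if recovery was skipped.
    static boolean recover(Subpartition subpartition, String buffer) {
        if (!subpartition.isSupportChannelStateRecover()) {
            return false;
        }
        subpartition.addRecovered(buffer);
        return true;
    }
}
```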



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -190,51 +195,60 @@ public void recover(
             int oldSubtaskIndex,
             BufferWithContext<BufferBuilder> bufferWithContext)
             throws IOException {
-        try (BufferBuilder bufferBuilder = bufferWithContext.context) {
-            try (BufferConsumer bufferConsumer =
-                    bufferBuilder.createBufferConsumerFromBeginning()) {
-                bufferBuilder.finish();
-                if (bufferConsumer.isDataAvailable()) {
-                    final List<CheckpointedResultSubpartition> channels =
-                            getMappedChannels(subpartitionInfo);
-                    for (final CheckpointedResultSubpartition channel : channels) {
-                        // channel selector is created from the downstream's point of view: the
-                        // subtask of downstream = subpartition index of recovered buffer
-                        final SubtaskConnectionDescriptor channelSelector =
-                                new SubtaskConnectionDescriptor(
-                                        subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
-                        channel.addRecovered(
-                                EventSerializer.toBufferConsumer(channelSelector, false));
-                        channel.addRecovered(bufferConsumer.copy());
-                    }
-                }
+        try (BufferBuilder bufferBuilder = bufferWithContext.context;
+                BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumerFromBeginning()) {
+            bufferBuilder.finish();
+            if (!bufferConsumer.isDataAvailable()) {
+                return;
+            }
+            final List<ResultSubpartitionInfo> mappedSubpartitions =
+                    getMappedSubpartitions(subpartitionInfo);
+            for (final ResultSubpartitionInfo mappedSubpartition : mappedSubpartitions) {
+                // channel selector is created from the downstream's point of view: the
+                // subtask of downstream = subpartition index of recovered buffer
+                final SubtaskConnectionDescriptor channelSelector =
+                        new SubtaskConnectionDescriptor(
+                                subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
+                CheckpointedResultPartition checkpointedResultPartition =
+                        getCheckpointedResultPartition(mappedSubpartition.getPartitionIdx());
+                checkpointedResultPartition.addRecovered(

Review Comment:
   We take the subpartition to find the partition, inside which we then look up
   the same subpartition we had at the beginning. It looks trickier than it was
   before. But it is not yet clear how to calculate the bytes differently...



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java:
##########
@@ -31,6 +31,9 @@
 /** A single subpartition of a {@link ResultPartition} instance. */
 public abstract class ResultSubpartition {
 
+    // The error code when adding a buffer fails.
+    public static final int ADD_ERROR_CODE = -1;

Review Comment:
   I'm not the best adviser on names, but I think this name should at least
   reflect the relationship with the buffer: ADD_BUFFER_ERROR_CODE,
   BUFFER_ADDING_ERROR_CODE, etc. (I don't know which is more correct.)
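
   As a rough illustration of how the more specific name reads at a call
   site, here is a small sketch. The value -1 comes from the diff; the `add`
   and `accumulate` helpers and the idea that a successful add returns the
   buffer size are assumptions made only for this example.

```java
public class AddBufferErrorCodeSketch {

    // One of the names suggested above; the value matches the constant in the diff.
    static final int ADD_BUFFER_ERROR_CODE = -1;

    // Hypothetical add: returns the number of bytes added,
    // or ADD_BUFFER_ERROR_CODE if the target was already released.
    static int add(byte[] buffer, boolean released) {
        if (released) {
            return ADD_BUFFER_ERROR_CODE;
        }
        return buffer.length;
    }

    // Counts only successful adds towards the running total.
    static long accumulate(long total, int addResult) {
        return addResult == ADD_BUFFER_ERROR_CODE ? total : total + addResult;
    }
}
```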



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java:
##########
@@ -65,7 +65,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
 
     private TimerGauge hardBackPressuredTimeMsPerSecond = new TimerGauge();
 
-    private long totalWrittenBytes;
+    protected long totalWrittenBytes;

Review Comment:
   Dangerous. I think it is acceptable for the fix, but in general we should
   not force all inheriting classes to use specific variables in specific
   places, since such a contract is difficult to follow and will lead to other
   bugs.
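
   A possible alternative for a follow-up, sketched with hypothetical
   stand-in classes (not the real BufferWritingResultPartition): keep the
   counter private and give subclasses one narrow protected method, so the
   base class stays the only place that mutates it. The names
   `addToTotalWrittenBytes` and `RecoveringPartition` are assumptions for
   this example.

```java
public class WrittenBytesEncapsulationSketch {

    // Hypothetical base class that owns the counter.
    abstract static class BufferWritingPartition {
        private long totalWrittenBytes;

        // The only way for subclasses to change the counter.
        protected final void addToTotalWrittenBytes(long bytes) {
            if (bytes < 0) {
                throw new IllegalArgumentException("bytes must be non-negative: " + bytes);
            }
            totalWrittenBytes += bytes;
        }

        public final long getTotalWrittenBytes() {
            return totalWrittenBytes;
        }
    }

    // Hypothetical subclass that accounts for recovered buffers.
    static class RecoveringPartition extends BufferWritingPartition {
        void addRecovered(byte[] buffer) {
            addToTotalWrittenBytes(buffer.length);
        }
    }
}
```

   Subclasses can still report recovered bytes, but they cannot reset or
   overwrite the total by accident.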



