pnowojski commented on a change in pull request #12457:
URL: https://github.com/apache/flink/pull/12457#discussion_r435687298



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
##########
@@ -52,6 +53,10 @@ static ChannelStateWriteRequest write(long checkpointId, 
InputChannelInfo info,
                return buildWriteRequest(checkpointId, "writeInput", iterator, 
(writer, buffer) -> writer.writeInput(info, buffer));
        }
 
+       static ChannelStateWriteRequest write(long checkpointId, 
ResultSubpartitionInfo info, Buffer... buffers) {
+               return buildWriteRequest(checkpointId, "writeOutput", 
ofElements(Buffer::recycleBuffer, buffers), (writer, buffer) -> 
writer.writeOutput(info, buffer));
+       }
+

Review comment:
       What's the difference to the previous version? Especially in the 
behaviour? It looks like a subtle change that I'm missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
##########
@@ -129,7 +129,7 @@ public void addOutputData(long checkpointId, 
ResultSubpartitionInfo info, int st
                        info,
                        startSeqNum,
                        data == null ? 0 : data.length);
-               enqueue(write(checkpointId, info, checkBufferType(data)), 
false);
+               enqueue(write(checkpointId, info, data), false);

Review comment:
       Why have you dropped `checkBufferType` call? It was actually quite 
helpful couple of times when debugging. Can not we try to keep it?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
##########
@@ -32,6 +32,7 @@
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.EXECUTING;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.FAILED;
 import static 
org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequestState.NEW;
+import static org.apache.flink.util.CloseableIterator.ofElements;

Review comment:
       Can you copy JIRA description to this commit description? By looking at 
it, it's quite hard to understand what is it fixing and how.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to