AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476665640



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
        }
 
        @Override
-       public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
+       public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) {
                synchronized (receivedBuffers) {
-                       checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
-
-                       final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
-                       for (Buffer buffer : receivedBuffers) {
-                               CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
-                               if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
-                                       break;
+                       final Integer numRecords = numRecordsOvertaken.remove(checkpointId);

Review comment:
       Good catch: the map can leak entries when the checkpoint is cancelled
through another channel. Each map is rather small, but the leaked entries could
add up over all channels and gates.
   I don't have a good idea for abstracting this cleanup properly, other than
adding some kind of checkpoint-cancelled hook.
   Alternatively, the checkpoint barrier handler could become more aware of the
buffers to be spilled. Instead of calling `channel.spillInflightBuffers`, it
would call `channel.getSpilledBuffers().forEach(channelStateWriter::write)` on
a successful checkpoint and
`channel.getSpilledBuffers().forEach(Buffer::recycle)` on a cancelled one, where
`getSpilledBuffers` always cleans up this map.
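
   To make the second alternative concrete, here is a rough, self-contained
sketch of how it could look. It is not the actual Flink code: `SketchBuffer`,
`SketchStateWriter`, `SketchChannel`, `SketchBarrierHandler` and
`onBarrierOvertook` are hypothetical stand-ins, `getSpilledBuffers` takes the
checkpoint id as a parameter for illustration, and the sketch assumes that the
returned buffers are retained copies owned by the caller and that the writer
recycles each buffer after writing it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Flink's Buffer: only models reference counting. */
final class SketchBuffer {
    private int refCount = 1;
    SketchBuffer retain() { refCount++; return this; }
    void recycle() { refCount--; }
    int refCount() { return refCount; }
}

/** Hypothetical stand-in for ChannelStateWriter: counts writes, then releases the buffer. */
final class SketchStateWriter {
    int written;
    void write(SketchBuffer buffer) {
        written++;
        buffer.recycle(); // assumption: the writer owns the buffer it is handed
    }
}

/** Simplified channel with the received-buffer queue and the per-checkpoint map. */
final class SketchChannel {
    private final ArrayDeque<SketchBuffer> receivedBuffers = new ArrayDeque<>();
    private final Map<Long, Integer> numRecordsOvertaken = new HashMap<>();

    void onBuffer(SketchBuffer buffer) {
        synchronized (receivedBuffers) { receivedBuffers.add(buffer); }
    }

    /** Hypothetical hook: remembers how many queued buffers the barrier overtook. */
    void onBarrierOvertook(long checkpointId, int numRecords) {
        synchronized (receivedBuffers) { numRecordsOvertaken.put(checkpointId, numRecords); }
    }

    /** Returns retained copies of the overtaken buffers and always drops the map entry. */
    List<SketchBuffer> getSpilledBuffers(long checkpointId) {
        synchronized (receivedBuffers) {
            // remove() instead of get(): the entry is gone whatever the caller does next.
            final Integer numRecords = numRecordsOvertaken.remove(checkpointId);
            final List<SketchBuffer> spilled = new ArrayList<>();
            if (numRecords == null) {
                return spilled;
            }
            for (SketchBuffer buffer : receivedBuffers) {
                if (spilled.size() >= numRecords) {
                    break;
                }
                spilled.add(buffer.retain());
            }
            return spilled;
        }
    }

    int trackedCheckpoints() {
        synchronized (receivedBuffers) { return numRecordsOvertaken.size(); }
    }
}

/** The barrier handler decides what happens to the buffers; the channel only hands them out. */
final class SketchBarrierHandler {
    static void onCheckpointSucceeded(SketchChannel channel, long checkpointId, SketchStateWriter writer) {
        channel.getSpilledBuffers(checkpointId).forEach(writer::write);
    }

    static void onCheckpointCancelled(SketchChannel channel, long checkpointId) {
        channel.getSpilledBuffers(checkpointId).forEach(SketchBuffer::recycle);
    }
}
```

   Under these assumptions, both the successful and the cancelled path go
through `getSpilledBuffers`, so the map entry is removed in either case and no
separate checkpoint-cancelled hook is needed on the channel.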





