AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476665640
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
}
@Override
- public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
+ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) {
synchronized (receivedBuffers) {
- checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
-
- final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
- for (Buffer buffer : receivedBuffers) {
- CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
- if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
- break;
+ final Integer numRecords = numRecordsOvertaken.remove(checkpointId);
Review comment:
Good catch: a leak could happen when the checkpoint is cancelled through
another channel, because then this channel never removes its entry. The map
itself is small, but the leaked entries could add up across all channels and
gates.
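
To make the leak concrete, here is a minimal sketch assuming a simplified, hypothetical channel model (`LeakyChannelModel`, `onBarrier`, and `pendingEntries` are illustrative names, not Flink API). Its only removal point mirrors the `numRecordsOvertaken.remove(checkpointId)` call in the diff.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a channel's per-checkpoint bookkeeping.
 * It is NOT Flink's RemoteInputChannel; it only mirrors numRecordsOvertaken.
 */
class LeakyChannelModel {
    // checkpointId -> number of records the barrier overtook on this channel
    private final Map<Long, Integer> numRecordsOvertaken = new HashMap<>();

    /** Called when the barrier for checkpointId arrives on this channel. */
    void onBarrier(long checkpointId, int numRecords) {
        numRecordsOvertaken.put(checkpointId, numRecords);
    }

    /** The only place an entry is removed: spilling for a checkpoint that proceeds. */
    Integer spillInflightBuffers(long checkpointId) {
        return numRecordsOvertaken.remove(checkpointId);
    }

    /** Number of entries still held in the map. */
    int pendingEntries() {
        return numRecordsOvertaken.size();
    }
}
```

If checkpoint 1 is cancelled through another channel, `spillInflightBuffers(1L)` is never called on this channel, so the entry for checkpoint 1 stays in the map even after later checkpoints are spilled.
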
I don't have a good idea for how to abstract this cleanup properly, though,
other than by adding some kind of checkpoint-cancelled hook.
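
A checkpoint-cancelled hook could look roughly like the following sketch. The hook name `onCheckpointCancelled` and the use of a `TreeMap` are assumptions for illustration, not existing Flink API. Clearing every entry up to and including the cancelled id additionally assumes that checkpoint ids only increase, so entries of older checkpoints are obsolete as well.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical channel model with a checkpoint-cancelled hook (not Flink API). */
class ChannelWithCancelHook {
    // Sorted by checkpointId so older entries can be dropped in one call.
    private final NavigableMap<Long, Integer> numRecordsOvertaken = new TreeMap<>();

    void onBarrier(long checkpointId, int numRecords) {
        numRecordsOvertaken.put(checkpointId, numRecords);
    }

    Integer spillInflightBuffers(long checkpointId) {
        return numRecordsOvertaken.remove(checkpointId);
    }

    /**
     * Assumed to be invoked on every channel when a checkpoint is cancelled,
     * regardless of which channel triggered the cancellation. Removes the entry
     * for checkpointId and, assuming monotonically increasing ids, all older ones.
     */
    void onCheckpointCancelled(long checkpointId) {
        // headMap(..., true) is a live view; clearing it removes from the backing map.
        numRecordsOvertaken.headMap(checkpointId, true).clear();
    }

    int pendingEntries() {
        return numRecordsOvertaken.size();
    }
}
```

The hook keeps the cleanup inside the channel, but every channel of every gate then has to be notified of each cancellation, which is the abstraction problem mentioned above.
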
Alternatively, the checkpoint barrier handler could become more aware of the
buffers to be spilled. Instead of calling `channel.spillInflightBuffers`, it
could call `channel.getSpilledBuffers().forEach(channelStateWriter::write)` for
a successful checkpoint and `channel.getSpilledBuffers().forEach(Buffer::recycle)`
for a cancelled one, where `getSpilledBuffers` always cleans up this map.
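
The `getSpilledBuffers` alternative might be sketched as below. Everything here is hypothetical: `RefCountedBuffer` stands in for Flink's `Buffer`, `RecordingStateWriter` for a `ChannelStateWriter` that takes ownership of each buffer it receives, and the map value is assumed to count the queued buffers the barrier overtook. The key property is that `getSpilledBuffers` removes the map entry unconditionally, so the handler can either write or recycle the returned buffers without leaking bookkeeping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical ref-counted stand-in for Flink's Buffer. */
class RefCountedBuffer {
    private int refCount = 1;

    RefCountedBuffer retain() {
        refCount++;
        return this;
    }

    void recycle() {
        if (refCount <= 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCount--;
    }

    int refCount() {
        return refCount;
    }
}

/** Hypothetical writer; takes ownership of each buffer and releases it after "persisting". */
class RecordingStateWriter {
    final List<RefCountedBuffer> written = new ArrayList<>();

    void write(RefCountedBuffer buffer) {
        written.add(buffer);
        buffer.recycle(); // pretend the data was persisted, then release the reference
    }
}

/** Hypothetical channel model exposing getSpilledBuffers (not Flink API). */
class SpillingChannel {
    private final ArrayDeque<RefCountedBuffer> receivedBuffers = new ArrayDeque<>();
    // checkpointId -> number of queued buffers the barrier overtook (assumption)
    private final Map<Long, Integer> numRecordsOvertaken = new HashMap<>();

    void onBuffer(RefCountedBuffer buffer) {
        synchronized (receivedBuffers) {
            receivedBuffers.add(buffer);
        }
    }

    /** In this model, the barrier overtakes every buffer currently queued. */
    void onBarrier(long checkpointId) {
        synchronized (receivedBuffers) {
            numRecordsOvertaken.put(checkpointId, receivedBuffers.size());
        }
    }

    /**
     * Returns retained references to the overtaken buffers and always removes
     * the map entry, whether the caller then writes or recycles them.
     */
    List<RefCountedBuffer> getSpilledBuffers(long checkpointId) {
        synchronized (receivedBuffers) {
            Integer numBuffers = numRecordsOvertaken.remove(checkpointId);
            List<RefCountedBuffer> spilled = new ArrayList<>();
            if (numBuffers == null) {
                return spilled;
            }
            for (RefCountedBuffer buffer : receivedBuffers) {
                if (spilled.size() == numBuffers) {
                    break;
                }
                spilled.add(buffer.retain()); // the queue keeps its own reference
            }
            return spilled;
        }
    }

    int pendingEntries() {
        synchronized (receivedBuffers) {
            return numRecordsOvertaken.size();
        }
    }
}
```

Because each returned buffer is retained, the queue's own reference stays valid: recycling on cancellation only releases the extra reference taken for spilling, and writing hands that extra reference to the writer.
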