rkhachatryan commented on a change in pull request #14052:
URL: https://github.com/apache/flink/pull/14052#discussion_r524346272
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
synchronized (receivedBuffers) {
channelStatePersister.startPersisting(
barrier.getId(),
- getInflightBuffers(numBuffersOvertaken == ALL ? receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+ getInflightBuffers());
}
}
public void checkpointStopped(long checkpointId) {
synchronized (receivedBuffers) {
channelStatePersister.stopPersisting(checkpointId);
- numBuffersOvertaken = ALL;
+ lastOvertakenSequenceNumber = null;
+ }
+ }
+
+ @VisibleForTesting
+ List<Buffer> getInflightBuffers() {
+ synchronized (receivedBuffers) {
+ return getInflightBuffersUnsafe();
}
}
/**
* Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
*/
- private List<Buffer> getInflightBuffers(int numBuffers) {
+ private List<Buffer> getInflightBuffersUnsafe() {
assert Thread.holdsLock(receivedBuffers);
- if (numBuffers == 0) {
- return Collections.emptyList();
- }
-
- final List<Buffer> inflightBuffers = new ArrayList<>(numBuffers);
+ final List<Buffer> inflightBuffers = new ArrayList<>();
Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
// skip all priority events (only buffers are stored anyways)
Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
- // spill number of overtaken buffers or all of them if barrier has not been seen yet
- for (int pos = 0; pos < numBuffers; pos++) {
- Buffer buffer = iterator.next().buffer;
- if (buffer.isBuffer()) {
- inflightBuffers.add(buffer.retainBuffer());
+ while (iterator.hasNext()) {
+ SequenceBuffer sequenceBuffer = iterator.next();
+ if (sequenceBuffer.buffer.isBuffer() && shouldBeSpilled(sequenceBuffer.sequenceNumber)) {
Review comment:
I think this wouldn't work, because the sequence number can overflow
before overtaking happens.
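To illustrate the concern, here is a minimal sketch, assuming sequence numbers are a plain `int` counter that wraps around on overflow. The class `SequenceNumbers` and both methods are hypothetical and are not part of `RemoteInputChannel`; the body of `shouldBeSpilled` is not shown in this diff, so this does not claim to reproduce it. The sketch only contrasts a direct comparison with a difference-based one.

```java
// Hypothetical helper for illustration only; not taken from the Flink code base.
final class SequenceNumbers {
    private SequenceNumbers() {}

    /**
     * Direct comparison. Gives the wrong answer once the counter has wrapped
     * from Integer.MAX_VALUE to Integer.MIN_VALUE between the two numbers.
     */
    static boolean isBeforeNaive(int sequenceNumber, int overtakenSequenceNumber) {
        return sequenceNumber < overtakenSequenceNumber;
    }

    /**
     * Difference-based comparison. Relies on int subtraction wrapping around,
     * so it stays correct across an overflow as long as the two numbers are
     * less than 2^31 apart.
     */
    static boolean isBeforeWrapSafe(int sequenceNumber, int overtakenSequenceNumber) {
        return sequenceNumber - overtakenSequenceNumber < 0;
    }
}
```

With a buffer numbered `Integer.MAX_VALUE` and an overtaken sequence number of `Integer.MIN_VALUE + 1` (two increments later, after the wrap), the direct comparison reports the buffer as not earlier, while the difference-based one reports it as earlier. Whether a bound of 2^31 in-flight sequence numbers is acceptable here is an assumption that would need checking.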