rkhachatryan commented on a change in pull request #14052:
URL: https://github.com/apache/flink/pull/14052#discussion_r524357555



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -506,43 +514,75 @@ public void checkpointStarted(CheckpointBarrier barrier) {
                synchronized (receivedBuffers) {
                        channelStatePersister.startPersisting(
                                barrier.getId(),
-                               getInflightBuffers(numBuffersOvertaken == ALL ? 
receivedBuffers.getNumUnprioritizedElements() : numBuffersOvertaken));
+                               getInflightBuffers());
                }
        }
 
        public void checkpointStopped(long checkpointId) {
                synchronized (receivedBuffers) {
                        channelStatePersister.stopPersisting(checkpointId);
-                       numBuffersOvertaken = ALL;
+                       lastOvertakenSequenceNumber = null;
+               }
+       }
+
+       @VisibleForTesting
+       List<Buffer> getInflightBuffers() {
+               synchronized (receivedBuffers) {
+                       return getInflightBuffersUnsafe();
                }
        }
 
        /**
         * Returns a list of buffers, checking the first n non-priority 
buffers, and skipping all events.
         */
-       private List<Buffer> getInflightBuffers(int numBuffers) {
+       private List<Buffer> getInflightBuffersUnsafe() {
                assert Thread.holdsLock(receivedBuffers);
 
-               if (numBuffers == 0) {
-                       return Collections.emptyList();
-               }
-
-               final List<Buffer> inflightBuffers = new 
ArrayList<>(numBuffers);
+               final List<Buffer> inflightBuffers = new ArrayList<>();
                Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
                // skip all priority events (only buffers are stored anyways)
                Iterators.advance(iterator, 
receivedBuffers.getNumPriorityElements());
 
-               // spill number of overtaken buffers or all of them if barrier 
has not been seen yet
-               for (int pos = 0; pos < numBuffers; pos++) {
-                       Buffer buffer = iterator.next().buffer;
-                       if (buffer.isBuffer()) {
-                               inflightBuffers.add(buffer.retainBuffer());
+               while (iterator.hasNext()) {
+                       SequenceBuffer sequenceBuffer = iterator.next();
+                       if (sequenceBuffer.buffer.isBuffer() && 
shouldBeSpilled(sequenceBuffer.sequenceNumber)) {
+                               
inflightBuffers.add(sequenceBuffer.buffer.retainBuffer());
                        }
                }
 
+               lastOvertakenSequenceNumber = null;
+
                return inflightBuffers;
        }
 
+       /**
+        * @return if given {@param sequenceNumber} should be spilled given 
{@link #lastOvertakenSequenceNumber}.
+        * We might not have yet received {@link CheckpointBarrier} and we 
might need to spill everything.
+        * If we have already received it, there is a bit nasty corner case of 
{@link SequenceBuffer#sequenceNumber}
+        * overflowing that needs to be handled as well.
+        */
+       private boolean shouldBeSpilled(int sequenceNumber) {
+               if (lastOvertakenSequenceNumber == null) {
+                       return true;
+               }
+               checkState(
+                       receivedBuffers.size() < Integer.MAX_VALUE / 2,
+                       "Too many buffers for sequenceNumber overflow detection 
code to work correctly");
+
+               boolean possibleOverflowAfterOvertaking = Integer.MAX_VALUE / 2 
< lastOvertakenSequenceNumber;
+               boolean possibleOverflowBeforeOvertaking = 
lastOvertakenSequenceNumber < -Integer.MAX_VALUE / 2;
+
+               if (possibleOverflowAfterOvertaking) {
+                       return sequenceNumber < lastOvertakenSequenceNumber && 
sequenceNumber > 0;
+               }
+               else if (possibleOverflowBeforeOvertaking) {
+                       return sequenceNumber < lastOvertakenSequenceNumber || 
sequenceNumber > 0;
+               }
+               else {
+                       return sequenceNumber < lastOvertakenSequenceNumber;
+               }

Review comment:
       As far as I can tell this is the only such place. `AlignedController` 
collects SQNs but eventually passes them to  `AlignedController` which uses 
them here.
   
   I think changing the buffer format would have a broader scope (and require a 
broad discussion), so I'd stick with the current approach.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to