This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd6967c7259de29e04fbcb6c5e31fa483f98faf6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Dec 3 21:57:48 2020 +0100

    [FLINK-19681][network] Force priority for converted barriers
    
    Without this, the gate interprets the barrier as outdated
    because it has already seen its SQN during the announcement.
    
    Preventing announcements from updating the gate's lastSeenSqn
    doesn't work because it provokes a concurrency issue with
    notification (by effectively disabling the lastSeenSqn guard).
---
 .../partition/consumer/RemoteInputChannel.java     | 13 ++++++++++-
 .../partition/consumer/SingleInputGate.java        | 16 ++++++++-----
 .../partition/consumer/RemoteInputChannelTest.java | 26 +++++++++++++++++++++-
 3 files changed, 47 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index c012766..e765b17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -565,14 +565,25 @@ public class RemoteInputChannel extends InputChannel {
                                "Attempted to convertToPriorityEvent an event 
[%s] that has already been prioritized [%s]",
                                toPrioritize,
                                numPriorityElementsBeforeRemoval);
+                       // set the priority flag (checked on poll)
+                       // don't convert the barrier itself (barrier controller 
might not have been switched yet)
+                       AbstractEvent e = 
EventSerializer.fromBuffer(toPrioritize.buffer, 
this.getClass().getClassLoader());
+                       toPrioritize.buffer.setReaderIndex(0);
+                       toPrioritize = new 
SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
                        firstPriorityEvent = addPriorityBuffer(toPrioritize);   
// note that only position of the element is changed
                                                                                
                                                        // converting the event 
itself would require switching the controller sooner
                }
                if (firstPriorityEvent) {
-                       notifyPriorityEvent(sequenceNumber);
+                       notifyPriorityEventForce(); // forcibly notify about 
the priority event
+                                                                               
// instead of passing barrier SQN to be checked
+                                                                               
// because this SQN might have been seen by the input gate during the announcement
                }
        }
 
+       private void notifyPriorityEventForce() {
+               inputGate.notifyPriorityEventForce(this);
+       }
+
        /**
         * Returns a list of buffers, checking the first n non-priority 
buffers, and skipping all events.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index fa4699c..f8158ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -773,7 +773,7 @@ public class SingleInputGate extends IndexedInputGate {
        // 
------------------------------------------------------------------------
 
        void notifyChannelNonEmpty(InputChannel channel) {
-               queueChannel(checkNotNull(channel), null);
+               queueChannel(checkNotNull(channel), null, false);
        }
 
        /**
@@ -782,10 +782,14 @@ public class SingleInputGate extends IndexedInputGate {
         * <p>The buffer number limits the notification to the respective 
buffer and voids the whole notification in case
         * that the buffer has been polled in the meantime. That is, if task 
thread polls the enqueued priority buffer
         * before this notification occurs (notification is not performed under 
lock), this buffer number allows
-        * {@link #queueChannel(InputChannel, Integer)} to avoid spurious 
priority wake-ups.
+        * {@link #queueChannel(InputChannel, Integer, boolean)} to avoid 
spurious priority wake-ups.
         */
        void notifyPriorityEvent(InputChannel inputChannel, int 
prioritySequenceNumber) {
-               queueChannel(checkNotNull(inputChannel), 
prioritySequenceNumber);
+               queueChannel(checkNotNull(inputChannel), 
prioritySequenceNumber, false);
+       }
+
+       void notifyPriorityEventForce(InputChannel inputChannel) {
+               queueChannel(checkNotNull(inputChannel), null, true);
        }
 
        void triggerPartitionStateCheck(ResultPartitionID partitionId) {
@@ -805,12 +809,12 @@ public class SingleInputGate extends IndexedInputGate {
                        }));
        }
 
-       private void queueChannel(InputChannel channel, @Nullable Integer 
prioritySequenceNumber) {
+       private void queueChannel(InputChannel channel, @Nullable Integer 
prioritySequenceNumber, boolean forcePriority) {
                try (GateNotificationHelper notification = new 
GateNotificationHelper(this, inputChannelsWithData)) {
                        synchronized (inputChannelsWithData) {
-                               boolean priority = prioritySequenceNumber != 
null;
+                               boolean priority = prioritySequenceNumber != 
null || forcePriority;
 
-                               if (priority &&
+                               if (!forcePriority && priority &&
                                                
isOutdated(prioritySequenceNumber, 
lastPrioritySequenceNumber[channel.getChannelIndex()])) {
                                        // priority event at the given offset 
already polled (notification is not atomic in respect to
                                        // buffer enqueuing), so just ignore 
the notification
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index d13cb72..08358da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -75,6 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -109,7 +110,30 @@ public class RemoteInputChannelTest {
 
        private static final long CHECKPOINT_ID = 1L;
        private static final CheckpointOptions UNALIGNED = 
CheckpointOptions.unaligned(getDefault());
-       private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = 
CheckpointOptions.alignedWithTimeout(getDefault(), 10);
+       private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = 
alignedWithTimeout(getDefault(), 10);
+
+       @Test
+       public void testGateNotifiedOnBarrierConversion() throws IOException, 
InterruptedException {
+               final int sequenceNumber = 0;
+               final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(1, 4096);
+               try {
+                       SingleInputGate inputGate = new 
SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(1,
 1)).build();
+                       inputGate.setup();
+                       RemoteInputChannel channel = 
InputChannelBuilder.newBuilder()
+                               .setConnectionManager(new 
TestVerifyConnectionManager(new TestVerifyPartitionRequestClient()))
+                               .buildRemoteChannel(inputGate);
+                       channel.requestSubpartition(0);
+
+                       channel.onBuffer(toBuffer(new CheckpointBarrier(1L, 
123L, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), false), 
sequenceNumber, 0);
+                       inputGate.pollNext(); // process announcement to allow 
the gate to remember the SQN
+
+                       channel.convertToPriorityEvent(sequenceNumber);
+                       
assertTrue(inputGate.getPriorityEventAvailableFuture().isDone());
+
+               } finally {
+                       networkBufferPool.destroy();
+               }
+       }
 
        @Test
        public void testExceptionOnReordering() throws Exception {

Reply via email to