This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd6967c7259de29e04fbcb6c5e31fa483f98faf6 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Dec 3 21:57:48 2020 +0100 [FLINK-19681][network] Force priority for converted barriers Without this, the gate interprets the barrier as outdated because it has already seen its SQN during the announcement. Preventing announcements from updating the gate's lastSeenSqn doesn't work because it provokes a concurrency issue with notification (by effectively disabling the lastSeenSqn guard). --- .../partition/consumer/RemoteInputChannel.java | 13 ++++++++++- .../partition/consumer/SingleInputGate.java | 16 ++++++++----- .../partition/consumer/RemoteInputChannelTest.java | 26 +++++++++++++++++++++- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index c012766..e765b17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -565,14 +565,25 @@ public class RemoteInputChannel extends InputChannel { "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]", toPrioritize, numPriorityElementsBeforeRemoval); + // set the priority flag (checked on poll) + // don't convert the barrier itself (barrier controller might not have been switched yet) + AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader()); + toPrioritize.buffer.setReaderIndex(0); + toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber); firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed // converting the event itself would require switching the controller 
sooner } if (firstPriorityEvent) { - notifyPriorityEvent(sequenceNumber); + notifyPriorityEventForce(); // forcibly notify about the priority event + // instead of passing barrier SQN to be checked + // because this SQN might have be seen by the input gate during the announcement } } + private void notifyPriorityEventForce() { + inputGate.notifyPriorityEventForce(this); + } + /** * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index fa4699c..f8158ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -773,7 +773,7 @@ public class SingleInputGate extends IndexedInputGate { // ------------------------------------------------------------------------ void notifyChannelNonEmpty(InputChannel channel) { - queueChannel(checkNotNull(channel), null); + queueChannel(checkNotNull(channel), null, false); } /** @@ -782,10 +782,14 @@ public class SingleInputGate extends IndexedInputGate { * <p>The buffer number limits the notification to the respective buffer and voids the whole notification in case * that the buffer has been polled in the meantime. That is, if task thread polls the enqueued priority buffer * before this notification occurs (notification is not performed under lock), this buffer number allows - * {@link #queueChannel(InputChannel, Integer)} to avoid spurious priority wake-ups. + * {@link #queueChannel(InputChannel, Integer, boolean)} to avoid spurious priority wake-ups. 
*/ void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) { - queueChannel(checkNotNull(inputChannel), prioritySequenceNumber); + queueChannel(checkNotNull(inputChannel), prioritySequenceNumber, false); + } + + void notifyPriorityEventForce(InputChannel inputChannel) { + queueChannel(checkNotNull(inputChannel), null, true); } void triggerPartitionStateCheck(ResultPartitionID partitionId) { @@ -805,12 +809,12 @@ public class SingleInputGate extends IndexedInputGate { })); } - private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) { + private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber, boolean forcePriority) { try (GateNotificationHelper notification = new GateNotificationHelper(this, inputChannelsWithData)) { synchronized (inputChannelsWithData) { - boolean priority = prioritySequenceNumber != null; + boolean priority = prioritySequenceNumber != null || forcePriority; - if (priority && + if (!forcePriority && priority && isOutdated(prioritySequenceNumber, lastPrioritySequenceNumber[channel.getChannelIndex()])) { // priority event at the given offset already polled (notification is not atomic in respect to // buffer enqueuing), so just ignore the notification diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index d13cb72..08358da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -75,6 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.stream.Collectors; +import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; @@ -109,7 +110,30 @@ public class RemoteInputChannelTest { private static final long CHECKPOINT_ID = 1L; private static final CheckpointOptions UNALIGNED = CheckpointOptions.unaligned(getDefault()); - private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout(getDefault(), 10); + private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = alignedWithTimeout(getDefault(), 10); + + @Test + public void testGateNotifiedOnBarrierConversion() throws IOException, InterruptedException { + final int sequenceNumber = 0; + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 4096); + try { + SingleInputGate inputGate = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(1, 1)).build(); + inputGate.setup(); + RemoteInputChannel channel = InputChannelBuilder.newBuilder() + .setConnectionManager(new TestVerifyConnectionManager(new TestVerifyPartitionRequestClient())) + .buildRemoteChannel(inputGate); + channel.requestSubpartition(0); + + channel.onBuffer(toBuffer(new CheckpointBarrier(1L, 123L, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), false), sequenceNumber, 0); + inputGate.pollNext(); // process announcement to allow the gate remember the SQN + + channel.convertToPriorityEvent(sequenceNumber); + assertTrue(inputGate.getPriorityEventAvailableFuture().isDone()); + + } finally { + networkBufferPool.destroy(); + } + } @Test public void testExceptionOnReordering() throws Exception {
