This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b49ad6731c9798cb803b058ad02955c6fb3a2f05
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Nov 9 14:10:44 2020 +0100

    [FLINK-19681][checkpointing] Fix barrier tracking in input channels
    
    LocalInputChannel:
    Reset lastSeenBarrier for any barrier, not just for unaligned
    checkpoint (UC) barriers. Local channels receive no announcements,
    so for aligned checkpoints (AC) lastSeenBarrier might not be reset,
    and extra buffers could then be added to the channel state.
    
    Reduces failure frequency in UnalignedCheckpointIT par-local case.
    
    RemoteInputChannel:
    Don't update the barrier-tracking state when converting a buffer
    to a priority event; update it only upon receiving a barrier.
    
    Reduces failure frequency in UnalignedCheckpointIT par-remote case.
---
 .../partition/consumer/ChannelStatePersister.java  | 28 ++++++++--------
 .../partition/consumer/LocalInputChannel.java      |  7 ++--
 .../partition/consumer/RemoteInputChannel.java     | 22 +++++++------
 .../partition/consumer/LocalInputChannelTest.java  | 38 ++++++++++++++++++++--
 4 files changed, 63 insertions(+), 32 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 924e30f..5742279 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -88,16 +88,16 @@ final class ChannelStatePersister {
        }
 
        protected Optional<Long> checkForBarrier(Buffer buffer) throws 
IOException {
-               final AbstractEvent priorityEvent = parsePriorityEvent(buffer);
-               if (priorityEvent instanceof CheckpointBarrier) {
-                       if (((CheckpointBarrier) priorityEvent).getId() >= 
lastSeenBarrier) {
+               final AbstractEvent event = parseEvent(buffer);
+               if (event instanceof CheckpointBarrier) {
+                       if (((CheckpointBarrier) event).getId() >= 
lastSeenBarrier) {
                                checkpointStatus = 
CheckpointStatus.BARRIER_RECEIVED;
-                               lastSeenBarrier = ((CheckpointBarrier) 
priorityEvent).getId();
+                               lastSeenBarrier = ((CheckpointBarrier) 
event).getId();
                                return Optional.of(lastSeenBarrier);
                        }
                }
-               else if (priorityEvent instanceof EventAnnouncement) {
-                       EventAnnouncement announcement = (EventAnnouncement) 
priorityEvent;
+               if (event instanceof EventAnnouncement) { // NOTE: only remote 
channels
+                       EventAnnouncement announcement = (EventAnnouncement) 
event;
                        if (announcement.getAnnouncedEvent() instanceof 
CheckpointBarrier) {
                                return Optional.of(((CheckpointBarrier) 
announcement.getAnnouncedEvent()).getId());
                        }
@@ -110,16 +110,16 @@ final class ChannelStatePersister {
         * returns null in all other cases.
         */
        @Nullable
-       protected AbstractEvent parsePriorityEvent(Buffer buffer) throws 
IOException {
-               if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) {
+       protected AbstractEvent parseEvent(Buffer buffer) throws IOException {
+               if (buffer.isBuffer()) {
                        return null;
+               } else {
+                       AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+                       // reset the buffer because it would be deserialized 
again in SingleInputGate while getting next buffer.
+                       // we can further improve to avoid double 
deserialization in the future.
+                       buffer.setReaderIndex(0);
+                       return event;
                }
-
-               AbstractEvent event = EventSerializer.fromBuffer(buffer, 
getClass().getClassLoader());
-               // reset the buffer because it would be deserialized again in 
SingleInputGate while getting next buffer.
-               // we can further improve to avoid double deserialization in 
the future.
-               buffer.setReaderIndex(0);
-               return event;
        }
 
        protected boolean hasBarrierReceived() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index a0d69c5..0fafb36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -229,11 +229,8 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
-               if (buffer.getDataType().hasPriority()) {
-                       channelStatePersister.checkForBarrier(buffer);
-               } else {
-                       channelStatePersister.maybePersist(buffer);
-               }
+               channelStatePersister.checkForBarrier(buffer);
+               channelStatePersister.maybePersist(buffer);
                return Optional.of(new BufferAndAvailability(
                        buffer,
                        next.getNextDataType(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 27fa58f..c012766 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -451,11 +451,20 @@ public class RemoteInputChannel extends InputChannel {
                                }
                                else {
                                        receivedBuffers.add(sequenceBuffer);
-                                       
channelStatePersister.maybePersist(buffer);
                                        if (dataType.requiresAnnouncement()) {
                                                firstPriorityEvent = 
addPriorityBuffer(announce(sequenceBuffer));
                                        }
                                }
+                               channelStatePersister
+                                       .checkForBarrier(sequenceBuffer.buffer)
+                                       .filter(id -> id > lastBarrierId)
+                                       .ifPresent(id -> {
+                                               // checkpoint was not yet 
started by task thread,
+                                               // so remember the numbers of 
buffers to spill for the time when it will be started
+                                               lastBarrierId = id;
+                                               lastBarrierSequenceNumber = 
sequenceBuffer.sequenceNumber;
+                                       });
+                               channelStatePersister.maybePersist(buffer);
                                ++expectedSequenceNumber;
                        }
                        recycleBuffer = false;
@@ -480,15 +489,8 @@ public class RemoteInputChannel extends InputChannel {
        /**
         * @return {@code true} if this was first priority buffer added.
         */
-       private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) throws 
IOException {
+       private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) {
                receivedBuffers.addPriorityElement(sequenceBuffer);
-               channelStatePersister
-                       .checkForBarrier(sequenceBuffer.buffer)
-                       .filter(id -> id > lastBarrierId)
-                       .ifPresent(id -> {
-                               lastBarrierId = id;
-                               lastBarrierSequenceNumber = 
sequenceBuffer.sequenceNumber;
-                       });
                return receivedBuffers.getNumPriorityElements() == 1;
        }
 
@@ -552,7 +554,7 @@ public class RemoteInputChannel extends InputChannel {
        public void convertToPriorityEvent(int sequenceNumber) throws 
IOException {
                boolean firstPriorityEvent;
                synchronized (receivedBuffers) {
-                       checkState(!channelStatePersister.hasBarrierReceived());
+                       checkState(channelStatePersister.hasBarrierReceived());
                        int numPriorityElementsBeforeRemoval = 
receivedBuffers.getNumPriorityElements();
                        SequenceBuffer toPrioritize = 
receivedBuffers.getAndRemove(
                                sequenceBuffer -> sequenceBuffer.sequenceNumber 
== sequenceNumber);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 08cdf6c..8785a68 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,8 +29,10 @@ import 
org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
@@ -54,6 +56,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.hamcrest.Matchers;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -73,6 +76,7 @@ import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager;
+import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
@@ -94,6 +98,30 @@ import static org.mockito.Mockito.when;
  */
 public class LocalInputChannelTest {
 
+       @Test
+       public void testNoDataPersistedAfterReceivingAlignedBarrier() throws 
Exception {
+               CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, 
CheckpointOptions.alignedWithTimeout(getDefault(), 123L));
+               BufferConsumer barrierHolder = new 
BufferConsumer(EventSerializer.toBuffer(barrier, false).getMemorySegment(), 
FreeingBufferRecycler.INSTANCE, Buffer.DataType.EVENT_BUFFER);
+               BufferConsumer data = 
BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1);
+
+               RecordingChannelStateWriter stateWriter = new 
RecordingChannelStateWriter();
+               LocalInputChannel channel = InputChannelBuilder
+                       .newBuilder()
+                       .setPartitionManager(new 
TestingResultPartitionManager(createResultSubpartitionView(barrierHolder, 
data)))
+                       .setStateWriter(stateWriter)
+                       .buildLocalChannel(new 
SingleInputGateBuilder().build());
+               channel.requestSubpartition(0);
+
+               // pull AC barrier
+               channel.getNextBuffer();
+               // pretend that alignment timed out
+               stateWriter.start(barrier.getId(), 
barrier.getCheckpointOptions());
+               channel.checkpointStarted(barrier);
+               // pull data
+               channel.getNextBuffer();
+               Assert.assertTrue("no data should be persisted after receiving 
a barrier", stateWriter.getAddedInput().isEmpty());
+       }
+
        /**
         * Tests the consumption of multiple subpartitions via local input 
channels.
         *
@@ -472,7 +500,7 @@ public class LocalInputChannelTest {
                inputGate.setInputChannels(channel);
                channel.requestSubpartition(0);
 
-               final CheckpointStorageLocationReference location = 
CheckpointStorageLocationReference.getDefault();
+               final CheckpointStorageLocationReference location = 
getDefault();
                CheckpointOptions options = new 
CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0);
                stateWriter.start(0, options);
 
@@ -497,6 +525,10 @@ public class LocalInputChannelTest {
        // 
---------------------------------------------------------------------------------------------
 
        private static ResultSubpartitionView 
createResultSubpartitionView(boolean addBuffer) throws IOException {
+               return addBuffer ? 
createResultSubpartitionView(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096))
 : createResultSubpartitionView();
+       }
+
+       private static ResultSubpartitionView 
createResultSubpartitionView(BufferConsumer... buffers) throws IOException {
                int bufferSize = 4096;
                PipelinedResultPartition parent = (PipelinedResultPartition) 
PartitionTestUtils.createPartition(
                        ResultPartitionType.PIPELINED,
@@ -504,8 +536,8 @@ public class LocalInputChannelTest {
                        true,
                        bufferSize);
                ResultSubpartition subpartition = parent.getAllPartitions()[0];
-               if (addBuffer) {
-                       
subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
+               for (BufferConsumer buffer : buffers) {
+                       subpartition.add(buffer);
                }
                return subpartition.createReadView(() -> {});
        }

Reply via email to