This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b49ad6731c9798cb803b058ad02955c6fb3a2f05 Author: Roman Khachatryan <[email protected]> AuthorDate: Mon Nov 9 14:10:44 2020 +0100 [FLINK-19681][checkpointing] Fix barrier tracking in input channels LocalInputChannel: Reset lastSeenBarrier for all barriers, not just unaligned-checkpoint (UC) barriers. Local channels receive no announcements, so lastSeenBarrier may not be reset for aligned checkpoints (AC), and extra buffers may then be added to the channel state. Reduces failure frequency in the UnalignedCheckpointIT par-local case. RemoteInputChannel: Don't update tracking state during conversion; only do it upon receiving a barrier. Reduces failure frequency in the UnalignedCheckpointIT par-remote case. --- .../partition/consumer/ChannelStatePersister.java | 28 ++++++++-------- .../partition/consumer/LocalInputChannel.java | 7 ++-- .../partition/consumer/RemoteInputChannel.java | 22 +++++++------ .../partition/consumer/LocalInputChannelTest.java | 38 ++++++++++++++++++++-- 4 files changed, 63 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java index 924e30f..5742279 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java @@ -88,16 +88,16 @@ final class ChannelStatePersister { } protected Optional<Long> checkForBarrier(Buffer buffer) throws IOException { - final AbstractEvent priorityEvent = parsePriorityEvent(buffer); - if (priorityEvent instanceof CheckpointBarrier) { - if (((CheckpointBarrier) priorityEvent).getId() >= lastSeenBarrier) { + final AbstractEvent event = parseEvent(buffer); + if (event instanceof CheckpointBarrier) { + if (((CheckpointBarrier) event).getId() >= lastSeenBarrier) { checkpointStatus = 
CheckpointStatus.BARRIER_RECEIVED; - lastSeenBarrier = ((CheckpointBarrier) priorityEvent).getId(); + lastSeenBarrier = ((CheckpointBarrier) event).getId(); return Optional.of(lastSeenBarrier); } } - else if (priorityEvent instanceof EventAnnouncement) { - EventAnnouncement announcement = (EventAnnouncement) priorityEvent; + if (event instanceof EventAnnouncement) { // NOTE: only remote channels + EventAnnouncement announcement = (EventAnnouncement) event; if (announcement.getAnnouncedEvent() instanceof CheckpointBarrier) { return Optional.of(((CheckpointBarrier) announcement.getAnnouncedEvent()).getId()); } @@ -110,16 +110,16 @@ final class ChannelStatePersister { * returns null in all other cases. */ @Nullable - protected AbstractEvent parsePriorityEvent(Buffer buffer) throws IOException { - if (buffer.isBuffer() || !buffer.getDataType().hasPriority()) { + protected AbstractEvent parseEvent(Buffer buffer) throws IOException { + if (buffer.isBuffer()) { return null; + } else { + AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer. + // we can further improve to avoid double deserialization in the future. + buffer.setReaderIndex(0); + return event; } - - AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); - // reset the buffer because it would be deserialized again in SingleInputGate while getting next buffer. - // we can further improve to avoid double deserialization in the future. 
- buffer.setReaderIndex(0); - return event; } protected boolean hasBarrierReceived() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index a0d69c5..0fafb36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -229,11 +229,8 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit numBytesIn.inc(buffer.getSize()); numBuffersIn.inc(); - if (buffer.getDataType().hasPriority()) { - channelStatePersister.checkForBarrier(buffer); - } else { - channelStatePersister.maybePersist(buffer); - } + channelStatePersister.checkForBarrier(buffer); + channelStatePersister.maybePersist(buffer); return Optional.of(new BufferAndAvailability( buffer, next.getNextDataType(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 27fa58f..c012766 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -451,11 +451,20 @@ public class RemoteInputChannel extends InputChannel { } else { receivedBuffers.add(sequenceBuffer); - channelStatePersister.maybePersist(buffer); if (dataType.requiresAnnouncement()) { firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer)); } } + channelStatePersister + .checkForBarrier(sequenceBuffer.buffer) + .filter(id -> id > lastBarrierId) + .ifPresent(id -> { + // checkpoint was not yet started by task thread, + // so 
remember the numbers of buffers to spill for the time when it will be started + lastBarrierId = id; + lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber; + }); + channelStatePersister.maybePersist(buffer); ++expectedSequenceNumber; } recycleBuffer = false; @@ -480,15 +489,8 @@ public class RemoteInputChannel extends InputChannel { /** * @return {@code true} if this was first priority buffer added. */ - private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) throws IOException { + private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) { receivedBuffers.addPriorityElement(sequenceBuffer); - channelStatePersister - .checkForBarrier(sequenceBuffer.buffer) - .filter(id -> id > lastBarrierId) - .ifPresent(id -> { - lastBarrierId = id; - lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber; - }); return receivedBuffers.getNumPriorityElements() == 1; } @@ -552,7 +554,7 @@ public class RemoteInputChannel extends InputChannel { public void convertToPriorityEvent(int sequenceNumber) throws IOException { boolean firstPriorityEvent; synchronized (receivedBuffers) { - checkState(!channelStatePersister.hasBarrierReceived()); + checkState(channelStatePersister.hasBarrierReceived()); int numPriorityElementsBeforeRemoval = receivedBuffers.getNumPriorityElements(); SequenceBuffer toPrioritize = receivedBuffers.getAndRemove( sequenceBuffer -> sequenceBuffer.sequenceNumber == sequenceNumber); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 08cdf6c..8785a68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -29,8 +29,10 @@ import 
org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition; @@ -54,6 +56,7 @@ import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.hamcrest.Matchers; +import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -73,6 +76,7 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.partition.InputGateFairnessTest.setupInputGate; import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager; +import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; @@ -94,6 +98,30 @@ import static org.mockito.Mockito.when; */ public class LocalInputChannelTest { + @Test + public void testNoDataPersistedAfterReceivingAlignedBarrier() throws Exception { + CheckpointBarrier barrier = new CheckpointBarrier(1L, 
0L, CheckpointOptions.alignedWithTimeout(getDefault(), 123L)); + BufferConsumer barrierHolder = new BufferConsumer(EventSerializer.toBuffer(barrier, false).getMemorySegment(), FreeingBufferRecycler.INSTANCE, Buffer.DataType.EVENT_BUFFER); + BufferConsumer data = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1); + + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + LocalInputChannel channel = InputChannelBuilder + .newBuilder() + .setPartitionManager(new TestingResultPartitionManager(createResultSubpartitionView(barrierHolder, data))) + .setStateWriter(stateWriter) + .buildLocalChannel(new SingleInputGateBuilder().build()); + channel.requestSubpartition(0); + + // pull AC barrier + channel.getNextBuffer(); + // pretend that alignment timed out + stateWriter.start(barrier.getId(), barrier.getCheckpointOptions()); + channel.checkpointStarted(barrier); + // pull data + channel.getNextBuffer(); + Assert.assertTrue("no data should be persisted after receiving a barrier", stateWriter.getAddedInput().isEmpty()); + } + /** * Tests the consumption of multiple subpartitions via local input channels. * @@ -472,7 +500,7 @@ public class LocalInputChannelTest { inputGate.setInputChannels(channel); channel.requestSubpartition(0); - final CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault(); + final CheckpointStorageLocationReference location = getDefault(); CheckpointOptions options = new CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0); stateWriter.start(0, options); @@ -497,6 +525,10 @@ public class LocalInputChannelTest { // --------------------------------------------------------------------------------------------- private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) throws IOException { + return addBuffer ? 
createResultSubpartitionView(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096)) : createResultSubpartitionView(); + } + + private static ResultSubpartitionView createResultSubpartitionView(BufferConsumer... buffers) throws IOException { int bufferSize = 4096; PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition( ResultPartitionType.PIPELINED, @@ -504,8 +536,8 @@ public class LocalInputChannelTest { true, bufferSize); ResultSubpartition subpartition = parent.getAllPartitions()[0]; - if (addBuffer) { - subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize)); + for (BufferConsumer buffer : buffers) { + subpartition.add(buffer); } return subpartition.createReadView(() -> {}); }
