This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5c46abbcd8662f7162f934b16209ef3a60585a9 Author: Piotr Nowojski <[email protected]> AuthorDate: Sun Oct 25 18:04:41 2020 +0100 [FLINK-19681][checkpointing] Timeout aligned checkpoints based on checkpointStartDelay --- .../runtime/checkpoint/CheckpointOptions.java | 11 + .../partition/BufferWritingResultPartition.java | 3 + .../io/network/partition/PrioritizedDeque.java | 22 ++ .../partition/consumer/ChannelStatePersister.java | 11 +- .../partition/consumer/CheckpointableInput.java | 2 + .../partition/consumer/IndexedInputGate.java | 7 + .../network/partition/consumer/InputChannel.java | 3 + .../partition/consumer/RemoteInputChannel.java | 47 ++++ .../io/network/partition/PrioritizedDequeTest.java | 27 ++ .../partition/consumer/RemoteInputChannelTest.java | 41 +++ .../streaming/runtime/io/AlignedController.java | 54 +++- .../runtime/io/AlternatingController.java | 125 ++++++++- .../io/CheckpointBarrierBehaviourController.java | 12 +- .../runtime/io/SingleCheckpointBarrierHandler.java | 70 +++-- .../runtime/io/StreamTaskSourceInput.java | 5 + .../streaming/runtime/io/UnalignedController.java | 22 +- .../runtime/tasks/mailbox/MailboxProcessor.java | 5 + .../runtime/io/AlignedControllerTest.java | 29 ++ .../runtime/io/AlternatingControllerTest.java | 306 ++++++++++++++++++++- .../runtime/io/ValidatingCheckpointHandler.java | 8 +- 20 files changed, 754 insertions(+), 56 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index 126ba0b..b092a92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; +import static 
org.apache.flink.util.Preconditions.checkState; /** * Options for performing the checkpoint. @@ -188,4 +189,14 @@ public class CheckpointOptions implements Serializable { isUnalignedCheckpoint, alignmentTimeout); } + + public CheckpointOptions toTimeouted() { + checkState(checkpointType == CheckpointType.CHECKPOINT); + return create( + checkpointType, + targetLocation, + isExactlyOnceMode, + true, + 0); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index 2ba000a..415ed19 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -235,6 +235,9 @@ public abstract class BufferWritingResultPartition extends ResultPartition { private BufferBuilder appendUnicastDataForNewRecord( final ByteBuffer record, final int targetSubpartition) throws IOException { + if (targetSubpartition < 0 || targetSubpartition >= unicastBufferBuilders.length) { + throw new ArrayIndexOutOfBoundsException(targetSubpartition); + } BufferBuilder buffer = unicastBufferBuilders[targetSubpartition]; if (buffer == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java index dabb4a3..77108d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java @@ -26,7 +26,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.Iterator; +import java.util.NoSuchElementException; import 
java.util.Objects; +import java.util.function.Predicate; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -142,6 +144,26 @@ public final class PrioritizedDeque<T> implements Iterable<T> { } /** + * Find first element matching the {@link Predicate}, remove it from the {@link PrioritizedDeque} + * and return it. + * @return removed element + */ + public T getAndRemove(Predicate<T> preCondition) { + Iterator<T> iterator = deque.iterator(); + for (int i = 0; i < deque.size(); i++) { + T next = iterator.next(); + if (preCondition.test(next)) { + if (i < numPriorityElements) { + numPriorityElements--; + } + iterator.remove(); + return next; + } + } + throw new NoSuchElementException(); + } + + /** * Polls the first priority element or non-priority element if the former does not exist. * * @return the first element or null. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java index 416c956..924e30f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EventAnnouncement; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.CloseableIterator; @@ -47,9 +48,7 @@ final class ChannelStatePersister { private long lastSeenBarrier = -1L; - /** - * Writer must be 
initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. - */ + /** Writer must be initialized before usage. {@link #startPersisting(long, List)} enforces this invariant. */ private final ChannelStateWriter channelStateWriter; ChannelStatePersister(ChannelStateWriter channelStateWriter, InputChannelInfo channelInfo) { @@ -97,6 +96,12 @@ final class ChannelStatePersister { return Optional.of(lastSeenBarrier); } } + else if (priorityEvent instanceof EventAnnouncement) { + EventAnnouncement announcement = (EventAnnouncement) priorityEvent; + if (announcement.getAnnouncedEvent() instanceof CheckpointBarrier) { + return Optional.of(((CheckpointBarrier) announcement.getAnnouncedEvent()).getId()); + } + } return Optional.empty(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java index 2316a90..75a2ba6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java @@ -44,4 +44,6 @@ public interface CheckpointableInput { void checkpointStopped(long cancelledCheckpointId); int getInputGateIndex(); + + void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java index 3b8ecb2..6566fd3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java @@ -21,6 +21,8 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import java.io.IOException; + /** * An {@link InputGate} with a specific index. */ @@ -53,4 +55,9 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab public void blockConsumption(InputChannelInfo channelInfo) { // Unused. Network stack is blocking consumption automatically by revoking credits. } + + @Override + public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException { + getChannel(channelIndex).convertToPriorityEvent(sequenceNumber); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 792392b9..e1909b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -183,6 +183,9 @@ public abstract class InputChannel { public void checkpointStopped(long checkpointId) { } + public void convertToPriorityEvent(int sequenceNumber) throws IOException { + } + // ------------------------------------------------------------------------ // Task events // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 1992cdb..27fa58f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -494,6 +494,7 @@ public class RemoteInputChannel extends InputChannel { private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOException { checkState(!sequenceBuffer.buffer.isBuffer(), "Only a CheckpointBarrier can be announced but found %s", sequenceBuffer.buffer); + checkAnnouncedOnlyOnce(sequenceBuffer); AbstractEvent event = EventSerializer.fromBuffer( sequenceBuffer.buffer, getClass().getClassLoader()); @@ -504,6 +505,20 @@ public class RemoteInputChannel extends InputChannel { sequenceBuffer.sequenceNumber); } + private void checkAnnouncedOnlyOnce(SequenceBuffer sequenceBuffer) { + Iterator<SequenceBuffer> iterator = receivedBuffers.iterator(); + int count = 0; + while (iterator.hasNext()) { + if (iterator.next().sequenceNumber == sequenceBuffer.sequenceNumber) { + count++; + } + } + checkState( + count == 1, + "Before enqueuing the announcement there should be exactly single occurrence of the buffer, but found [%s]", + count); + } + /** * Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only * the overtaken buffers. 
@@ -533,6 +548,29 @@ public class RemoteInputChannel extends InputChannel { } } + @Override + public void convertToPriorityEvent(int sequenceNumber) throws IOException { + boolean firstPriorityEvent; + synchronized (receivedBuffers) { + checkState(!channelStatePersister.hasBarrierReceived()); + int numPriorityElementsBeforeRemoval = receivedBuffers.getNumPriorityElements(); + SequenceBuffer toPrioritize = receivedBuffers.getAndRemove( + sequenceBuffer -> sequenceBuffer.sequenceNumber == sequenceNumber); + checkState(lastBarrierSequenceNumber == sequenceNumber); + checkState(!toPrioritize.buffer.isBuffer()); + checkState( + numPriorityElementsBeforeRemoval == receivedBuffers.getNumPriorityElements(), + "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]", + toPrioritize, + numPriorityElementsBeforeRemoval); + firstPriorityEvent = addPriorityBuffer(toPrioritize); // note that only position of the element is changed + // converting the event itself would require switching the controller sooner + } + if (firstPriorityEvent) { + notifyPriorityEvent(sequenceNumber); + } + } + /** * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events. 
*/ @@ -653,5 +691,14 @@ public class RemoteInputChannel extends InputChannel { this.buffer = buffer; this.sequenceNumber = sequenceNumber; } + + @Override + public String toString() { + return String.format( + "SequenceBuffer(isEvent = %s, dataType = %s, sequenceNumber = %s)", + !buffer.isBuffer(), + buffer.getDataType(), + sequenceNumber); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java index 8932c86..5cc8a3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java @@ -19,7 +19,11 @@ package org.apache.flink.runtime.io.network.partition; import org.junit.Test; +import java.util.NoSuchElementException; + import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * Tests PrioritizedDeque. @@ -53,4 +57,27 @@ public class PrioritizedDequeTest { assertArrayEquals(new Integer[] { 3, 0, 1, 2 }, deque.asUnmodifiableCollection().toArray(new Integer[0])); } + + @Test + public void testGetAndRemove() { + final PrioritizedDeque<Integer> deque = new PrioritizedDeque<>(); + + deque.add(0); + deque.add(1); + deque.add(2); + deque.add(1); + deque.add(3); + + assertEquals(1, deque.getAndRemove(v -> v == 1).intValue()); + assertArrayEquals(new Integer[] { 0, 2, 1, 3 }, deque.asUnmodifiableCollection().toArray(new Integer[0])); + assertEquals(1, deque.getAndRemove(v -> v == 1).intValue()); + assertArrayEquals(new Integer[] { 0, 2, 3 }, deque.asUnmodifiableCollection().toArray(new Integer[0])); + try { + int removed = deque.getAndRemove(v -> v == 1); + fail(String.format("This should not happen. 
Item [%s] was removed, but it shouldn't be found", removed)); + } + catch (NoSuchElementException ex) { + // expected + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index c74cf4e..2954a3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -1232,6 +1232,47 @@ public class RemoteInputChannelTest { assertEquals(3, channel.getNextBuffer().get().getSequenceNumber()); } + @Test + public void testGetInflightBuffersBeforeProcessingAnnouncement() throws Exception { + int bufferSize = 1; + int sequenceNumber = 0; + final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBarrier(channel, sequenceNumber++, 10); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + assertInflightBufferSizes(channel, 1, 2); + } + + @Test + public void testGetInflightBuffersAfterProcessingAnnouncement() throws Exception { + int bufferSize = 1; + int sequenceNumber = 0; + final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBarrier(channel, sequenceNumber++, 10); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + assertGetNextBufferSequenceNumbers(channel, 2); + assertInflightBufferSizes(channel, 1, 2); + } + + @Test + public void testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception { + int 
bufferSize = 1; + int sequenceNumber = 0; + final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBarrier(channel, sequenceNumber++, 10); + sendBuffer(channel, sequenceNumber++, bufferSize++); + sendBuffer(channel, sequenceNumber++, bufferSize++); + assertGetNextBufferSequenceNumbers(channel, 2, 0); + assertInflightBufferSizes(channel, 2); + } + private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, int alignmentTimeout) throws IOException { CheckpointOptions checkpointOptions = CheckpointOptions.create( CHECKPOINT, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java index f3999a4..a2ef674 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java @@ -26,7 +26,10 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -39,13 +42,35 @@ import static org.apache.flink.util.Preconditions.checkState; public class AlignedController implements CheckpointBarrierBehaviourController { private final CheckpointableInput[] inputs; + /** + * {@link #blockedChannels} are the ones for which we have already processed {@link CheckpointBarrier} + * (via {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)}. 
{@link #sequenceNumberInAnnouncedChannels} + * on the other hand, are the ones that we have processed {@link #barrierAnnouncement(InputChannelInfo, CheckpointBarrier, int)} + * but not yet {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)}. + */ private final Map<InputChannelInfo, Boolean> blockedChannels; + private final Map<InputChannelInfo, Integer> sequenceNumberInAnnouncedChannels; public AlignedController(CheckpointableInput... inputs) { this.inputs = inputs; blockedChannels = Arrays.stream(inputs) .flatMap(gate -> gate.getChannelInfos().stream()) .collect(Collectors.toMap(Function.identity(), info -> false)); + sequenceNumberInAnnouncedChannels = new HashMap<>(); + } + + @Override + public void barrierAnnouncement( + InputChannelInfo channelInfo, + CheckpointBarrier announcedBarrier, + int sequenceNumber) { + Integer previousValue = sequenceNumberInAnnouncedChannels.put(channelInfo, sequenceNumber); + checkState( + previousValue == null, + "Stream corrupt: Repeated barrierAnnouncement [%s] overwriting [%s] for the same checkpoint on input %s", + announcedBarrier, + sequenceNumber, + channelInfo); } @Override @@ -53,27 +78,29 @@ public class AlignedController implements CheckpointBarrierBehaviourController { } @Override - public void barrierReceived( + public Optional<CheckpointBarrier> barrierReceived( InputChannelInfo channelInfo, CheckpointBarrier barrier) { checkState(!blockedChannels.put(channelInfo, true), "Stream corrupt: Repeated barrier for same checkpoint on input " + channelInfo); + sequenceNumberInAnnouncedChannels.remove(channelInfo); CheckpointableInput input = inputs[channelInfo.getGateIdx()]; input.blockConsumption(channelInfo); + return Optional.empty(); } @Override - public boolean preProcessFirstBarrier( + public Optional<CheckpointBarrier> preProcessFirstBarrier( InputChannelInfo channelInfo, CheckpointBarrier barrier) { - return false; + return Optional.empty(); } @Override - public boolean postProcessLastBarrier( + public 
Optional<CheckpointBarrier> postProcessLastBarrier( InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException { resumeConsumption(); - return true; + return Optional.of(barrier); } @Override @@ -90,16 +117,29 @@ public class AlignedController implements CheckpointBarrierBehaviourController { resumeConsumption(channelInfo); } - private void resumeConsumption() throws IOException { + public Collection<InputChannelInfo> getBlockedChannels() { + return blockedChannels.entrySet() + .stream() + .filter(entry -> entry.getValue()) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + } + + public Map<InputChannelInfo, Integer> getSequenceNumberInAnnouncedChannels() { + return new HashMap<>(sequenceNumberInAnnouncedChannels); + } + + public void resumeConsumption() throws IOException { for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) { if (blockedChannel.getValue()) { resumeConsumption(blockedChannel.getKey()); } blockedChannel.setValue(false); } + sequenceNumberInAnnouncedChannels.clear(); } - private void resumeConsumption(InputChannelInfo channelInfo) throws IOException { + void resumeConsumption(InputChannelInfo channelInfo) throws IOException { CheckpointableInput input = inputs[channelInfo.getGateIdx()]; input.resumeConsumption(channelInfo); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index d040c66..f943638 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -20,10 +20,14 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; +import 
org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkState; @@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkState; public class AlternatingController implements CheckpointBarrierBehaviourController { private final AlignedController alignedController; private final UnalignedController unalignedController; - private CheckpointBarrierBehaviourController activeController; + + private CheckpointBarrierBehaviourController activeController; + private long timeOutedBarrierId = -1; // used to shortcut timeout check public AlternatingController( AlignedController alignedController, @@ -49,23 +55,107 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll } @Override - public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) { - checkActiveController(barrier); - activeController.barrierReceived(channelInfo, barrier); + public void barrierAnnouncement( + InputChannelInfo channelInfo, + CheckpointBarrier announcedBarrier, + int sequenceNumber) throws IOException { + + Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(announcedBarrier); + announcedBarrier = maybeTimedOut.orElse(announcedBarrier); + + if (maybeTimedOut.isPresent() && activeController != unalignedController) { + // Let's timeout this barrier + unalignedController.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber); + } + else { + // Either we have already timed out before, or we are still going with aligned checkpoints + activeController.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber); + } + } + + @Override + public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException, CheckpointException { + if (barrier.getCheckpointOptions().isUnalignedCheckpoint() && activeController == alignedController) { + switchToUnaligned(channelInfo, barrier); + activeController.barrierReceived(channelInfo, barrier); + return Optional.of(barrier); + } + + Optional<CheckpointBarrier> maybeTimedOut = maybeTimeOut(barrier); + barrier = maybeTimedOut.orElse(barrier); + + checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent()); + + if (maybeTimedOut.isPresent()) { + if (activeController == alignedController) { + switchToUnaligned(channelInfo, barrier); + return maybeTimedOut; + } + else { + // TODO: add unit test for this + alignedController.resumeConsumption(channelInfo); + } + } + return Optional.empty(); } @Override - public boolean preProcessFirstBarrier( + public Optional<CheckpointBarrier> preProcessFirstBarrier( InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { - checkActiveController(barrier); return activeController.preProcessFirstBarrier(channelInfo, barrier); } + private void switchToUnaligned( + InputChannelInfo channelInfo, + CheckpointBarrier barrier) throws IOException, CheckpointException { + checkState(alignedController == activeController); + + // timeout all not yet processed barriers for which alignedController has processed an announcement + for (Map.Entry<InputChannelInfo, Integer> entry : alignedController.getSequenceNumberInAnnouncedChannels().entrySet()) { + InputChannelInfo unProcessedChannelInfo = entry.getKey(); + int announcedBarrierSequenceNumber = entry.getValue(); + unalignedController.barrierAnnouncement(unProcessedChannelInfo, barrier, announcedBarrierSequenceNumber); + } + + // get blocked channels before resuming consumption + Collection<InputChannelInfo> blockedChannels = alignedController.getBlockedChannels(); + + activeController = unalignedController; + + // alignedController might have already 
processed some barriers, so "migrate"/forward those calls to unalignedController. + unalignedController.preProcessFirstBarrier(channelInfo, barrier); + for (InputChannelInfo blockedChannel : blockedChannels) { + unalignedController.barrierReceived(blockedChannel, barrier); + } + + alignedController.resumeConsumption(); + } + @Override - public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException { - checkActiveController(barrier); - return activeController.postProcessLastBarrier(channelInfo, barrier); + public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { + Optional<CheckpointBarrier> maybeTimeOut = maybeTimeOut(barrier); + if (maybeTimeOut.isPresent() && activeController == alignedController) { + switchToUnaligned(channelInfo, maybeTimeOut.get()); + checkState(activeController == unalignedController); + checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent()); + return maybeTimeOut; + } + + barrier = maybeTimeOut.orElse(barrier); + if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) { + checkState(activeController == unalignedController); + checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent()); + return Optional.empty(); + } + else { + checkState(activeController == alignedController); + Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier( + channelInfo, + barrier); + checkState(triggerResult.isPresent()); + return triggerResult; + } } @Override @@ -94,4 +184,21 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll private CheckpointBarrierBehaviourController chooseController(CheckpointBarrier barrier) { return isAligned(barrier) ? 
alignedController : unalignedController; } + + private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier barrier) { + CheckpointOptions options = barrier.getCheckpointOptions(); + boolean shouldTimeout = (options.isTimeoutable()) && ( + barrier.getId() == timeOutedBarrierId || + (System.currentTimeMillis() - barrier.getTimestamp()) > options.getAlignmentTimeout()); + if (options.isUnalignedCheckpoint() || !shouldTimeout) { + return Optional.empty(); + } + else { + timeOutedBarrierId = Math.max(timeOutedBarrierId, barrier.getId()); + return Optional.of(new CheckpointBarrier( + barrier.getId(), + barrier.getTimestamp(), + options.toTimeouted())); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java index 5939451..2a3908a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import java.io.IOException; +import java.util.Optional; /** * Controls when the checkpoint should be actually triggered. @@ -37,23 +38,28 @@ public interface CheckpointBarrierBehaviourController { void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier); /** + * Invoked per every {@link CheckpointBarrier} announcement. + */ + void barrierAnnouncement(InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) throws IOException; + + /** * Invoked per every received {@link CheckpointBarrier}. 
*/ - void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier); + Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException; /** * Invoked once per checkpoint, before the first invocation of * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint. * @return {@code true} if checkpoint should be triggered. */ - boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException; + Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException; /** * Invoked once per checkpoint, after the last invocation of * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint. * @return {@code true} if checkpoint should be triggered. */ - boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException; + Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException; void abortPendingCheckpoint(long cancelledId, CheckpointException exception) throws IOException; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java index fa1ee00..b00b1fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput; import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator; +import org.apache.flink.util.function.TriFunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import java.util.Arrays; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM; @@ -116,21 +118,15 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { markAlignmentStart(barrier.getTimestamp()); } allBarriersReceivedFuture = new CompletableFuture<>(); - try { - if (controller.preProcessFirstBarrier(channelInfo, barrier)) { - LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.", - taskName, - barrier.getId(), - barrier.getTimestamp()); - notifyCheckpoint(barrier); - } - } catch (CheckpointException e) { - abortInternal(barrier.getId(), e); + + if (!handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::preProcessFirstBarrier)) { return; } } - controller.barrierReceived(channelInfo, barrier); + if (!handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::barrierReceived)) { + return; + } if (currentCheckpointId == barrierId) { if (++numBarriersReceived == numOpenChannels) { @@ -139,25 +135,61 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { } numBarriersReceived = 0; lastCancelledOrCompletedCheckpointId = currentCheckpointId; - if (controller.postProcessLastBarrier(channelInfo, barrier)) { - LOG.debug("{}: Triggering checkpoint {} on the last barrier at {}.", - taskName, - barrier.getId(), - barrier.getTimestamp()); - notifyCheckpoint(barrier); - } + handleBarrier(barrier, channelInfo, CheckpointBarrierBehaviourController::postProcessLastBarrier); 
allBarriersReceivedFuture.complete(null); } } } + private boolean handleBarrier( + CheckpointBarrier barrier, + InputChannelInfo channelInfo, + TriFunctionWithException< + CheckpointBarrierBehaviourController, + InputChannelInfo, + CheckpointBarrier, + Optional<CheckpointBarrier>, + Exception + > controllerAction) throws IOException { + try { + Optional<CheckpointBarrier> triggerMaybe = controllerAction.apply(controller, channelInfo, barrier); + if (triggerMaybe.isPresent()) { + CheckpointBarrier trigger = triggerMaybe.get(); + LOG.debug( + "{}: Triggering checkpoint {} on the barrier announcement at {}.", + taskName, + trigger.getId(), + trigger.getTimestamp()); + notifyCheckpoint(trigger); + } + return true; + } catch (CheckpointException e) { + abortInternal(barrier.getId(), e); + return false; + } catch (RuntimeException | IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } + @Override public void processBarrierAnnouncement( CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo) throws IOException { checkSubsumedCheckpoint(channelInfo, announcedBarrier); - // TODO: FLINK-19681 + + long barrierId = announcedBarrier.getId(); + if (currentCheckpointId > barrierId || (currentCheckpointId == barrierId && !isCheckpointPending())) { + LOG.debug("{}: Obsolete announcement of checkpoint {} for channel {}.", + taskName, + barrierId, + channelInfo); + return; + } + + controller.barrierAnnouncement(channelInfo, announcedBarrier, sequenceNumber); } private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java index 9b7c80f..01bd056 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java 
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.operators.SourceOperator; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -128,6 +129,10 @@ public class StreamTaskSourceInput<T> implements StreamTaskInput<T>, Checkpointa } @Override + public void convertToPriorityEvent(int channelIndex, int sequenceNumber) throws IOException { + } + + @Override public int getInputIndex() { return inputIndex; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java index 3d5f575..d19533e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator; import java.io.IOException; +import java.util.Optional; /** * Controller for unaligned checkpoints. 
@@ -48,22 +49,33 @@ public class UnalignedController implements CheckpointBarrierBehaviourController } @Override - public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) { + public void barrierAnnouncement( + InputChannelInfo channelInfo, + CheckpointBarrier announcedBarrier, + int sequenceNumber) throws IOException { + inputs[channelInfo.getGateIdx()].convertToPriorityEvent( + channelInfo.getInputChannelIdx(), + sequenceNumber); + } + + @Override + public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barrier) { + return Optional.empty(); } @Override - public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { + public Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions()); for (final CheckpointableInput input : inputs) { input.checkpointStarted(barrier); } - return true; + return Optional.of(barrier); } @Override - public boolean postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) { + public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) { resetPendingCheckpoint(barrier.getId()); - return false; + return Optional.empty(); } private void resetPendingCheckpoint(long cancelledId) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java index 008ea9f..41b297d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java @@ -86,6 +86,11 @@ public class MailboxProcessor implements Closeable { private Meter idleTime = new MeterView(new SimpleCounter()); + @VisibleForTesting + public MailboxProcessor() { + this(MailboxDefaultAction.Controller::suspendDefaultAction); + } + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java index f2bd319..b9eac08 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matchers; +import org.hamcrest.collection.IsMapContaining; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,11 +51,16 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.stream.IntStream; import static org.apache.flink.streaming.runtime.io.UnalignedControllerTest.addSequence; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -143,6 +149,29 @@ public class AlignedControllerTest { // Tests // 
------------------------------------------------------------------------ + public void testGetChannelsWithUnprocessedBarriers() throws IOException { + mockInputGate = new MockInputGate(4, Collections.emptyList()); + AlignedController alignedController = new AlignedController(mockInputGate); + BufferOrEvent barrier0 = createBarrier(1, 0); + BufferOrEvent barrier1 = createBarrier(1, 1); + BufferOrEvent barrier3 = createBarrier(1, 3); + alignedController.barrierAnnouncement(barrier0.getChannelInfo(), (CheckpointBarrier) barrier0.getEvent(), 0); + alignedController.barrierReceived(barrier0.getChannelInfo(), (CheckpointBarrier) barrier0.getEvent()); + alignedController.barrierAnnouncement(barrier1.getChannelInfo(), (CheckpointBarrier) barrier1.getEvent(), 1); + alignedController.barrierAnnouncement(barrier3.getChannelInfo(), (CheckpointBarrier) barrier3.getEvent(), 42); + + Collection<InputChannelInfo> blockedChannels = alignedController.getBlockedChannels(); + Map<InputChannelInfo, Integer> announcedChannels = alignedController.getSequenceNumberInAnnouncedChannels(); + + // blockedChannels and announcedChannels should be copies and shouldn't be cleared by the resumeConsumption + alignedController.resumeConsumption(); + + assertThat(blockedChannels, contains(barrier0.getChannelInfo())); + assertThat(announcedChannels, IsMapContaining.hasEntry(barrier1.getChannelInfo(), 1)); + assertThat(announcedChannels, IsMapContaining.hasEntry(barrier3.getChannelInfo(), 42)); + assertThat(announcedChannels.size(), equalTo(2)); + } + /** * Validates that the buffer behaves correctly if no checkpoint barriers come, * for a single input channel. 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java index db958f3..8d81df2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java @@ -19,11 +19,18 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; +import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EventAnnouncement; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel; @@ -32,12 +39,14 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static java.util.Collections.singletonList; import static junit.framework.TestCase.assertTrue; @@ -45,8 +54,11 @@ import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT; import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -84,7 +96,7 @@ public class AlternatingControllerTest { } @Test - public void testAlignedTimeoutableCheckpoint() throws Exception { + public void testAlignedNeverTimeoutableCheckpoint() throws Exception { int numChannels = 2; int bufferSize = 1000; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); @@ -95,14 +107,248 @@ public class AlternatingControllerTest { Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE); send(neverTimeoutableCheckpoint, gate, 0); sendBuffer(bufferSize, gate, 1); - assertEquals(0, target.getTriggeredCheckpointCounter()); send(neverTimeoutableCheckpoint, gate, 1); + assertEquals(1, target.getTriggeredCheckpointCounter()); + } + + @Test + public void testTimeoutAlignment() throws Exception { + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + 
CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels); + + testTimeoutBarrierOnTwoChannels(target, gate); + } + + @Test + public void testTimeoutAlignmentAfterProcessingBarrier() throws Exception { + int numChannels = 3; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels); + long alignmentTimeout = 10; + long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout; + Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE); + + RemoteInputChannel channel2 = (RemoteInputChannel) gate.getChannel(2); + + channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 0); + while (gate.pollNext().isPresent()) { + } + + assertEquals(0, target.getTriggeredCheckpointCounter()); + + testTimeoutBarrierOnTwoChannels(target, gate); + } + + private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, CheckpointedInputGate gate) throws Exception { + int bufferSize = 1000; + long alignmentTimeout = 10; + long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout; + Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout); + Buffer buffer = TestBufferFactory.createBuffer(bufferSize); + + RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); + RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1); + channel0.onBuffer(buffer.retainBuffer(), 0, 0); + channel0.onBuffer(buffer.retainBuffer(), 1, 0); + channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0); + channel1.onBuffer(buffer.retainBuffer(), 0, 0); + channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0); + + assertEquals(0, target.getTriggeredCheckpointCounter()); + // First announcements and prioritised barriers + List<AbstractEvent> events = new ArrayList<>(); + events.add(gate.pollNext().get().getEvent()); +
events.add(gate.pollNext().get().getEvent()); + events.add(gate.pollNext().get().getEvent()); + events.add(gate.pollNext().get().getEvent()); + assertThat(events, containsInAnyOrder( + instanceOf(EventAnnouncement.class), + instanceOf(EventAnnouncement.class), + instanceOf(CheckpointBarrier.class), + instanceOf(CheckpointBarrier.class))); + assertEquals(1, target.getTriggeredCheckpointCounter()); + assertThat( + target.getTriggeredCheckpointOptions(), + contains(CheckpointOptions.create( + CHECKPOINT, + CheckpointStorageLocationReference.getDefault(), + true, + true, + 0))); + // Followed by overtaken buffers + assertFalse(gate.pollNext().get().isEvent()); + assertFalse(gate.pollNext().get().isEvent()); + assertFalse(gate.pollNext().get().isEvent()); + } + + /** + * This test tries to make sure that the first timeout happens after processing + * {@link EventAnnouncement} but before/during processing the first {@link CheckpointBarrier}. + */ + @Test + public void testTimeoutAlignmentOnFirstBarrier() throws Exception { + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels); + + long alignmentTimeout = 100; + long checkpointCreationTime = System.currentTimeMillis(); + Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout); + + RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); + RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1); + channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0); + channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0); + + assertEquals(0, target.getTriggeredCheckpointCounter()); + // First announcements and prioritised barriers + List<AbstractEvent> events = new ArrayList<>(); + events.add(gate.pollNext().get().getEvent()); + events.add(gate.pollNext().get().getEvent()); + + Thread.sleep(alignmentTimeout * 2); + +
events.add(gate.pollNext().get().getEvent()); + assertThat(events, contains( + instanceOf(EventAnnouncement.class), + instanceOf(EventAnnouncement.class), + instanceOf(CheckpointBarrier.class))); assertEquals(1, target.getTriggeredCheckpointCounter()); } + /** + * First we process aligned {@link CheckpointBarrier} and after that we receive an already unaligned + * {@link CheckpointBarrier}, that has timed out on an upstream task. + */ + @Test + public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception { + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter(); + CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels, channelStateWriter); + + long checkpointCreationTime = System.currentTimeMillis(); + Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, Integer.MAX_VALUE); + Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, 0); + Buffer buffer = TestBufferFactory.createBuffer(1000); + + RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); + RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1); + channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 0); + + List<AbstractEvent> events = new ArrayList<>(); + events.add(gate.pollNext().get().getEvent()); + events.add(gate.pollNext().get().getEvent()); + + assertThat(events, contains( + instanceOf(EventAnnouncement.class), + instanceOf(CheckpointBarrier.class))); + + channel1.onBuffer(buffer.retainBuffer(), 0, 0); + channel1.onBuffer(buffer.retainBuffer(), 1, 0); + channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 0); + + events.add(gate.pollNext().get().getEvent()); + + assertThat(events, contains( + instanceOf(EventAnnouncement.class), + instanceOf(CheckpointBarrier.class), + instanceOf(CheckpointBarrier.class))); + + 
assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size(), 2); + assertEquals(1, target.getTriggeredCheckpointCounter()); + } + + @Test + public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws Exception { + testTimeoutAlignmentConsistency(true, false, false); + } + + @Test + public void testTimeoutAlignmentConsistencyOnProcessBarrier() throws Exception { + testTimeoutAlignmentConsistency(false, true, false); + } + + @Test + public void testTimeoutAlignmentConsistencyOnPostProcessBarrier() throws Exception { + testTimeoutAlignmentConsistency(false, false, true); + } + + public void testTimeoutAlignmentConsistency( + boolean sleepBeforePreProcess, + boolean sleepBeforeProcess, + boolean sleepBeforePostProcess) throws Exception { + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(1).build(); + TestInputChannel channel0 = new TestInputChannel(gate, 0, false, true); + gate.setInputChannels(channel0); + + RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter(); + AlternatingController controller = new AlternatingController( + new AlignedController(gate), + new UnalignedController( + new TestSubtaskCheckpointCoordinator(channelStateWriter), + gate)); + + long checkpointCreationTime = System.currentTimeMillis(); + long alignmentTimeout = 10; + CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout); + + InputChannelInfo channelInfo = channel0.getChannelInfo(); + + controller.preProcessFirstBarrierOrAnnouncement(barrier); + controller.barrierAnnouncement(channelInfo, barrier, 1); + + if (sleepBeforePreProcess) { + Thread.sleep(alignmentTimeout * 2); + } + Optional<CheckpointBarrier> preProcessTrigger = controller.preProcessFirstBarrier(channelInfo, barrier); + if (sleepBeforeProcess) { + Thread.sleep(alignmentTimeout * 2); + } + Optional<CheckpointBarrier> 
processTrigger = controller.barrierReceived(channelInfo, barrier); + if (sleepBeforePostProcess) { + Thread.sleep(alignmentTimeout * 2); + } + Optional<CheckpointBarrier> postProcessTrigger = controller.postProcessLastBarrier(channelInfo, barrier); + + int triggeredCount = 0; + boolean unalignedCheckpoint = false; + if (preProcessTrigger.isPresent()) { + triggeredCount++; + unalignedCheckpoint = preProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint(); + assertTrue(unalignedCheckpoint); + } + if (processTrigger.isPresent()) { + triggeredCount++; + unalignedCheckpoint = processTrigger.get().getCheckpointOptions().isUnalignedCheckpoint(); + assertTrue(unalignedCheckpoint); + } + if (postProcessTrigger.isPresent()) { + triggeredCount++; + unalignedCheckpoint = postProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint(); + } + + assertEquals( + String.format( + "Checkpoint should be triggered exactly once, but [%s, %s, %s] was found instead", + preProcessTrigger.isPresent(), + processTrigger.isPresent(), + postProcessTrigger.isPresent()), + 1, + triggeredCount); + + if (unalignedCheckpoint) { + // check that we can add output data if we are in unaligned checkpoint mode. In other words + // if the state writer has been initialised correctly. 
+ assertEquals(barrier.getId(), channelStateWriter.getLastStartedCheckpointId()); + } + } + @Test public void testMetricsAlternation() throws Exception { int numChannels = 2; @@ -362,6 +608,13 @@ public class AlternatingControllerTest { } private static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) { + return barrierHandler(inputGate, target, new RecordingChannelStateWriter()); + } + + private static SingleCheckpointBarrierHandler barrierHandler( + SingleInputGate inputGate, + AbstractInvokable target, + ChannelStateWriter stateWriter) { String taskName = "test"; return new SingleCheckpointBarrierHandler( taskName, @@ -369,7 +622,7 @@ public class AlternatingControllerTest { inputGate.getNumberOfInputChannels(), new AlternatingController( new AlignedController(inputGate), - new UnalignedController(TestSubtaskCheckpointCoordinator.INSTANCE, inputGate))); + new UnalignedController(new TestSubtaskCheckpointCoordinator(stateWriter), inputGate))); } private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException { @@ -381,18 +634,27 @@ public class AlternatingControllerTest { } private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) throws IOException { + CheckpointBarrier checkpointBarrier = checkpointBarrier( + barrierId, + checkpointType, + barrierTimestamp, + alignmentTimeout); + return toBuffer( + checkpointBarrier, + checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()); + } + + private CheckpointBarrier checkpointBarrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) { CheckpointOptions options = CheckpointOptions.create( checkpointType, CheckpointStorageLocationReference.getDefault(), true, true, alignmentTimeout); - return toBuffer( - new CheckpointBarrier( - barrierId, - barrierTimestamp, - options), - options.isUnalignedCheckpoint()); + return new CheckpointBarrier( 
+ barrierId, + barrierTimestamp, + options); } private static CheckpointedInputGate buildGate(AbstractInvokable target, int numChannels) { @@ -405,4 +667,30 @@ public class AlternatingControllerTest { return new CheckpointedInputGate(gate, barrierHandler(gate, target), new SyncMailboxExecutor()); } + private static CheckpointedInputGate buildRemoteInputGate( + AbstractInvokable target, + int numChannels) throws IOException { + return buildRemoteInputGate(target, numChannels, new RecordingChannelStateWriter()); + } + + private static CheckpointedInputGate buildRemoteInputGate( + AbstractInvokable target, + int numChannels, + ChannelStateWriter channelStateWriter) throws IOException { + int maxUsedBuffers = 10; + NetworkBufferPool networkBufferPool = new NetworkBufferPool(numChannels * maxUsedBuffers, 4096); + SingleInputGate gate = new SingleInputGateBuilder() + .setChannelFactory(InputChannelBuilder::buildRemoteChannel) + .setNumberOfChannels(numChannels) + .setSegmentProvider(networkBufferPool) + .setBufferPoolFactory(networkBufferPool.createBufferPool(numChannels, maxUsedBuffers)) + .setChannelStateWriter(channelStateWriter) + .build(); + gate.setup(); + gate.requestPartitions(); + // do not fire events automatically. 
If you need events, you should expose mailboxProcessor and + // execute it step by step + MailboxProcessor mailboxProcessor = new MailboxProcessor(); + return new CheckpointedInputGate(gate, barrierHandler(gate, target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor()); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java index f426289..ce36b39 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java @@ -46,7 +46,8 @@ public class ValidatingCheckpointHandler extends AbstractInvokable { protected long abortedCheckpointCounter = 0; protected CompletableFuture<Long> lastAlignmentDurationNanos; protected CompletableFuture<Long> lastBytesProcessedDuringAlignment; - protected List<Long> triggeredCheckpoints = new ArrayList<>(); + protected final List<Long> triggeredCheckpoints = new ArrayList<>(); + protected final List<CheckpointOptions> triggeredCheckpointOptions = new ArrayList<>(); public ValidatingCheckpointHandler() { this(-1); @@ -89,6 +90,10 @@ public class ValidatingCheckpointHandler extends AbstractInvokable { return lastBytesProcessedDuringAlignment; } + public List<CheckpointOptions> getTriggeredCheckpointOptions() { + return triggeredCheckpointOptions; + } + @Override public void invoke() { throw new UnsupportedOperationException(); @@ -119,6 +124,7 @@ public class ValidatingCheckpointHandler extends AbstractInvokable { lastBytesProcessedDuringAlignment = checkpointMetrics.getBytesProcessedDuringAlignment(); triggeredCheckpoints.add(checkpointMetaData.getCheckpointId()); + triggeredCheckpointOptions.add(checkpointOptions); } @Override
