This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 26ca3f6d8af862f30d71ebcf9d0b22eab149ba85 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Dec 2 14:20:26 2020 +0100 [hotfix][checkpointing] Explicit creation of CheckpointOptions The motivation is to eliminate subtle bugs when changing checkpoint type on the fly. 1. Infer options only when creating a new barrier from the configuration. 2. For all other cases, provide explicit factory methods. 3. Carry the current checkpoint/barrier requirements instead of the initial configuration. --- .../runtime/checkpoint/CheckpointCoordinator.java | 2 +- .../runtime/checkpoint/CheckpointOptions.java | 89 +++--- .../runtime/io/network/api/CheckpointBarrier.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 5 +- .../runtime/checkpoint/CheckpointOptionsTest.java | 53 +--- .../api/serialization/EventSerializerTest.java | 8 +- .../PipelinedSubpartitionWithReadViewTest.java | 15 +- .../partition/consumer/LocalInputChannelTest.java | 3 +- .../partition/consumer/RemoteInputChannelTest.java | 32 +-- .../runtime/tasks/SourceOperatorStreamTask.java | 6 +- .../streaming/runtime/tasks/SourceStreamTask.java | 6 +- .../runtime/io/AlternatingControllerTest.java | 303 +++++++++------------ .../runtime/io/InputProcessorUtilTest.java | 3 +- .../runtime/io/StreamTaskNetworkInputTest.java | 2 +- .../io/UnalignedControllerCancellationTest.java | 2 +- .../runtime/io/UnalignedControllerTest.java | 8 +- ...tStreamTaskChainedSourcesCheckpointingTest.java | 2 +- .../tasks/SubtaskCheckpointCoordinatorTest.java | 5 +- 18 files changed, 249 insertions(+), 297 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9154989..27635c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ 
-778,7 +778,7 @@ public class CheckpointCoordinator { Execution[] executions, boolean advanceToEndOfTime) { - final CheckpointOptions checkpointOptions = CheckpointOptions.create( + final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( props.getCheckpointType(), checkpointStorageLocation.getLocationReference(), isExactlyOnceMode, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index 1f12e7f..a788abd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -25,11 +25,13 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import java.io.Serializable; import java.util.Objects; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** - * Options for performing the checkpoint. + * Options for performing the checkpoint. Note that different + * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier barriers} may have different options. * * <p>The {@link CheckpointProperties} are related and cover properties that * are only relevant at the {@link CheckpointCoordinator}. 
These options are @@ -53,19 +55,59 @@ public class CheckpointOptions implements Serializable { private final long alignmentTimeout; - public static CheckpointOptions create( + public static CheckpointOptions notExactlyOnce(CheckpointType type, CheckpointStorageLocationReference location) { + return new CheckpointOptions( + type, + location, + false, + false, + NO_ALIGNMENT_TIME_OUT); + } + + public static CheckpointOptions alignedNoTimeout(CheckpointType type, CheckpointStorageLocationReference location) { + return new CheckpointOptions( + type, + location, + true, + false, + NO_ALIGNMENT_TIME_OUT); + } + + public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) { + return new CheckpointOptions( + CheckpointType.CHECKPOINT, + location, + true, + true, + NO_ALIGNMENT_TIME_OUT); + } + + public static CheckpointOptions alignedWithTimeout(CheckpointStorageLocationReference location, long alignmentTimeout) { + return new CheckpointOptions( + CheckpointType.CHECKPOINT, + location, + true, + false, + alignmentTimeout); + } + + public static CheckpointOptions forConfig( CheckpointType checkpointType, CheckpointStorageLocationReference locationReference, boolean isExactlyOnceMode, - boolean unalignedCheckpointsEnabled, + boolean isUnalignedEnabled, long alignmentTimeout) { - boolean canBeUnaligned = checkpointType == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled; - return new CheckpointOptions( - checkpointType, - locationReference, - isExactlyOnceMode, - canBeUnaligned && alignmentTimeout == 0, - canBeUnaligned ? 
alignmentTimeout : NO_ALIGNMENT_TIME_OUT); + if (!isExactlyOnceMode) { + return notExactlyOnce(checkpointType, locationReference); + } else if (checkpointType.isSavepoint()) { + return alignedNoTimeout(checkpointType, locationReference); + } else if (!isUnalignedEnabled) { + return alignedNoTimeout(checkpointType, locationReference); + } else if (alignmentTimeout == 0 || alignmentTimeout == NO_ALIGNMENT_TIME_OUT) { + return unaligned(locationReference); + } else { + return alignedWithTimeout(locationReference, alignmentTimeout); + } } @VisibleForTesting @@ -82,6 +124,8 @@ public class CheckpointOptions implements Serializable { boolean isUnalignedCheckpoint, long alignmentTimeout) { + checkArgument(!isUnalignedCheckpoint || !checkpointType.isSavepoint(), "Savepoint can't be unaligned"); + checkArgument(alignmentTimeout == NO_ALIGNMENT_TIME_OUT || !isUnalignedCheckpoint, "Unaligned checkpoint can't have timeout (%s)", alignmentTimeout); this.checkpointType = checkNotNull(checkpointType); this.targetLocation = checkNotNull(targetLocation); this.isExactlyOnceMode = isExactlyOnceMode; @@ -98,7 +142,7 @@ public class CheckpointOptions implements Serializable { } public boolean isTimeoutable() { - return !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT); + return isExactlyOnceMode && !isUnalignedCheckpoint && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT); } // ------------------------------------------------------------------------ @@ -178,25 +222,8 @@ public class CheckpointOptions implements Serializable { return CHECKPOINT_AT_DEFAULT_LOCATION; } - public static CheckpointOptions forCheckpointWithDefaultLocation( - boolean isExactlyOnceMode, - boolean isUnalignedCheckpoint, - long alignmentTimeout) { - return new CheckpointOptions( - CheckpointType.CHECKPOINT, - CheckpointStorageLocationReference.getDefault(), - isExactlyOnceMode, - isUnalignedCheckpoint, - alignmentTimeout); - } - - public 
CheckpointOptions asTimedOut() { - checkState(checkpointType == CheckpointType.CHECKPOINT); - return create( - checkpointType, - targetLocation, - isExactlyOnceMode, - true, - 0); + public CheckpointOptions toUnaligned() { + checkState(!isUnalignedCheckpoint); + return unaligned(targetLocation); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index e734616..e7e78e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -119,6 +119,6 @@ public class CheckpointBarrier extends RuntimeEvent { } public CheckpointBarrier asUnaligned() { - return checkpointOptions.isUnalignedCheckpoint() ? this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().asTimedOut()); + return checkpointOptions.isUnalignedCheckpoint() ? 
this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toUnaligned()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 128b74c..d5b0851 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2620,7 +2620,10 @@ public class CheckpointCoordinatorTest extends TestLogger { return new CheckpointCoordinatorBuilder() .setJobId(jobId) .setTasks(new ExecutionVertex[]{ vertex1, vertex2 }) - .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()) + .setCheckpointCoordinatorConfiguration( + CheckpointCoordinatorConfiguration.builder() + .setAlignmentTimeout(Long.MAX_VALUE) + .setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()) .setTimer(manuallyTriggeredScheduledExecutor) .build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java index b443e8a..a1485ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import java.util.Random; +import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT; import static org.junit.Assert.assertArrayEquals; @@ -63,64 +64,40 @@ public class CheckpointOptionsTest { assertArrayEquals(locationBytes, 
copy.getTargetLocation().getReferenceBytes()); } - @Test + @Test(expected = IllegalArgumentException.class) public void testSavepointNeedsAlignment() { - CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault(); - assertTrue(new CheckpointOptions(SAVEPOINT, location, true, true, 0).needsAlignment()); - assertFalse(new CheckpointOptions(SAVEPOINT, location, false, true, 0).needsAlignment()); - assertTrue(new CheckpointOptions(SAVEPOINT, location, true, false, 0).needsAlignment()); - assertFalse(new CheckpointOptions(SAVEPOINT, location, false, false, 0).needsAlignment()); + new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault(), true, true, 0); } @Test public void testCheckpointNeedsAlignment() { CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault(); - assertFalse(new CheckpointOptions(CHECKPOINT, location, true, true, 0).needsAlignment()); - assertTrue(new CheckpointOptions(CHECKPOINT, location, true, false, 0).needsAlignment()); - assertFalse(new CheckpointOptions(CHECKPOINT, location, false, true, 0).needsAlignment()); - assertFalse(new CheckpointOptions(CHECKPOINT, location, false, false, 0).needsAlignment()); + assertFalse(new CheckpointOptions(CHECKPOINT, location, true, true, Long.MAX_VALUE).needsAlignment()); + assertTrue(new CheckpointOptions(CHECKPOINT, location, true, false, Long.MAX_VALUE).needsAlignment()); + assertFalse(new CheckpointOptions(CHECKPOINT, location, false, true, Long.MAX_VALUE).needsAlignment()); + assertFalse(new CheckpointOptions(CHECKPOINT, location, false, false, Long.MAX_VALUE).needsAlignment()); } @Test public void testCheckpointIsTimeoutable() { CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault(); assertTimeoutable( - CheckpointOptions.create( - CHECKPOINT, - location, - true, - true, - 10), + CheckpointOptions.alignedWithTimeout(location, 10), false, true, 10); assertTimeoutable( - 
CheckpointOptions.create( - CHECKPOINT, - location, - true, - false, - 10), - false, - false, - CheckpointOptions.NO_ALIGNMENT_TIME_OUT); - assertTimeoutable( - CheckpointOptions.create( - CHECKPOINT, - location, - true, - true, - 0), + CheckpointOptions.unaligned(location), true, false, - 0); + NO_ALIGNMENT_TIME_OUT); } private void assertTimeoutable(CheckpointOptions options, boolean isUnaligned, boolean isTimeoutable, long timeout) { - assertTrue(options.isExactlyOnceMode()); - assertEquals(!isUnaligned, options.needsAlignment()); - assertEquals(isUnaligned, options.isUnalignedCheckpoint()); - assertEquals(isTimeoutable, options.isTimeoutable()); - assertEquals(timeout, options.getAlignmentTimeout()); + assertTrue("exactly once", options.isExactlyOnceMode()); + assertEquals("need alignment", !isUnaligned, options.needsAlignment()); + assertEquals("unaligned", isUnaligned, options.isUnalignedCheckpoint()); + assertEquals("timeoutable", isTimeoutable, options.isTimeoutable()); + assertEquals("timeout", timeout, options.getAlignmentTimeout()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index ee97672..06c375f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -55,12 +54,7 @@ public class EventSerializerTest { new 
EventAnnouncement(new CheckpointBarrier( 42L, 1337L, - CheckpointOptions.create( - CheckpointType.CHECKPOINT, - CheckpointStorageLocationReference.getDefault(), - true, - true, - 10)), + CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 10)), 44) }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index 38cd22f..b2a4c23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; @@ -338,12 +337,7 @@ public class PipelinedSubpartitionWithReadViewTest { assertEquals(1, availablityListener.getNumNotifications()); assertEquals(0, availablityListener.getNumPriorityEvents()); - CheckpointOptions options = new CheckpointOptions( - CheckpointType.CHECKPOINT, - new CheckpointStorageLocationReference(new byte[]{0, 1, 2}), - true, - true, - 0); + CheckpointOptions options = CheckpointOptions.unaligned(new CheckpointStorageLocationReference(new byte[]{0, 1, 2})); channelStateWriter.start(0, options); BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); subpartition.add(barrierBuffer); @@ -366,12 +360,7 @@ public class PipelinedSubpartitionWithReadViewTest { 
public void testAvailabilityAfterPriority() throws Exception { subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); - CheckpointOptions options = new CheckpointOptions( - CheckpointType.CHECKPOINT, - new CheckpointStorageLocationReference(new byte[]{0, 1, 2}), - true, - true, - 0); + CheckpointOptions options = CheckpointOptions.unaligned(new CheckpointStorageLocationReference(new byte[]{0, 1, 2})); BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); subpartition.add(barrierBuffer); assertEquals(1, availablityListener.getNumNotifications()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 8785a68..7b69780 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.CancelTaskException; @@ -501,7 +500,7 @@ public class LocalInputChannelTest { channel.requestSubpartition(0); final CheckpointStorageLocationReference location = getDefault(); - CheckpointOptions options = new CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0); + CheckpointOptions options = CheckpointOptions.unaligned(location); stateWriter.start(0, options); final CheckpointBarrier barrier = new CheckpointBarrier(0, 123L, options); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 2954a3f..d13cb72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TestTaskBuilder; @@ -83,6 +82,7 @@ import static org.apache.flink.runtime.io.network.partition.AvailabilityUtil.ass import static org.apache.flink.runtime.io.network.partition.AvailabilityUtil.assertPriorityAvailability; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; +import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -107,7 +107,9 @@ import static org.mockito.Mockito.when; */ public class RemoteInputChannelTest { - public static final long CHECKPOINT_ID = 1L; + private static final long CHECKPOINT_ID = 1L; + private static final CheckpointOptions UNALIGNED = 
CheckpointOptions.unaligned(getDefault()); + private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout(getDefault(), 10); @Test public void testExceptionOnReordering() throws Exception { @@ -1136,7 +1138,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 0); + sendBarrier(channel, sequenceNumber++, UNALIGNED); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); @@ -1150,7 +1152,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 0); + sendBarrier(channel, sequenceNumber++, UNALIGNED); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); assertInflightBufferSizes(channel, 1, 2); @@ -1176,7 +1178,7 @@ public class RemoteInputChannelTest { int sequenceNumber = startingSequence; sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 0); + sendBarrier(channel, sequenceNumber++, UNALIGNED); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); assertThat( @@ -1193,7 +1195,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 0); + sendBarrier(channel, sequenceNumber++, UNALIGNED); sendBuffer(channel, sequenceNumber++, bufferSize++); 
sendBuffer(channel, sequenceNumber++, bufferSize++); assertGetNextBufferSequenceNumbers(channel, 2, 0); @@ -1213,7 +1215,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 10); + sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); @@ -1239,7 +1241,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 10); + sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); assertInflightBufferSizes(channel, 1, 2); @@ -1252,7 +1254,7 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 10); + sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); assertGetNextBufferSequenceNumbers(channel, 2); @@ -1266,20 +1268,14 @@ public class RemoteInputChannelTest { final RemoteInputChannel channel = buildInputGateAndGetChannel(sequenceNumber); sendBuffer(channel, sequenceNumber++, bufferSize++); sendBuffer(channel, sequenceNumber++, bufferSize++); - sendBarrier(channel, sequenceNumber++, 10); + sendBarrier(channel, sequenceNumber++, ALIGNED_WITH_TIMEOUT); sendBuffer(channel, sequenceNumber++, bufferSize++); 
sendBuffer(channel, sequenceNumber++, bufferSize++); assertGetNextBufferSequenceNumbers(channel, 2, 0); assertInflightBufferSizes(channel, 2); } - private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, int alignmentTimeout) throws IOException { - CheckpointOptions checkpointOptions = CheckpointOptions.create( - CHECKPOINT, - CheckpointStorageLocationReference.getDefault(), - true, - true, - alignmentTimeout); + private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, CheckpointOptions checkpointOptions) throws IOException { send( channel, sequenceNumber, @@ -1438,7 +1434,7 @@ public class RemoteInputChannelTest { SingleInputGate inputGate = new SingleInputGateBuilder().build(); RemoteInputChannel channel = InputChannelTestUtils.createRemoteInputChannel(inputGate, 0); - CheckpointOptions options = new CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault()); + CheckpointOptions options = new CheckpointOptions(CHECKPOINT, getDefault()); assertPriorityAvailability(inputGate, false, false, () -> assertAvailability(inputGate, false, true, () -> { channel.onBuffer(toBuffer(new CheckpointBarrier(1L, 123L, options), false), 0, 0); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index 1f91062..ec20fdf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -23,7 +23,9 @@ import org.apache.flink.api.connector.source.ExternallyInducedSourceReader; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import 
org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -131,7 +133,9 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, // -------------------------- private void triggerCheckpointForExternallyInducedSource(long checkpointId) { - final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation( + final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), configuration.getAlignmentTimeout()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index afd67f3..5b10df5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -21,9 +21,11 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import 
org.apache.flink.runtime.util.FatalExitExceptionHandler; import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -90,7 +92,9 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S // TODO - we need to see how to derive those. We should probably not encode this in the // TODO - source's trigger message, but do a handshake in this task between the trigger // TODO - message from the master, and the source's trigger notification - final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation( + final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( + CheckpointType.CHECKPOINT, + CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), configuration.getAlignmentTimeout()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java index d998649..a7be650 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java @@ -23,20 +23,19 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.event.RuntimeEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EventAnnouncement; import org.apache.flink.runtime.io.network.buffer.Buffer; 
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel; -import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; @@ -50,15 +49,18 @@ import java.util.Optional; import static java.util.Collections.singletonList; import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedNoTimeout; +import static org.apache.flink.runtime.checkpoint.CheckpointOptions.alignedWithTimeout; +import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT; import static org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer; +import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer; +import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; 
-import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -146,7 +148,7 @@ public class AlternatingControllerTest { barriers.add(barrier); CheckpointType type = barrier % 2 == 0 ? CHECKPOINT : SAVEPOINT; for (int channel = 0; channel < numChannels; channel++) { - sendBarrier(barrier, type, (TestInputChannel) gate.getChannel(channel), gate); + send(barrier(barrier, System.currentTimeMillis(), alignedNoTimeout(type, getDefault())).retainBuffer(), channel, gate); } } assertEquals(barriers, target.triggeredCheckpoints); @@ -155,28 +157,22 @@ public class AlternatingControllerTest { @Test public void testAlignedNeverTimeoutableCheckpoint() throws Exception { int numChannels = 2; - int bufferSize = 1000; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); CheckpointedInputGate gate = buildGate(target, numChannels); - long checkpointCreationTime = System.currentTimeMillis(); - // Aligned checkpoint that never times out - Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE); - send(neverTimeoutableCheckpoint, gate, 0); - sendBuffer(bufferSize, gate, 1); + Buffer neverTimeoutableCheckpoint = withTimeout(Integer.MAX_VALUE); + send(neverTimeoutableCheckpoint, 0, gate); + sendData(1000, 1, gate); assertEquals(0, target.getTriggeredCheckpointCounter()); - send(neverTimeoutableCheckpoint, gate, 1); + send(neverTimeoutableCheckpoint, 1, gate); assertEquals(1, target.getTriggeredCheckpointCounter()); } @Test public void testTimeoutAlignment() throws Exception { - int numChannels = 2; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); - CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels); - - testTimeoutBarrierOnTwoChannels(target, gate); + testTimeoutBarrierOnTwoChannels(target, buildRemoteInputGate(target, 2)); } @Test @@ -185,15 +181,7 @@ public class AlternatingControllerTest { ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels); - long alignmentTimeout = 10; - long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout; - Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, checkpointCreationTime, Long.MAX_VALUE); - - RemoteInputChannel channel2 = (RemoteInputChannel) gate.getChannel(2); - - channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 0); - while (gate.pollNext().isPresent()) { - } + send(barrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), 2, gate); assertEquals(0, target.getTriggeredCheckpointCounter()); @@ -201,46 +189,31 @@ public class AlternatingControllerTest { } private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, CheckpointedInputGate gate) throws Exception { - int bufferSize = 1000; long alignmentTimeout = 10; - long checkpointCreationTime = System.currentTimeMillis() - 2 * alignmentTimeout; - Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout); - Buffer buffer = TestBufferFactory.createBuffer(bufferSize); - - RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); - RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1); - channel0.onBuffer(buffer.retainBuffer(), 0, 0); - channel0.onBuffer(buffer.retainBuffer(), 1, 0); - channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0); - channel1.onBuffer(buffer.retainBuffer(), 0, 0); - channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0); + Buffer checkpointBarrier = withTimeout(alignmentTimeout); + + getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0); + getChannel(gate, 0).onBuffer(dataBuffer(), 1, 0); 
+ getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 2, 0); + getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0); + getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0); assertEquals(0, target.getTriggeredCheckpointCounter()); - // First announcements and prioritsed barriers - List<AbstractEvent> events = new ArrayList<>(); - events.add(gate.pollNext().get().getEvent()); + assertAnnouncement(gate); Thread.sleep(alignmentTimeout * 2); - events.add(gate.pollNext().get().getEvent()); - events.add(gate.pollNext().get().getEvent()); - events.add(gate.pollNext().get().getEvent()); - assertThat(events, containsInAnyOrder( - instanceOf(EventAnnouncement.class), - instanceOf(EventAnnouncement.class), - instanceOf(CheckpointBarrier.class), - instanceOf(CheckpointBarrier.class))); + assertAnnouncement(gate); + assertBarrier(gate); + assertBarrier(gate); assertEquals(1, target.getTriggeredCheckpointCounter()); - assertThat( - target.getTriggeredCheckpointOptions(), - contains(CheckpointOptions.create( - CHECKPOINT, - CheckpointStorageLocationReference.getDefault(), - true, - true, - 0))); + assertThat(target.getTriggeredCheckpointOptions(), contains(unaligned(getDefault()))); // Followed by overtaken buffers - assertFalse(gate.pollNext().get().isEvent()); - assertFalse(gate.pollNext().get().isEvent()); - assertFalse(gate.pollNext().get().isEvent()); + assertData(gate); + assertData(gate); + assertData(gate); + } + + private Buffer dataBuffer() { + return createBuffer(100).retainBuffer(); } /** @@ -254,27 +227,21 @@ public class AlternatingControllerTest { CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels); long alignmentTimeout = 100; - long checkpointCreationTime = System.currentTimeMillis(); - Buffer checkpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, alignmentTimeout); + Buffer checkpointBarrier = withTimeout(alignmentTimeout); - RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); - 
RemoteInputChannel channel1 = (RemoteInputChannel) gate.getChannel(1); - channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0); - channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0); + for (int i = 0; i < numChannels; i++) { + (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0); + } assertEquals(0, target.getTriggeredCheckpointCounter()); - // First announcements and prioritsed barriers - List<AbstractEvent> events = new ArrayList<>(); - events.add(gate.pollNext().get().getEvent()); - events.add(gate.pollNext().get().getEvent()); + for (int i = 0; i < numChannels; i++) { + assertAnnouncement(gate); + } + assertEquals(0, target.getTriggeredCheckpointCounter()); - Thread.sleep(alignmentTimeout * 2); + Thread.sleep(alignmentTimeout * 4); - events.add(gate.pollNext().get().getEvent()); - assertThat(events, contains( - instanceOf(EventAnnouncement.class), - instanceOf(EventAnnouncement.class), - instanceOf(CheckpointBarrier.class))); + assertBarrier(gate); assertEquals(1, target.getTriggeredCheckpointCounter()); } @@ -284,43 +251,29 @@ public class AlternatingControllerTest { */ @Test public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception { - int numChannels = 2; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter(); - CheckpointedInputGate gate = buildRemoteInputGate(target, numChannels, channelStateWriter); - - long checkpointCreationTime = System.currentTimeMillis(); - Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, Integer.MAX_VALUE); - Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, checkpointCreationTime, 0); - Buffer buffer = TestBufferFactory.createBuffer(1000); + CheckpointedInputGate gate = buildRemoteInputGate(target, 2, channelStateWriter); - RemoteInputChannel channel0 = (RemoteInputChannel) gate.getChannel(0); - RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1); - channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 0); + getChannel(gate, 0).onBuffer(withTimeout(Integer.MAX_VALUE).retainBuffer(), 0, 0); - List<AbstractEvent> events = new ArrayList<>(); - events.add(gate.pollNext().get().getEvent()); - events.add(gate.pollNext().get().getEvent()); + assertAnnouncement(gate); + assertBarrier(gate); - assertThat(events, contains( - instanceOf(EventAnnouncement.class), - instanceOf(CheckpointBarrier.class))); + getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0); + getChannel(gate, 1).onBuffer(dataBuffer(), 1, 0); + getChannel(gate, 1).onBuffer(toBuffer(new CheckpointBarrier(1, System.currentTimeMillis(), unaligned(getDefault())), true).retainBuffer(), 2, 0); - channel1.onBuffer(buffer.retainBuffer(), 0, 0); - channel1.onBuffer(buffer.retainBuffer(), 1, 0); - channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 0); + assertBarrier(gate); - events.add(gate.pollNext().get().getEvent()); - - assertThat(events, contains( - instanceOf(EventAnnouncement.class), - instanceOf(CheckpointBarrier.class), - instanceOf(CheckpointBarrier.class))); - - assertEquals(channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size(), 2); + assertEquals(channelStateWriter.getAddedInput().get(getChannel(gate, 1).getChannelInfo()).size(), 2); assertEquals(1, target.getTriggeredCheckpointCounter()); } + private RemoteInputChannel getChannel(CheckpointedInputGate gate, int channelIndex) { + return (RemoteInputChannel) gate.getChannel(channelIndex); + } + @Test public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws Exception { testTimeoutAlignmentConsistency(true, false, false); @@ -352,9 +305,8 @@ public class AlternatingControllerTest { new TestSubtaskCheckpointCoordinator(channelStateWriter), gate)); - long checkpointCreationTime = System.currentTimeMillis(); long alignmentTimeout = 10; - CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, checkpointCreationTime, 
alignmentTimeout); + CheckpointBarrier barrier = new CheckpointBarrier(1, System.currentTimeMillis(), alignedNoTimeout(CHECKPOINT, getDefault())); InputChannelInfo channelInfo = channel0.getChannelInfo(); @@ -417,12 +369,12 @@ public class AlternatingControllerTest { long startNanos = System.nanoTime(); long checkpoint1CreationTime = System.currentTimeMillis() - 10; sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0); - sendBuffer(bufferSize, gate, 0); - sendBuffer(bufferSize, gate, 1); + sendData(bufferSize, 0, gate); + sendData(bufferSize, 1, gate); Thread.sleep(6); sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 1); - sendBuffer(bufferSize, gate, 0); + sendData(bufferSize, 0, gate); assertMetrics( target, @@ -436,7 +388,7 @@ public class AlternatingControllerTest { startNanos = System.nanoTime(); long checkpoint2CreationTime = System.currentTimeMillis() - 5; sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0); - sendBuffer(bufferSize, gate, 1); + sendData(bufferSize, 1, gate); assertMetrics( target, @@ -448,7 +400,7 @@ public class AlternatingControllerTest { bufferSize * 2); Thread.sleep(5); sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 1); - sendBuffer(bufferSize, gate, 0); + sendData(bufferSize, 0, gate); assertMetrics( target, @@ -461,9 +413,9 @@ public class AlternatingControllerTest { startNanos = System.nanoTime(); long checkpoint3CreationTime = System.currentTimeMillis() - 7; - sendBarrier(3, checkpoint3CreationTime, CHECKPOINT, gate, 0); - sendBuffer(bufferSize, gate, 0); - sendBuffer(bufferSize, gate, 1); + send(barrier(3, checkpoint3CreationTime, unaligned(getDefault())), 0, gate); + sendData(bufferSize, 0, gate); + sendData(bufferSize, 1, gate); assertMetrics( target, gate.getCheckpointBarrierHandler(), @@ -473,7 +425,7 @@ public class AlternatingControllerTest { 7_000_000L, -1L); Thread.sleep(10); - sendBarrier(3, checkpoint2CreationTime, CHECKPOINT, gate, 1); + send(barrier(3, checkpoint2CreationTime, 
unaligned(getDefault())), 1, gate); assertMetrics( target, gate.getCheckpointBarrierHandler(), @@ -487,16 +439,15 @@ public class AlternatingControllerTest { @Test public void testMetricsSingleChannel() throws Exception { int numChannels = 1; - int bufferSize = 1000; ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); CheckpointedInputGate gate = buildGate(target, numChannels); long checkpoint1CreationTime = System.currentTimeMillis() - 10; long startNanos = System.nanoTime(); - sendBuffer(bufferSize, gate, 0); + sendData(1000, 0, gate); sendBarrier(1, checkpoint1CreationTime, CHECKPOINT, gate, 0); - sendBuffer(bufferSize, gate, 0); + sendData(1000, 0, gate); Thread.sleep(6); assertMetrics( target, @@ -509,9 +460,9 @@ public class AlternatingControllerTest { long checkpoint2CreationTime = System.currentTimeMillis() - 5; startNanos = System.nanoTime(); - sendBuffer(bufferSize, gate, 0); + sendData(1000, 0, gate); sendBarrier(2, checkpoint2CreationTime, SAVEPOINT, gate, 0); - sendBuffer(bufferSize, gate, 0); + sendData(1000, 0, gate); Thread.sleep(5); assertMetrics( target, @@ -561,7 +512,7 @@ public class AlternatingControllerTest { if (type.isSavepoint()) { channels[channel].setBlocked(true); } - barrierHandler.processBarrier(new CheckpointBarrier(i, System.currentTimeMillis(), new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, channel)); + barrierHandler.processBarrier(new CheckpointBarrier(i, System.currentTimeMillis(), new CheckpointOptions(type, getDefault())), new InputChannelInfo(0, channel)); if (type.isSavepoint()) { assertTrue(channels[channel].isBlocked()); assertFalse(channels[(channel + 1) % 2].isBlocked()); @@ -587,7 +538,7 @@ public class AlternatingControllerTest { SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target); final long id = 1; - barrierHandler.processBarrier(new CheckpointBarrier(id, System.currentTimeMillis(), new 
CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0)); + barrierHandler.processBarrier(new CheckpointBarrier(id, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, getDefault())), new InputChannelInfo(0, 0)); assertFalse(barrierHandler.getAllBarriersReceivedFuture(id).isDone()); } @@ -604,9 +555,9 @@ public class AlternatingControllerTest { long checkpointId = 10; long outOfOrderSavepointId = 5; - barrierHandler.processBarrier(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0)); + barrierHandler.processBarrier(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), new CheckpointOptions(CHECKPOINT, getDefault())), new InputChannelInfo(0, 0)); secondChannel.setBlocked(true); - barrierHandler.processBarrier(new CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 1)); + barrierHandler.processBarrier(new CheckpointBarrier(outOfOrderSavepointId, System.currentTimeMillis(), new CheckpointOptions(SAVEPOINT, getDefault())), new InputChannelInfo(0, 1)); assertEquals(checkpointId, barrierHandler.getLatestCheckpointId()); assertFalse(secondChannel.isBlocked()); @@ -627,9 +578,11 @@ public class AlternatingControllerTest { slow.setBlocked(true); } - sendBarrier(barrierId, checkpointType, fast, checkpointedGate); + CheckpointOptions options = checkpointType.isSavepoint() ? 
alignedNoTimeout(checkpointType, getDefault()) : unaligned(getDefault()); + Buffer barrier = barrier(barrierId, 1, options); + send(barrier.retainBuffer(), fast, checkpointedGate); assertEquals(checkpointType.isSavepoint(), target.triggeredCheckpoints.isEmpty()); - sendBarrier(barrierId, checkpointType, slow, checkpointedGate); + send(barrier.retainBuffer(), slow, checkpointedGate); assertEquals(singletonList(barrierId), target.triggeredCheckpoints); if (checkpointType.isSavepoint()) { @@ -642,29 +595,43 @@ public class AlternatingControllerTest { } private void sendBarrier(long barrierId, long barrierCreationTime, CheckpointType type, CheckpointedInputGate gate, int channelId) throws Exception { - send(barrier(barrierId, type, barrierCreationTime), gate, channelId); + send(barrier(barrierId, barrierCreationTime, alignedNoTimeout(type, getDefault())), channelId, gate); } - private void send(Buffer buffer, CheckpointedInputGate gate, int channelId) throws Exception { - TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId); - channel.read(buffer.retainBuffer()); - while (gate.pollNext().isPresent()) { - } + private void sendData(int dataSize, int channelId, CheckpointedInputGate gate) throws Exception { + send(createBuffer(dataSize), channelId, gate); } - private void sendBarrier(long barrierId, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception { - channel.read(barrier(barrierId, type).retainBuffer()); - while (gate.pollNext().isPresent()) { - } + private void send(Buffer buffer, int channelId, CheckpointedInputGate gate) throws Exception { + send(buffer.retainBuffer(), gate.getChannel(channelId), gate); } - private void sendBuffer(int bufferSize, CheckpointedInputGate gate, int channelId) throws Exception { - TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId); - channel.read(TestBufferFactory.createBuffer(bufferSize)); - while (gate.pollNext().isPresent()) { + private void send(Buffer 
buffer, InputChannel channel, CheckpointedInputGate checkpointedGate) throws IOException, InterruptedException { + if (channel instanceof TestInputChannel) { + ((TestInputChannel) channel).read(buffer); + } else if (channel instanceof RemoteInputChannel) { + ((RemoteInputChannel) channel).onBuffer(buffer, 0, 0); + } else { + throw new IllegalArgumentException("Unknown channel type: " + channel); + } + while (checkpointedGate.pollNext().isPresent()) { } } + private Buffer withTimeout(long alignmentTimeout) throws IOException { + return barrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), alignmentTimeout)); + } + + private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options) throws IOException { + CheckpointBarrier checkpointBarrier = new CheckpointBarrier( + barrierId, + barrierTimestamp, + options); + return toBuffer( + checkpointBarrier, + checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()); + } + private static SingleCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) { return barrierHandler(inputGate, target, new RecordingChannelStateWriter()); } @@ -683,38 +650,6 @@ public class AlternatingControllerTest { new UnalignedController(new TestSubtaskCheckpointCoordinator(stateWriter), inputGate))); } - private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException { - return barrier(barrierId, checkpointType, System.currentTimeMillis()); - } - - private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp) throws IOException { - return barrier(barrierId, checkpointType, barrierTimestamp, 0); - } - - private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) throws IOException { - CheckpointBarrier checkpointBarrier = checkpointBarrier( - barrierId, - checkpointType, - barrierTimestamp, - alignmentTimeout); - return toBuffer( - checkpointBarrier, - 
checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()); - } - - private CheckpointBarrier checkpointBarrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) { - CheckpointOptions options = CheckpointOptions.create( - checkpointType, - CheckpointStorageLocationReference.getDefault(), - true, - true, - alignmentTimeout); - return new CheckpointBarrier( - barrierId, - barrierTimestamp, - options); - } - private static CheckpointedInputGate buildGate(AbstractInvokable target, int numChannels) { SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numChannels).build(); TestInputChannel[] channels = new TestInputChannel[numChannels]; @@ -751,4 +686,30 @@ public class AlternatingControllerTest { MailboxProcessor mailboxProcessor = new MailboxProcessor(); return new CheckpointedInputGate(gate, barrierHandler(gate, target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor()); } + + private static void assertAnnouncement(CheckpointedInputGate gate) throws IOException, InterruptedException { + assertEvent(gate, EventAnnouncement.class); + } + + private static void assertBarrier(CheckpointedInputGate gate) throws IOException, InterruptedException { + assertEvent(gate, CheckpointBarrier.class); + } + + private static <T extends RuntimeEvent> void assertEvent(CheckpointedInputGate gate, Class<T> clazz) throws IOException, InterruptedException { + Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate); + assertTrue("expected event, got data buffer on " + bufferOrEvent.get().getChannelInfo(), bufferOrEvent.get().isEvent()); + assertEquals(clazz, bufferOrEvent.get().getEvent().getClass()); + } + + private static <T extends RuntimeEvent> void assertData(CheckpointedInputGate gate) throws IOException, InterruptedException { + Optional<BufferOrEvent> bufferOrEvent = assertPoll(gate); + assertTrue("expected data, got " + bufferOrEvent.get().getEvent() + " on " + bufferOrEvent.get().getChannelInfo(), 
bufferOrEvent.get().isBuffer()); + } + + private static Optional<BufferOrEvent> assertPoll(CheckpointedInputGate gate) throws IOException, InterruptedException { + Optional<BufferOrEvent> bufferOrEvent = gate.pollNext(); + assertTrue("empty gate", bufferOrEvent.isPresent()); + return bufferOrEvent; + } + } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java index 6f449ef..7d401d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/InputProcessorUtilTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; @@ -84,7 +85,7 @@ public class InputProcessorUtilTest { for (IndexedInputGate inputGate : allInputGates) { for (int channelId = 0; channelId < inputGate.getNumberOfInputChannels(); channelId++) { barrierHandler.processBarrier( - new CheckpointBarrier(1, 42, CheckpointOptions.forCheckpointWithDefaultLocation(true, true, 0)), + new CheckpointBarrier(1, 42, CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(inputGate.getGateIndex(), channelId)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java index e916329..319c4f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java @@ -145,7 +145,7 @@ public class StreamTaskNetworkInputTest { deserializers); inputGate.sendEvent( - new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()), + new CheckpointBarrier(checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned()), channelId); inputGate.sendElement(new StreamRecord<>(42L), channelId); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java index 82ac3fe..1291457 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerCancellationTest.java @@ -102,7 +102,7 @@ public class UnalignedControllerCancellationTest { } private static CheckpointBarrier checkpoint(int checkpointId) { - return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().asTimedOut()); + return new CheckpointBarrier(checkpointId, 1, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned()); } private static CancelCheckpointMarker cancel(int checkpointId) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java index 7604746..7b64ee7 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/UnalignedControllerTest.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; @@ -39,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBui import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.SyncMailboxExecutor; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator; @@ -59,8 +57,6 @@ import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT; -import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -620,7 +616,7 @@ public class UnalignedControllerTest { new CheckpointBarrier( checkpointId, timestamp, - new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT)), + 
CheckpointOptions.unaligned(getDefault())), new InputChannelInfo(0, channel)); } @@ -734,7 +730,7 @@ public class UnalignedControllerTest { } private CheckpointBarrier buildCheckpointBarrier(long id) { - return new CheckpointBarrier(id, 0, new CheckpointOptions(CHECKPOINT, getDefault(), true, true, NO_ALIGNMENT_TIME_OUT)); + return new CheckpointBarrier(id, 0, CheckpointOptions.unaligned(getDefault())); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java index 5f912ab..d97bca2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java @@ -221,7 +221,7 @@ public class MultipleInputStreamTaskChainedSourcesCheckpointingTest { private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) { StreamConfig config = testHarness.getStreamTask().getConfiguration(); - CheckpointOptions checkpointOptions = CheckpointOptions.create( + CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), config.isExactlyOnceCheckpointMode(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index 304e448..b83e2de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -104,8 +104,9 @@ public class SubtaskCheckpointCoordinatorTest { MockWriter writer = new MockWriter(); SubtaskCheckpointCoordinator coordinator = coordinator(unalignedCheckpointEnabled, writer); CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault(); - CheckpointOptions options = new CheckpointOptions(checkpointType, locationReference, true, unalignedCheckpointEnabled, 0); - coordinator.initCheckpoint(1L, options); + coordinator.initCheckpoint(1L, unalignedCheckpointEnabled ? + CheckpointOptions.unaligned(locationReference) : + CheckpointOptions.alignedNoTimeout(checkpointType, locationReference)); return writer.started; }
