This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5040eaa877e59e1399cee1cf37ad1d33f62d01da Author: Anton Kalashnikov <[email protected]> AuthorDate: Tue Jun 22 20:18:07 2021 +0300 [FLINK-23041][runtime] Added new well defined checkpoint configuration aligned-checkpoint-timeout instead of alignment-timeout --- .../flink/state/api/output/SnapshotUtils.java | 2 +- .../runtime/checkpoint/CheckpointCoordinator.java | 6 +-- .../runtime/checkpoint/CheckpointOptions.java | 61 ++++++++++++---------- .../network/api/serialization/EventSerializer.java | 2 +- .../tasks/CheckpointCoordinatorConfiguration.java | 24 +++++---- .../job/checkpoints/CheckpointConfigHandler.java | 2 +- .../checkpoint/CheckpointCoordinatorTest.java | 4 +- .../runtime/checkpoint/CheckpointOptionsTest.java | 19 ++++--- .../api/environment/CheckpointConfig.java | 58 +++++++++++++++----- .../environment/ExecutionCheckpointingOptions.java | 27 +++++++++- .../flink/streaming/api/graph/StreamConfig.java | 9 ++-- .../api/graph/StreamingJobGraphGenerator.java | 5 +- .../SingleCheckpointBarrierHandler.java | 4 +- .../runtime/tasks/SourceOperatorStreamTask.java | 2 +- .../streaming/runtime/tasks/SourceStreamTask.java | 2 +- ...tStreamTaskChainedSourcesCheckpointingTest.java | 2 +- .../runtime/tasks/MultipleInputStreamTaskTest.java | 2 +- 17 files changed, 152 insertions(+), 79 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java index e62e84e..9a1867f 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java @@ -53,7 +53,7 @@ public final class SnapshotUtils { AbstractFsCheckpointStorageAccess.encodePathAsReference(savepointPath), isExactlyOnceMode, isUnalignedCheckpoint, - CheckpointOptions.NO_ALIGNMENT_TIME_OUT); + CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT); operator.prepareSnapshotPreBarrier(CHECKPOINT_ID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 16d588d..896ed7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -170,7 +170,7 @@ public class CheckpointCoordinator { private final boolean unalignedCheckpointsEnabled; - private final long alignmentTimeout; + private final long alignedCheckpointTimeout; /** Actor that receives status updates from the execution graph this coordinator works for. */ private JobStatusListener jobStatusListener; @@ -311,7 +311,7 @@ public class CheckpointCoordinator { this.clock = checkNotNull(clock); this.isExactlyOnceMode = chkConfig.isExactlyOnce(); this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled(); - this.alignmentTimeout = chkConfig.getAlignmentTimeout(); + this.alignedCheckpointTimeout = chkConfig.getAlignedCheckpointTimeout(); this.checkpointIdOfIgnoredInFlightData = chkConfig.getCheckpointIdOfIgnoredInFlightData(); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); @@ -824,7 +824,7 @@ public class CheckpointCoordinator { checkpointStorageLocation.getLocationReference(), isExactlyOnceMode, unalignedCheckpointsEnabled, - alignmentTimeout); + alignedCheckpointTimeout); // send the messages to the tasks that trigger their checkpoint for (Execution execution : tasksToTrigger) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index 8076cb9..3b7e2b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -47,7 +47,7 @@ public class CheckpointOptions implements Serializable { FORCED_ALIGNED } - public static final long NO_ALIGNMENT_TIME_OUT = Long.MAX_VALUE; + public static final long NO_ALIGNED_CHECKPOINT_TIME_OUT = Long.MAX_VALUE; private static final long serialVersionUID = 5010126558083292915L; @@ -59,17 +59,18 @@ public class CheckpointOptions implements Serializable { private final AlignmentType alignmentType; - private final long alignmentTimeout; + private final long alignedCheckpointTimeout; public static CheckpointOptions notExactlyOnce( CheckpointType type, CheckpointStorageLocationReference location) { return new CheckpointOptions( - type, location, AlignmentType.AT_LEAST_ONCE, NO_ALIGNMENT_TIME_OUT); + type, location, AlignmentType.AT_LEAST_ONCE, NO_ALIGNED_CHECKPOINT_TIME_OUT); } public static CheckpointOptions alignedNoTimeout( CheckpointType type, CheckpointStorageLocationReference location) { - return new CheckpointOptions(type, location, AlignmentType.ALIGNED, NO_ALIGNMENT_TIME_OUT); + return new CheckpointOptions( + type, location, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT); } public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) { @@ -77,22 +78,25 @@ public class CheckpointOptions implements Serializable { CheckpointType.CHECKPOINT, location, AlignmentType.UNALIGNED, - NO_ALIGNMENT_TIME_OUT); + NO_ALIGNED_CHECKPOINT_TIME_OUT); } public static CheckpointOptions alignedWithTimeout( - CheckpointStorageLocationReference location, long alignmentTimeout) { + CheckpointStorageLocationReference location, long alignedCheckpointTimeout) { return new CheckpointOptions( - CheckpointType.CHECKPOINT, location, AlignmentType.ALIGNED, alignmentTimeout); + CheckpointType.CHECKPOINT, + location, + AlignmentType.ALIGNED, + alignedCheckpointTimeout); } private static CheckpointOptions forceAligned( - CheckpointStorageLocationReference location, long alignmentTimeout) { + CheckpointStorageLocationReference location, long alignedCheckpointTimeout) { return new CheckpointOptions( CheckpointType.CHECKPOINT, location, AlignmentType.FORCED_ALIGNED, - alignmentTimeout); + alignedCheckpointTimeout); } public static CheckpointOptions forConfig( @@ -100,44 +104,45 @@ public class CheckpointOptions implements Serializable { CheckpointStorageLocationReference locationReference, boolean isExactlyOnceMode, boolean isUnalignedEnabled, - long alignmentTimeout) { + long alignedCheckpointTimeout) { if (!isExactlyOnceMode) { return notExactlyOnce(checkpointType, locationReference); } else if (checkpointType.isSavepoint()) { return alignedNoTimeout(checkpointType, locationReference); } else if (!isUnalignedEnabled) { return alignedNoTimeout(checkpointType, locationReference); - } else if (alignmentTimeout == 0 || alignmentTimeout == NO_ALIGNMENT_TIME_OUT) { + } else if (alignedCheckpointTimeout == 0 + || alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT) { return unaligned(locationReference); } else { - return alignedWithTimeout(locationReference, alignmentTimeout); + return alignedWithTimeout(locationReference, alignedCheckpointTimeout); } } @VisibleForTesting public CheckpointOptions( CheckpointType checkpointType, CheckpointStorageLocationReference targetLocation) { - this(checkpointType, targetLocation, AlignmentType.ALIGNED, NO_ALIGNMENT_TIME_OUT); + this(checkpointType, targetLocation, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT); } public CheckpointOptions( CheckpointType checkpointType, CheckpointStorageLocationReference targetLocation, AlignmentType alignmentType, - long alignmentTimeout) { + long alignedCheckpointTimeout) { checkArgument( alignmentType != AlignmentType.UNALIGNED || !checkpointType.isSavepoint(), "Savepoint can't be unaligned"); checkArgument( - alignmentTimeout == NO_ALIGNMENT_TIME_OUT + alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT || alignmentType != AlignmentType.UNALIGNED, "Unaligned checkpoint can't have timeout (%s)", - alignmentTimeout); + alignedCheckpointTimeout); this.checkpointType = checkNotNull(checkpointType); this.targetLocation = checkNotNull(targetLocation); this.alignmentType = checkNotNull(alignmentType); - this.alignmentTimeout = alignmentTimeout; + this.alignedCheckpointTimeout = alignedCheckpointTimeout; } public boolean needsAlignment() { @@ -145,8 +150,8 @@ public class CheckpointOptions implements Serializable { && (getCheckpointType().isSavepoint() || !isUnalignedCheckpoint()); } - public long getAlignmentTimeout() { - return alignmentTimeout; + public long getAlignedCheckpointTimeout() { + return alignedCheckpointTimeout; } public AlignmentType getAlignment() { @@ -158,7 +163,8 @@ public class CheckpointOptions implements Serializable { return false; } return alignmentType == AlignmentType.ALIGNED - && (alignmentTimeout > 0 && alignmentTimeout != NO_ALIGNMENT_TIME_OUT); + && (alignedCheckpointTimeout > 0 + && alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT); } // ------------------------------------------------------------------------ @@ -183,8 +189,8 @@ public class CheckpointOptions implements Serializable { public CheckpointOptions withUnalignedSupported() { if (alignmentType == AlignmentType.FORCED_ALIGNED) { - return alignmentTimeout != NO_ALIGNMENT_TIME_OUT - ? alignedWithTimeout(targetLocation, alignmentTimeout) + return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT + ? alignedWithTimeout(targetLocation, alignedCheckpointTimeout) : unaligned(targetLocation); } return this; @@ -192,7 +198,7 @@ public class CheckpointOptions implements Serializable { public CheckpointOptions withUnalignedUnsupported() { if (isUnalignedCheckpoint() || isTimeoutable()) { - return forceAligned(targetLocation, alignmentTimeout); + return forceAligned(targetLocation, alignedCheckpointTimeout); } return this; } @@ -201,7 +207,8 @@ public class CheckpointOptions implements Serializable { @Override public int hashCode() { - return Objects.hash(targetLocation, checkpointType, alignmentType, alignmentTimeout); + return Objects.hash( + targetLocation, checkpointType, alignmentType, alignedCheckpointTimeout); } @Override @@ -213,7 +220,7 @@ public class CheckpointOptions implements Serializable { return this.checkpointType == that.checkpointType && this.targetLocation.equals(that.targetLocation) && this.alignmentType == that.alignmentType - && this.alignmentTimeout == that.alignmentTimeout; + && this.alignedCheckpointTimeout == that.alignedCheckpointTimeout; } else { return false; } @@ -228,8 +235,8 @@ public class CheckpointOptions implements Serializable { + targetLocation + ", alignment = " + alignmentType - + ", alignmentTimeout = " - + alignmentTimeout + + ", alignedCheckpointTimeout = " + + alignedCheckpointTimeout + "}"; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index 54f4a9e..ae85a0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -236,7 +236,7 @@ public class EventSerializer { buf.put(locationBytes); } buf.put((byte) checkpointOptions.getAlignment().ordinal()); - buf.putLong(checkpointOptions.getAlignmentTimeout()); + buf.putLong(checkpointOptions.getAlignedCheckpointTimeout()); buf.flip(); return buf; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index 0fd6f87..1a27bfb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -62,7 +62,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private final boolean isUnalignedCheckpointsEnabled; - private final long alignmentTimeout; + private final long alignedCheckpointTimeout; private final long checkpointIdOfIgnoredInFlightData; @@ -104,7 +104,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { boolean isPreferCheckpointForRecovery, int tolerableCpFailureNumber, boolean isUnalignedCheckpointsEnabled, - long alignmentTimeout, + long alignedCheckpointTimeout, long checkpointIdOfIgnoredInFlightData) { // sanity checks @@ -128,7 +128,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery; this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber; this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled; - this.alignmentTimeout = alignmentTimeout; + this.alignedCheckpointTimeout = alignedCheckpointTimeout; this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData; } @@ -168,8 +168,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { return isUnalignedCheckpointsEnabled; } - public long getAlignmentTimeout() { - return alignmentTimeout; + public long getAlignedCheckpointTimeout() { + return alignedCheckpointTimeout; } public long getCheckpointIdOfIgnoredInFlightData() { @@ -191,6 +191,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints && isExactlyOnce == that.isExactlyOnce && isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled + && alignedCheckpointTimeout == that.alignedCheckpointTimeout && checkpointRetentionPolicy == that.checkpointRetentionPolicy && isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber @@ -207,6 +208,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { checkpointRetentionPolicy, isExactlyOnce, isUnalignedCheckpointsEnabled, + alignedCheckpointTimeout, isPreferCheckpointForRecovery, tolerableCheckpointFailureNumber, checkpointIdOfIgnoredInFlightData); @@ -229,6 +231,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { + isExactlyOnce + ", isUnalignedCheckpoint=" + isUnalignedCheckpointsEnabled + + ", alignedCheckpointTimeout=" + + alignedCheckpointTimeout + ", isPreferCheckpointForRecovery=" + isPreferCheckpointForRecovery + ", tolerableCheckpointFailureNumber=" @@ -254,7 +258,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private boolean isPreferCheckpointForRecovery = true; private int tolerableCheckpointFailureNumber; private boolean isUnalignedCheckpointsEnabled; - private long alignmentTimeout = 0; + private long alignedCheckpointTimeout = 0; private long checkpointIdOfIgnoredInFlightData; public CheckpointCoordinatorConfiguration build() { @@ -268,7 +272,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { isPreferCheckpointForRecovery, tolerableCheckpointFailureNumber, isUnalignedCheckpointsEnabled, - alignmentTimeout, + alignedCheckpointTimeout, checkpointIdOfIgnoredInFlightData); } @@ -325,9 +329,9 @@ public class CheckpointCoordinatorConfiguration implements Serializable { return this; } - public CheckpointCoordinatorConfigurationBuilder setAlignmentTimeout( - long alignmentTimeout) { - this.alignmentTimeout = alignmentTimeout; + public CheckpointCoordinatorConfigurationBuilder setAlignedCheckpointTimeout( + long alignedCheckpointTimeout) { + this.alignedCheckpointTimeout = alignedCheckpointTimeout; return this; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index dc02c7f..8ba71d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -131,7 +131,7 @@ public class CheckpointConfigHandler checkpointStorageName, checkpointCoordinatorConfiguration.isUnalignedCheckpointsEnabled(), checkpointCoordinatorConfiguration.getTolerableCheckpointFailureNumber(), - checkpointCoordinatorConfiguration.getAlignmentTimeout()); + checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout()); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 22de6e2..4708598 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -623,7 +623,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .setExecutionGraph(graph) .setCheckpointCoordinatorConfiguration( CheckpointCoordinatorConfiguration.builder() - .setAlignmentTimeout(Long.MAX_VALUE) + .setAlignedCheckpointTimeout(Long.MAX_VALUE) .setMaxConcurrentCheckpoints(Integer.MAX_VALUE) .build()) .setTimer(manuallyTriggeredScheduledExecutor) @@ -3440,7 +3440,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .setExecutionGraph(graph) .setCheckpointCoordinatorConfiguration( CheckpointCoordinatorConfiguration.builder() - .setAlignmentTimeout(Long.MAX_VALUE) + .setAlignedCheckpointTimeout(Long.MAX_VALUE) .setMaxConcurrentCheckpoints(Integer.MAX_VALUE) .build()) .setTimer(manuallyTriggeredScheduledExecutor) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java index a55fb2e..156d187 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java @@ -26,7 +26,7 @@ import org.junit.Test; import java.util.Random; -import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT; +import static org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT; import static org.junit.Assert.assertArrayEquals; @@ -83,25 +83,28 @@ public class CheckpointOptionsTest { CHECKPOINT, location, AlignmentType.UNALIGNED, - NO_ALIGNMENT_TIME_OUT) + NO_ALIGNED_CHECKPOINT_TIME_OUT) .needsAlignment()); assertTrue( new CheckpointOptions( - CHECKPOINT, location, AlignmentType.ALIGNED, NO_ALIGNMENT_TIME_OUT) + CHECKPOINT, + location, + AlignmentType.ALIGNED, + NO_ALIGNED_CHECKPOINT_TIME_OUT) .needsAlignment()); assertTrue( new CheckpointOptions( CHECKPOINT, location, AlignmentType.FORCED_ALIGNED, - NO_ALIGNMENT_TIME_OUT) + NO_ALIGNED_CHECKPOINT_TIME_OUT) .needsAlignment()); assertFalse( new CheckpointOptions( CHECKPOINT, location, AlignmentType.AT_LEAST_ONCE, - NO_ALIGNMENT_TIME_OUT) + NO_ALIGNED_CHECKPOINT_TIME_OUT) .needsAlignment()); } @@ -111,7 +114,7 @@ public class CheckpointOptionsTest { CheckpointStorageLocationReference.getDefault(); assertTimeoutable(CheckpointOptions.alignedWithTimeout(location, 10), false, true, 10); assertTimeoutable( - CheckpointOptions.unaligned(location), true, false, NO_ALIGNMENT_TIME_OUT); + CheckpointOptions.unaligned(location), true, false, NO_ALIGNED_CHECKPOINT_TIME_OUT); assertTimeoutable( CheckpointOptions.alignedWithTimeout(location, 10).withUnalignedUnsupported(), false, @@ -121,7 +124,7 @@ public class CheckpointOptionsTest { CheckpointOptions.unaligned(location).withUnalignedUnsupported(), false, false, - NO_ALIGNMENT_TIME_OUT); + NO_ALIGNED_CHECKPOINT_TIME_OUT); } @Test @@ -160,6 +163,6 @@ public class CheckpointOptionsTest { assertEquals("need alignment", !isUnaligned, options.needsAlignment()); assertEquals("unaligned", isUnaligned, options.isUnalignedCheckpoint()); assertEquals("timeoutable", isTimeoutable, options.isTimeoutable()); - assertEquals("timeout", timeout, options.getAlignmentTimeout()); + assertEquals("timeout", timeout, options.getAlignedCheckpointTimeout()); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index a087a39..7efd3b4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -98,8 +98,9 @@ public class CheckpointConfig implements java.io.Serializable { private long checkpointIdOfIgnoredInFlightData = DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA; - private Duration alignmentTimeout = - ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue(); + /** The delay from the start of checkpoint after which AC switches to UC. */ + private Duration alignedCheckpointTimeout = + ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue(); /** Flag to enable approximate local recovery. */ private boolean approximateLocalRecovery; @@ -149,7 +150,7 @@ public class CheckpointConfig implements java.io.Serializable { this.preferCheckpointForRecovery = checkpointConfig.preferCheckpointForRecovery; this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber; this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled(); - this.alignmentTimeout = checkpointConfig.alignmentTimeout; + this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout; this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled(); this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup; this.forceCheckpointing = checkpointConfig.forceCheckpointing; @@ -540,26 +541,57 @@ public class CheckpointConfig implements java.io.Serializable { /** * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled. * - * <p>If {@link #alignmentTimeout} has value equal to <code>0</code>, checkpoints will always - * start unaligned. + * <p>If {@link #alignedCheckpointTimeout} has value equal to <code>0</code>, checkpoints will + * always start unaligned. * - * <p>If {@link #alignmentTimeout} has value greater then <code>0</code>, checkpoints will start - * aligned. If during checkpointing, checkpoint start delay exceeds this {@link - * #alignmentTimeout}, alignment will timeout and checkpoint will start working as unaligned - * checkpoint. + * <p>If {@link #alignedCheckpointTimeout} has value greater then <code>0</code>, checkpoints + * will start aligned. If during checkpointing, checkpoint start delay exceeds this {@link + * #alignedCheckpointTimeout}, alignment will timeout and checkpoint will start working as + * unaligned checkpoint. + * + * @deprecated Use {@link #setAlignedCheckpointTimeout(Duration)} instead. */ + @Deprecated @PublicEvolving public void setAlignmentTimeout(Duration alignmentTimeout) { - this.alignmentTimeout = alignmentTimeout; + this.alignedCheckpointTimeout = alignmentTimeout; } /** * @return value of alignment timeout, as configured via {@link #setAlignmentTimeout(Duration)} * or {@link ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT}. + * @deprecated User {@link #getAlignedCheckpointTimeout()} instead. */ + @Deprecated @PublicEvolving public Duration getAlignmentTimeout() { - return alignmentTimeout; + return alignedCheckpointTimeout; + } + + /** + * @return value of alignment timeout, as configured via {@link + * #setAlignedCheckpointTimeout(Duration)} or {@link + * ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}. + */ + @PublicEvolving + public Duration getAlignedCheckpointTimeout() { + return alignedCheckpointTimeout; + } + + /** + * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled. + * + * <p>If {@link #alignedCheckpointTimeout} has value equal to <code>0</code>, checkpoints will + * always start unaligned. + * + * <p>If {@link #alignedCheckpointTimeout} has value greater then <code>0</code>, checkpoints + * will start aligned. If during checkpointing, checkpoint start delay exceeds this {@link + * #alignedCheckpointTimeout}, alignment will timeout and checkpoint will start working as + * unaligned checkpoint. + */ + @PublicEvolving + public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) { + this.alignedCheckpointTimeout = alignedCheckpointTimeout; } /** @@ -784,8 +816,8 @@ public class CheckpointConfig implements java.io.Serializable { .getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA) .ifPresent(this::setCheckpointIdOfIgnoredInFlightData); configuration - .getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT) - .ifPresent(this::setAlignmentTimeout); + .getOptional(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT) + .ifPresent(this::setAlignedCheckpointTimeout); configuration .getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED) .ifPresent(this::setForceUnalignedCheckpoints); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 1c9d97a..c8cefe7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -166,6 +166,30 @@ public class ExecutionCheckpointingOptions { TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key())) .build()); + public static final ConfigOption<Duration> ALIGNED_CHECKPOINT_TIMEOUT = + ConfigOptions.key("execution.checkpointing.aligned-checkpoint-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(0L)) + .withDeprecatedKeys("execution.checkpointing.alignment-timeout") + .withDescription( + Description.builder() + .text( + "Only relevant if %s is enabled.", + TextElement.code(ENABLE_UNALIGNED.key())) + .linebreak() + .linebreak() + .text( + "If timeout is 0, checkpoints will always start unaligned.") + .linebreak() + .linebreak() + .text( + "If timeout has a positive value, checkpoints will start aligned. " + + "If during checkpointing, checkpoint start delay exceeds this timeout, alignment " + + "will timeout and checkpoint barrier will start working as unaligned checkpoint.") + .build()); + + /** @deprecated Use {@link #ALIGNED_CHECKPOINT_TIMEOUT} instead. */ + @Deprecated public static final ConfigOption<Duration> ALIGNMENT_TIMEOUT = ConfigOptions.key("execution.checkpointing.alignment-timeout") .durationType() @@ -173,7 +197,8 @@ public class ExecutionCheckpointingOptions { .withDescription( Description.builder() .text( - "Only relevant if %s is enabled.", + "Deprecated. %s should be used instead. Only relevant if %s is enabled.", + TextElement.code(ALIGNED_CHECKPOINT_TIMEOUT.key()), TextElement.code(ENABLE_UNALIGNED.key())) .linebreak() .linebreak() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 085a327..6f9e674 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -462,12 +462,13 @@ public class StreamConfig implements Serializable { return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE; } - public Duration getAlignmentTimeout() { - return config.get(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT); + public Duration getAlignedCheckpointTimeout() { + return config.get(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT); } - public void setAlignmentTimeout(Duration alignmentTimeout) { - config.set(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT, alignmentTimeout); + public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) { + config.set( + ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout); } public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index f06aa05..6e96655 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -728,7 +728,7 @@ public class StreamingJobGraphGenerator { config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled()); config.setCheckpointMode(getCheckpointingMode(checkpointCfg)); config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled()); - config.setAlignmentTimeout(checkpointCfg.getAlignmentTimeout()); + config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout()); for (int i = 0; i < vertex.getStatePartitioners().length; i++) { config.setStatePartitioner(i, vertex.getStatePartitioners()[i]); @@ -1315,7 +1315,8 @@ public class StreamingJobGraphGenerator { .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled()) .setCheckpointIdOfIgnoredInFlightData( cfg.getCheckpointIdOfIgnoredInFlightData()) - .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis()) + .setAlignedCheckpointTimeout( + cfg.getAlignedCheckpointTimeout().toMillis()) .build(), serializedStateBackend, streamGraph.isChangelogStateBackendEnabled(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index fcca333..e2ad1cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -271,7 +271,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) { long alignedCheckpointTimeout = - announcedBarrier.getCheckpointOptions().getAlignmentTimeout(); + announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout(); long timePassedSinceCheckpointStart = getClock().absoluteTimeMillis() - announcedBarrier.getTimestamp(); @@ -436,7 +436,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { public boolean isTimedOut(CheckpointBarrier barrier) { return barrier.getCheckpointOptions().isTimeoutable() && barrier.getId() <= currentCheckpointId - && barrier.getCheckpointOptions().getAlignmentTimeout() + && barrier.getCheckpointOptions().getAlignedCheckpointTimeout() < (getClock().absoluteTimeMillis() - barrier.getTimestamp()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index b77710e..5ba8321 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -148,7 +148,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), - configuration.getAlignmentTimeout().toMillis()); + configuration.getAlignedCheckpointTimeout().toMillis()); final long timestamp = System.currentTimeMillis(); final CheckpointMetaData checkpointMetaData = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index ad5b99f..1d132e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -109,7 +109,7 @@ public class SourceStreamTask< CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), configuration.isUnalignedCheckpointsEnabled(), - configuration.getAlignmentTimeout().toMillis()); + configuration.getAlignedCheckpointTimeout().toMillis()); final long timestamp = System.currentTimeMillis(); final CheckpointMetaData checkpointMetaData = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java index c541e67..da70c99 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java @@ -300,7 +300,7 @@ public class MultipleInputStreamTaskChainedSourcesCheckpointingTest { CheckpointStorageLocationReference.getDefault(), config.isExactlyOnceCheckpointMode(), config.isUnalignedCheckpointsEnabled(), - config.getAlignmentTimeout().toMillis()); + config.getAlignedCheckpointTimeout().toMillis()); return new CheckpointBarrier( metaData.getCheckpointId(), metaData.getTimestamp(), checkpointOptions); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 6f5e8eb..7a0669d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -1046,7 +1046,7 @@ public class MultipleInputStreamTaskTest { MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) .modifyExecutionConfig(config -> config.enableObjectReuse()) .modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned)) - .modifyStreamConfig(config -> config.setAlignmentTimeout(Duration.ZERO)) + .modifyStreamConfig(config -> config.setAlignedCheckpointTimeout(Duration.ZERO)) .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addSourceInput( new SourceOperatorFactory<>(
