This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5040eaa877e59e1399cee1cf37ad1d33f62d01da
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Jun 22 20:18:07 2021 +0300

    [FLINK-23041][runtime] Added new well defined checkpoint configuration 
aligned-checkpoint-timeout instead of alignment-timeout
---
 .../flink/state/api/output/SnapshotUtils.java      |  2 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |  6 +--
 .../runtime/checkpoint/CheckpointOptions.java      | 61 ++++++++++++----------
 .../network/api/serialization/EventSerializer.java |  2 +-
 .../tasks/CheckpointCoordinatorConfiguration.java  | 24 +++++----
 .../job/checkpoints/CheckpointConfigHandler.java   |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |  4 +-
 .../runtime/checkpoint/CheckpointOptionsTest.java  | 19 ++++---
 .../api/environment/CheckpointConfig.java          | 58 +++++++++++++++-----
 .../environment/ExecutionCheckpointingOptions.java | 27 +++++++++-
 .../flink/streaming/api/graph/StreamConfig.java    |  9 ++--
 .../api/graph/StreamingJobGraphGenerator.java      |  5 +-
 .../SingleCheckpointBarrierHandler.java            |  4 +-
 .../runtime/tasks/SourceOperatorStreamTask.java    |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  2 +-
 ...tStreamTaskChainedSourcesCheckpointingTest.java |  2 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  2 +-
 17 files changed, 152 insertions(+), 79 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index e62e84e..9a1867f 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -53,7 +53,7 @@ public final class SnapshotUtils {
                         
AbstractFsCheckpointStorageAccess.encodePathAsReference(savepointPath),
                         isExactlyOnceMode,
                         isUnalignedCheckpoint,
-                        CheckpointOptions.NO_ALIGNMENT_TIME_OUT);
+                        CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT);
 
         operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 16d588d..896ed7b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -170,7 +170,7 @@ public class CheckpointCoordinator {
 
     private final boolean unalignedCheckpointsEnabled;
 
-    private final long alignmentTimeout;
+    private final long alignedCheckpointTimeout;
 
     /** Actor that receives status updates from the execution graph this 
coordinator works for. */
     private JobStatusListener jobStatusListener;
@@ -311,7 +311,7 @@ public class CheckpointCoordinator {
         this.clock = checkNotNull(clock);
         this.isExactlyOnceMode = chkConfig.isExactlyOnce();
         this.unalignedCheckpointsEnabled = 
chkConfig.isUnalignedCheckpointsEnabled();
-        this.alignmentTimeout = chkConfig.getAlignmentTimeout();
+        this.alignedCheckpointTimeout = 
chkConfig.getAlignedCheckpointTimeout();
         this.checkpointIdOfIgnoredInFlightData = 
chkConfig.getCheckpointIdOfIgnoredInFlightData();
 
         this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
@@ -824,7 +824,7 @@ public class CheckpointCoordinator {
                         checkpointStorageLocation.getLocationReference(),
                         isExactlyOnceMode,
                         unalignedCheckpointsEnabled,
-                        alignmentTimeout);
+                        alignedCheckpointTimeout);
 
         // send the messages to the tasks that trigger their checkpoint
         for (Execution execution : tasksToTrigger) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 8076cb9..3b7e2b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -47,7 +47,7 @@ public class CheckpointOptions implements Serializable {
         FORCED_ALIGNED
     }
 
-    public static final long NO_ALIGNMENT_TIME_OUT = Long.MAX_VALUE;
+    public static final long NO_ALIGNED_CHECKPOINT_TIME_OUT = Long.MAX_VALUE;
 
     private static final long serialVersionUID = 5010126558083292915L;
 
@@ -59,17 +59,18 @@ public class CheckpointOptions implements Serializable {
 
     private final AlignmentType alignmentType;
 
-    private final long alignmentTimeout;
+    private final long alignedCheckpointTimeout;
 
     public static CheckpointOptions notExactlyOnce(
             CheckpointType type, CheckpointStorageLocationReference location) {
         return new CheckpointOptions(
-                type, location, AlignmentType.AT_LEAST_ONCE, 
NO_ALIGNMENT_TIME_OUT);
+                type, location, AlignmentType.AT_LEAST_ONCE, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public static CheckpointOptions alignedNoTimeout(
             CheckpointType type, CheckpointStorageLocationReference location) {
-        return new CheckpointOptions(type, location, AlignmentType.ALIGNED, 
NO_ALIGNMENT_TIME_OUT);
+        return new CheckpointOptions(
+                type, location, AlignmentType.ALIGNED, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public static CheckpointOptions 
unaligned(CheckpointStorageLocationReference location) {
@@ -77,22 +78,25 @@ public class CheckpointOptions implements Serializable {
                 CheckpointType.CHECKPOINT,
                 location,
                 AlignmentType.UNALIGNED,
-                NO_ALIGNMENT_TIME_OUT);
+                NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public static CheckpointOptions alignedWithTimeout(
-            CheckpointStorageLocationReference location, long 
alignmentTimeout) {
+            CheckpointStorageLocationReference location, long 
alignedCheckpointTimeout) {
         return new CheckpointOptions(
-                CheckpointType.CHECKPOINT, location, AlignmentType.ALIGNED, 
alignmentTimeout);
+                CheckpointType.CHECKPOINT,
+                location,
+                AlignmentType.ALIGNED,
+                alignedCheckpointTimeout);
     }
 
     private static CheckpointOptions forceAligned(
-            CheckpointStorageLocationReference location, long 
alignmentTimeout) {
+            CheckpointStorageLocationReference location, long 
alignedCheckpointTimeout) {
         return new CheckpointOptions(
                 CheckpointType.CHECKPOINT,
                 location,
                 AlignmentType.FORCED_ALIGNED,
-                alignmentTimeout);
+                alignedCheckpointTimeout);
     }
 
     public static CheckpointOptions forConfig(
@@ -100,44 +104,45 @@ public class CheckpointOptions implements Serializable {
             CheckpointStorageLocationReference locationReference,
             boolean isExactlyOnceMode,
             boolean isUnalignedEnabled,
-            long alignmentTimeout) {
+            long alignedCheckpointTimeout) {
         if (!isExactlyOnceMode) {
             return notExactlyOnce(checkpointType, locationReference);
         } else if (checkpointType.isSavepoint()) {
             return alignedNoTimeout(checkpointType, locationReference);
         } else if (!isUnalignedEnabled) {
             return alignedNoTimeout(checkpointType, locationReference);
-        } else if (alignmentTimeout == 0 || alignmentTimeout == 
NO_ALIGNMENT_TIME_OUT) {
+        } else if (alignedCheckpointTimeout == 0
+                || alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT) 
{
             return unaligned(locationReference);
         } else {
-            return alignedWithTimeout(locationReference, alignmentTimeout);
+            return alignedWithTimeout(locationReference, 
alignedCheckpointTimeout);
         }
     }
 
     @VisibleForTesting
     public CheckpointOptions(
             CheckpointType checkpointType, CheckpointStorageLocationReference 
targetLocation) {
-        this(checkpointType, targetLocation, AlignmentType.ALIGNED, 
NO_ALIGNMENT_TIME_OUT);
+        this(checkpointType, targetLocation, AlignmentType.ALIGNED, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public CheckpointOptions(
             CheckpointType checkpointType,
             CheckpointStorageLocationReference targetLocation,
             AlignmentType alignmentType,
-            long alignmentTimeout) {
+            long alignedCheckpointTimeout) {
 
         checkArgument(
                 alignmentType != AlignmentType.UNALIGNED || 
!checkpointType.isSavepoint(),
                 "Savepoint can't be unaligned");
         checkArgument(
-                alignmentTimeout == NO_ALIGNMENT_TIME_OUT
+                alignedCheckpointTimeout == NO_ALIGNED_CHECKPOINT_TIME_OUT
                         || alignmentType != AlignmentType.UNALIGNED,
                 "Unaligned checkpoint can't have timeout (%s)",
-                alignmentTimeout);
+                alignedCheckpointTimeout);
         this.checkpointType = checkNotNull(checkpointType);
         this.targetLocation = checkNotNull(targetLocation);
         this.alignmentType = checkNotNull(alignmentType);
-        this.alignmentTimeout = alignmentTimeout;
+        this.alignedCheckpointTimeout = alignedCheckpointTimeout;
     }
 
     public boolean needsAlignment() {
@@ -145,8 +150,8 @@ public class CheckpointOptions implements Serializable {
                 && (getCheckpointType().isSavepoint() || 
!isUnalignedCheckpoint());
     }
 
-    public long getAlignmentTimeout() {
-        return alignmentTimeout;
+    public long getAlignedCheckpointTimeout() {
+        return alignedCheckpointTimeout;
     }
 
     public AlignmentType getAlignment() {
@@ -158,7 +163,8 @@ public class CheckpointOptions implements Serializable {
             return false;
         }
         return alignmentType == AlignmentType.ALIGNED
-                && (alignmentTimeout > 0 && alignmentTimeout != 
NO_ALIGNMENT_TIME_OUT);
+                && (alignedCheckpointTimeout > 0
+                        && alignedCheckpointTimeout != 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     // ------------------------------------------------------------------------
@@ -183,8 +189,8 @@ public class CheckpointOptions implements Serializable {
 
     public CheckpointOptions withUnalignedSupported() {
         if (alignmentType == AlignmentType.FORCED_ALIGNED) {
-            return alignmentTimeout != NO_ALIGNMENT_TIME_OUT
-                    ? alignedWithTimeout(targetLocation, alignmentTimeout)
+            return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT
+                    ? alignedWithTimeout(targetLocation, 
alignedCheckpointTimeout)
                     : unaligned(targetLocation);
         }
         return this;
@@ -192,7 +198,7 @@ public class CheckpointOptions implements Serializable {
 
     public CheckpointOptions withUnalignedUnsupported() {
         if (isUnalignedCheckpoint() || isTimeoutable()) {
-            return forceAligned(targetLocation, alignmentTimeout);
+            return forceAligned(targetLocation, alignedCheckpointTimeout);
         }
         return this;
     }
@@ -201,7 +207,8 @@ public class CheckpointOptions implements Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(targetLocation, checkpointType, alignmentType, 
alignmentTimeout);
+        return Objects.hash(
+                targetLocation, checkpointType, alignmentType, 
alignedCheckpointTimeout);
     }
 
     @Override
@@ -213,7 +220,7 @@ public class CheckpointOptions implements Serializable {
             return this.checkpointType == that.checkpointType
                     && this.targetLocation.equals(that.targetLocation)
                     && this.alignmentType == that.alignmentType
-                    && this.alignmentTimeout == that.alignmentTimeout;
+                    && this.alignedCheckpointTimeout == 
that.alignedCheckpointTimeout;
         } else {
             return false;
         }
@@ -228,8 +235,8 @@ public class CheckpointOptions implements Serializable {
                 + targetLocation
                 + ", alignment = "
                 + alignmentType
-                + ", alignmentTimeout = "
-                + alignmentTimeout
+                + ", alignedCheckpointTimeout = "
+                + alignedCheckpointTimeout
                 + "}";
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 54f4a9e..ae85a0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -236,7 +236,7 @@ public class EventSerializer {
             buf.put(locationBytes);
         }
         buf.put((byte) checkpointOptions.getAlignment().ordinal());
-        buf.putLong(checkpointOptions.getAlignmentTimeout());
+        buf.putLong(checkpointOptions.getAlignedCheckpointTimeout());
 
         buf.flip();
         return buf;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index 0fd6f87..1a27bfb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -62,7 +62,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
 
     private final boolean isUnalignedCheckpointsEnabled;
 
-    private final long alignmentTimeout;
+    private final long alignedCheckpointTimeout;
 
     private final long checkpointIdOfIgnoredInFlightData;
 
@@ -104,7 +104,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
             boolean isPreferCheckpointForRecovery,
             int tolerableCpFailureNumber,
             boolean isUnalignedCheckpointsEnabled,
-            long alignmentTimeout,
+            long alignedCheckpointTimeout,
             long checkpointIdOfIgnoredInFlightData) {
 
         // sanity checks
@@ -128,7 +128,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
         this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
         this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled;
-        this.alignmentTimeout = alignmentTimeout;
+        this.alignedCheckpointTimeout = alignedCheckpointTimeout;
         this.checkpointIdOfIgnoredInFlightData = 
checkpointIdOfIgnoredInFlightData;
     }
 
@@ -168,8 +168,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         return isUnalignedCheckpointsEnabled;
     }
 
-    public long getAlignmentTimeout() {
-        return alignmentTimeout;
+    public long getAlignedCheckpointTimeout() {
+        return alignedCheckpointTimeout;
     }
 
     public long getCheckpointIdOfIgnoredInFlightData() {
@@ -191,6 +191,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                 && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints
                 && isExactlyOnce == that.isExactlyOnce
                 && isUnalignedCheckpointsEnabled == 
that.isUnalignedCheckpointsEnabled
+                && alignedCheckpointTimeout == that.alignedCheckpointTimeout
                 && checkpointRetentionPolicy == that.checkpointRetentionPolicy
                 && isPreferCheckpointForRecovery == 
that.isPreferCheckpointForRecovery
                 && tolerableCheckpointFailureNumber == 
that.tolerableCheckpointFailureNumber
@@ -207,6 +208,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                 checkpointRetentionPolicy,
                 isExactlyOnce,
                 isUnalignedCheckpointsEnabled,
+                alignedCheckpointTimeout,
                 isPreferCheckpointForRecovery,
                 tolerableCheckpointFailureNumber,
                 checkpointIdOfIgnoredInFlightData);
@@ -229,6 +231,8 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                 + isExactlyOnce
                 + ", isUnalignedCheckpoint="
                 + isUnalignedCheckpointsEnabled
+                + ", alignedCheckpointTimeout="
+                + alignedCheckpointTimeout
                 + ", isPreferCheckpointForRecovery="
                 + isPreferCheckpointForRecovery
                 + ", tolerableCheckpointFailureNumber="
@@ -254,7 +258,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
         private boolean isPreferCheckpointForRecovery = true;
         private int tolerableCheckpointFailureNumber;
         private boolean isUnalignedCheckpointsEnabled;
-        private long alignmentTimeout = 0;
+        private long alignedCheckpointTimeout = 0;
         private long checkpointIdOfIgnoredInFlightData;
 
         public CheckpointCoordinatorConfiguration build() {
@@ -268,7 +272,7 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
                     isPreferCheckpointForRecovery,
                     tolerableCheckpointFailureNumber,
                     isUnalignedCheckpointsEnabled,
-                    alignmentTimeout,
+                    alignedCheckpointTimeout,
                     checkpointIdOfIgnoredInFlightData);
         }
 
@@ -325,9 +329,9 @@ public class CheckpointCoordinatorConfiguration implements 
Serializable {
             return this;
         }
 
-        public CheckpointCoordinatorConfigurationBuilder setAlignmentTimeout(
-                long alignmentTimeout) {
-            this.alignmentTimeout = alignmentTimeout;
+        public CheckpointCoordinatorConfigurationBuilder 
setAlignedCheckpointTimeout(
+                long alignedCheckpointTimeout) {
+            this.alignedCheckpointTimeout = alignedCheckpointTimeout;
             return this;
         }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index dc02c7f..8ba71d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -131,7 +131,7 @@ public class CheckpointConfigHandler
                     checkpointStorageName,
                     
checkpointCoordinatorConfiguration.isUnalignedCheckpointsEnabled(),
                     
checkpointCoordinatorConfiguration.getTolerableCheckpointFailureNumber(),
-                    checkpointCoordinatorConfiguration.getAlignmentTimeout());
+                    
checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout());
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 22de6e2..4708598 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -623,7 +623,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
                             .setExecutionGraph(graph)
                             .setCheckpointCoordinatorConfiguration(
                                     
CheckpointCoordinatorConfiguration.builder()
-                                            
.setAlignmentTimeout(Long.MAX_VALUE)
+                                            
.setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                             
.setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                             .build())
                             .setTimer(manuallyTriggeredScheduledExecutor)
@@ -3440,7 +3440,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                 .setExecutionGraph(graph)
                 .setCheckpointCoordinatorConfiguration(
                         CheckpointCoordinatorConfiguration.builder()
-                                .setAlignmentTimeout(Long.MAX_VALUE)
+                                .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                 .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                 .build())
                 .setTimer(manuallyTriggeredScheduledExecutor)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index a55fb2e..156d187 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
 
 import java.util.Random;
 
-import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.junit.Assert.assertArrayEquals;
@@ -83,25 +83,28 @@ public class CheckpointOptionsTest {
                                 CHECKPOINT,
                                 location,
                                 AlignmentType.UNALIGNED,
-                                NO_ALIGNMENT_TIME_OUT)
+                                NO_ALIGNED_CHECKPOINT_TIME_OUT)
                         .needsAlignment());
         assertTrue(
                 new CheckpointOptions(
-                                CHECKPOINT, location, AlignmentType.ALIGNED, 
NO_ALIGNMENT_TIME_OUT)
+                                CHECKPOINT,
+                                location,
+                                AlignmentType.ALIGNED,
+                                NO_ALIGNED_CHECKPOINT_TIME_OUT)
                         .needsAlignment());
         assertTrue(
                 new CheckpointOptions(
                                 CHECKPOINT,
                                 location,
                                 AlignmentType.FORCED_ALIGNED,
-                                NO_ALIGNMENT_TIME_OUT)
+                                NO_ALIGNED_CHECKPOINT_TIME_OUT)
                         .needsAlignment());
         assertFalse(
                 new CheckpointOptions(
                                 CHECKPOINT,
                                 location,
                                 AlignmentType.AT_LEAST_ONCE,
-                                NO_ALIGNMENT_TIME_OUT)
+                                NO_ALIGNED_CHECKPOINT_TIME_OUT)
                         .needsAlignment());
     }
 
@@ -111,7 +114,7 @@ public class CheckpointOptionsTest {
                 CheckpointStorageLocationReference.getDefault();
         assertTimeoutable(CheckpointOptions.alignedWithTimeout(location, 10), 
false, true, 10);
         assertTimeoutable(
-                CheckpointOptions.unaligned(location), true, false, 
NO_ALIGNMENT_TIME_OUT);
+                CheckpointOptions.unaligned(location), true, false, 
NO_ALIGNED_CHECKPOINT_TIME_OUT);
         assertTimeoutable(
                 CheckpointOptions.alignedWithTimeout(location, 
10).withUnalignedUnsupported(),
                 false,
@@ -121,7 +124,7 @@ public class CheckpointOptionsTest {
                 
CheckpointOptions.unaligned(location).withUnalignedUnsupported(),
                 false,
                 false,
-                NO_ALIGNMENT_TIME_OUT);
+                NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     @Test
@@ -160,6 +163,6 @@ public class CheckpointOptionsTest {
         assertEquals("need alignment", !isUnaligned, options.needsAlignment());
         assertEquals("unaligned", isUnaligned, 
options.isUnalignedCheckpoint());
         assertEquals("timeoutable", isTimeoutable, options.isTimeoutable());
-        assertEquals("timeout", timeout, options.getAlignmentTimeout());
+        assertEquals("timeout", timeout, 
options.getAlignedCheckpointTimeout());
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index a087a39..7efd3b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -98,8 +98,9 @@ public class CheckpointConfig implements java.io.Serializable 
{
     private long checkpointIdOfIgnoredInFlightData =
             DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
 
-    private Duration alignmentTimeout =
-            ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue();
+    /** The delay from the start of checkpoint after which AC switches to UC. 
*/
+    private Duration alignedCheckpointTimeout =
+            
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
 
     /** Flag to enable approximate local recovery. */
     private boolean approximateLocalRecovery;
@@ -149,7 +150,7 @@ public class CheckpointConfig implements 
java.io.Serializable {
         this.preferCheckpointForRecovery = 
checkpointConfig.preferCheckpointForRecovery;
         this.tolerableCheckpointFailureNumber = 
checkpointConfig.tolerableCheckpointFailureNumber;
         this.unalignedCheckpointsEnabled = 
checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignmentTimeout = checkpointConfig.alignmentTimeout;
+        this.alignedCheckpointTimeout = 
checkpointConfig.alignedCheckpointTimeout;
         this.approximateLocalRecovery = 
checkpointConfig.isApproximateLocalRecoveryEnabled();
         this.externalizedCheckpointCleanup = 
checkpointConfig.externalizedCheckpointCleanup;
         this.forceCheckpointing = checkpointConfig.forceCheckpointing;
@@ -540,26 +541,57 @@ public class CheckpointConfig implements 
java.io.Serializable {
     /**
      * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled.
      *
-     * <p>If {@link #alignmentTimeout} has value equal to <code>0</code>, 
checkpoints will always
-     * start unaligned.
+     * <p>If {@link #alignedCheckpointTimeout} has value equal to 
<code>0</code>, checkpoints will
+     * always start unaligned.
      *
-     * <p>If {@link #alignmentTimeout} has value greater then <code>0</code>, 
checkpoints will start
-     * aligned. If during checkpointing, checkpoint start delay exceeds this 
{@link
-     * #alignmentTimeout}, alignment will timeout and checkpoint will start 
working as unaligned
-     * checkpoint.
+     * <p>If {@link #alignedCheckpointTimeout} has value greater than 
<code>0</code>, checkpoints
+     * will start aligned. If during checkpointing, checkpoint start delay 
exceeds this {@link
+     * #alignedCheckpointTimeout}, alignment will time out and checkpoint will 
start working as
+     * unaligned checkpoint.
+     *
+     * @deprecated Use {@link #setAlignedCheckpointTimeout(Duration)} instead.
      */
+    @Deprecated
     @PublicEvolving
     public void setAlignmentTimeout(Duration alignmentTimeout) {
-        this.alignmentTimeout = alignmentTimeout;
+        this.alignedCheckpointTimeout = alignmentTimeout;
     }
 
     /**
      * @return value of alignment timeout, as configured via {@link 
#setAlignmentTimeout(Duration)}
      *     or {@link ExecutionCheckpointingOptions#ALIGNMENT_TIMEOUT}.
+     * @deprecated Use {@link #getAlignedCheckpointTimeout()} instead.
      */
+    @Deprecated
     @PublicEvolving
     public Duration getAlignmentTimeout() {
-        return alignmentTimeout;
+        return alignedCheckpointTimeout;
+    }
+
+    /**
+     * @return value of alignment timeout, as configured via {@link
+     *     #setAlignedCheckpointTimeout(Duration)} or {@link
+     *     ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}.
+     */
+    @PublicEvolving
+    public Duration getAlignedCheckpointTimeout() {
+        return alignedCheckpointTimeout;
+    }
+
+    /**
+     * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled.
+     *
+     * <p>If {@link #alignedCheckpointTimeout} has value equal to 
<code>0</code>, checkpoints will
+     * always start unaligned.
+     *
+     * <p>If {@link #alignedCheckpointTimeout} has value greater than 
<code>0</code>, checkpoints
+     * will start aligned. If during checkpointing, checkpoint start delay 
exceeds this {@link
+     * #alignedCheckpointTimeout}, alignment will time out and checkpoint will 
start working as
+     * unaligned checkpoint.
+     */
+    @PublicEvolving
+    public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) 
{
+        this.alignedCheckpointTimeout = alignedCheckpointTimeout;
     }
 
     /**
@@ -784,8 +816,8 @@ public class CheckpointConfig implements 
java.io.Serializable {
                 
.getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
                 .ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
         configuration
-                .getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT)
-                .ifPresent(this::setAlignmentTimeout);
+                
.getOptional(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT)
+                .ifPresent(this::setAlignedCheckpointTimeout);
         configuration
                 .getOptional(ExecutionCheckpointingOptions.FORCE_UNALIGNED)
                 .ifPresent(this::setForceUnalignedCheckpoints);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 1c9d97a..c8cefe7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -166,6 +166,30 @@ public class ExecutionCheckpointingOptions {
                                             
TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
                                     .build());
 
+    public static final ConfigOption<Duration> ALIGNED_CHECKPOINT_TIMEOUT =
+            
ConfigOptions.key("execution.checkpointing.aligned-checkpoint-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(0L))
+                    
.withDeprecatedKeys("execution.checkpointing.alignment-timeout")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Only relevant if %s is enabled.",
+                                            
TextElement.code(ENABLE_UNALIGNED.key()))
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "If timeout is 0, checkpoints will 
always start unaligned.")
+                                    .linebreak()
+                                    .linebreak()
+                                    .text(
+                                            "If timeout has a positive value, 
checkpoints will start aligned. "
+                                                    + "If during 
checkpointing, checkpoint start delay exceeds this timeout, alignment "
+                                                    + "will timeout and 
checkpoint barrier will start working as unaligned checkpoint.")
+                                    .build());
+
+    /** @deprecated Use {@link #ALIGNED_CHECKPOINT_TIMEOUT} instead. */
+    @Deprecated
     public static final ConfigOption<Duration> ALIGNMENT_TIMEOUT =
             ConfigOptions.key("execution.checkpointing.alignment-timeout")
                     .durationType()
@@ -173,7 +197,8 @@ public class ExecutionCheckpointingOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Only relevant if %s is enabled.",
+                                            "Deprecated. %s should be used 
instead. Only relevant if %s is enabled.",
+                                            
TextElement.code(ALIGNED_CHECKPOINT_TIMEOUT.key()),
                                             
TextElement.code(ENABLE_UNALIGNED.key()))
                                     .linebreak()
                                     .linebreak()
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 085a327..6f9e674 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -462,12 +462,13 @@ public class StreamConfig implements Serializable {
         return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE;
     }
 
-    public Duration getAlignmentTimeout() {
-        return config.get(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT);
+    public Duration getAlignedCheckpointTimeout() {
+        return 
config.get(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
     }
 
-    public void setAlignmentTimeout(Duration alignmentTimeout) {
-        config.set(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT, 
alignmentTimeout);
+    public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) 
{
+        config.set(
+                ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, 
alignedCheckpointTimeout);
     }
 
     public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index f06aa05..6e96655 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -728,7 +728,7 @@ public class StreamingJobGraphGenerator {
         config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
         config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
         
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
-        config.setAlignmentTimeout(checkpointCfg.getAlignmentTimeout());
+        
config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout());
 
         for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
             config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
@@ -1315,7 +1315,8 @@ public class StreamingJobGraphGenerator {
                                 
.setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
                                 .setCheckpointIdOfIgnoredInFlightData(
                                         
cfg.getCheckpointIdOfIgnoredInFlightData())
-                                
.setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
+                                .setAlignedCheckpointTimeout(
+                                        
cfg.getAlignedCheckpointTimeout().toMillis())
                                 .build(),
                         serializedStateBackend,
                         streamGraph.isChangelogStateBackendEnabled(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index fcca333..e2ad1cb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -271,7 +271,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
 
     private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
         long alignedCheckpointTimeout =
-                announcedBarrier.getCheckpointOptions().getAlignmentTimeout();
+                
announcedBarrier.getCheckpointOptions().getAlignedCheckpointTimeout();
         long timePassedSinceCheckpointStart =
                 getClock().absoluteTimeMillis() - 
announcedBarrier.getTimestamp();
 
@@ -436,7 +436,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
         public boolean isTimedOut(CheckpointBarrier barrier) {
             return barrier.getCheckpointOptions().isTimeoutable()
                     && barrier.getId() <= currentCheckpointId
-                    && barrier.getCheckpointOptions().getAlignmentTimeout()
+                    && 
barrier.getCheckpointOptions().getAlignedCheckpointTimeout()
                             < (getClock().absoluteTimeMillis() - 
barrier.getTimestamp());
         }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index b77710e..5ba8321 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -148,7 +148,7 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                         CheckpointStorageLocationReference.getDefault(),
                         configuration.isExactlyOnceCheckpointMode(),
                         configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignmentTimeout().toMillis());
+                        
configuration.getAlignedCheckpointTimeout().toMillis());
         final long timestamp = System.currentTimeMillis();
 
         final CheckpointMetaData checkpointMetaData =
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index ad5b99f..1d132e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -109,7 +109,7 @@ public class SourceStreamTask<
                                             
CheckpointStorageLocationReference.getDefault(),
                                             
configuration.isExactlyOnceCheckpointMode(),
                                             
configuration.isUnalignedCheckpointsEnabled(),
-                                            
configuration.getAlignmentTimeout().toMillis());
+                                            
configuration.getAlignedCheckpointTimeout().toMillis());
                             final long timestamp = System.currentTimeMillis();
 
                             final CheckpointMetaData checkpointMetaData =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index c541e67..da70c99 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -300,7 +300,7 @@ public class 
MultipleInputStreamTaskChainedSourcesCheckpointingTest {
                         CheckpointStorageLocationReference.getDefault(),
                         config.isExactlyOnceCheckpointMode(),
                         config.isUnalignedCheckpointsEnabled(),
-                        config.getAlignmentTimeout().toMillis());
+                        config.getAlignedCheckpointTimeout().toMillis());
 
         return new CheckpointBarrier(
                 metaData.getCheckpointId(), metaData.getTimestamp(), 
checkpointOptions);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 6f5e8eb..7a0669d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -1046,7 +1046,7 @@ public class MultipleInputStreamTaskTest {
                         MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
                 .modifyExecutionConfig(config -> config.enableObjectReuse())
                 .modifyStreamConfig(config -> 
config.setUnalignedCheckpointsEnabled(unaligned))
-                .modifyStreamConfig(config -> 
config.setAlignmentTimeout(Duration.ZERO))
+                .modifyStreamConfig(config -> 
config.setAlignedCheckpointTimeout(Duration.ZERO))
                 .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                 .addSourceInput(
                         new SourceOperatorFactory<>(

Reply via email to