This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5012c1e7670471f2844c6b96a85a1ab0d8c6263
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Tue Jun 24 10:52:06 2025 +0200

    [FLINK-37985][conf] Refactor public config options from StreamConfig to JobConfiguration
---
 .../operators/BroadcastStateBootstrapOperator.java |  5 ++-
 .../operators/KeyedStateBootstrapOperator.java     |  5 ++-
 .../output/operators/StateBootstrapOperator.java   |  5 ++-
 .../operators/StateBootstrapWrapperOperator.java   |  5 ++-
 .../flink/streaming/api/graph/StreamConfig.java    | 51 +++-------------------
 .../api/graph/StreamingJobGraphGenerator.java      |  7 +--
 .../api/operators/AbstractStreamOperator.java      | 15 ++++---
 .../api/operators/AbstractStreamOperatorV2.java    | 10 +++--
 .../io/checkpointing/InputProcessorUtil.java       |  8 +++-
 .../runtime/tasks/MultipleInputStreamTask.java     |  1 +
 .../runtime/tasks/OneInputStreamTask.java          |  1 +
 .../streaming/runtime/tasks/SourceStreamTask.java  | 10 ++++-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 16 +++++--
 .../runtime/tasks/TwoInputStreamTask.java          |  1 +
 .../io/checkpointing/InputProcessorUtilTest.java   |  4 +-
 .../api/graph/JobGraphGeneratorTestBase.java       | 15 ++++---
 ...nalignedCheckpointsInterruptibleTimersTest.java |  9 +++-
 ...tStreamTaskChainedSourcesCheckpointingTest.java | 19 ++++----
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 17 ++++----
 .../tasks/StreamTaskFinalCheckpointsTest.java      |  8 ++--
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  7 +++
 21 files changed, 113 insertions(+), 106 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
index 8dfb6d3e124..94641b4b601 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.state.api.output.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.output.SnapshotUtils;
@@ -84,7 +85,9 @@ public class BroadcastStateBootstrapOperator<IN>
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
-                        
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
+                        getContainingTask()
+                                .getJobConfiguration()
+                                .get(CheckpointingOptions.ENABLE_UNALIGNED),
                         
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
index 6f4b4a1c223..52755d76e57 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.api.output.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -99,7 +100,9 @@ public class KeyedStateBootstrapOperator<K, IN>
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
-                        
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
+                        getContainingTask()
+                                .getJobConfiguration()
+                                .get(CheckpointingOptions.ENABLE_UNALIGNED),
                         
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
index 149aa938b02..07e609ce8fa 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.api.output.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
 import org.apache.flink.state.api.output.SnapshotUtils;
@@ -80,7 +81,9 @@ public class StateBootstrapOperator<IN>
                         
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                         timestamp,
                         
getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(),
-                        
getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(),
+                        getContainingTask()
+                                .getJobConfiguration()
+                                .get(CheckpointingOptions.ENABLE_UNALIGNED),
                         
getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 4170c8f66f6..8c7e68f8bce 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.api.output.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -189,8 +190,8 @@ public final class StateBootstrapWrapperOperator<
                                 .getConfiguration()
                                 .isExactlyOnceCheckpointMode(),
                         operator.getContainingTask()
-                                .getConfiguration()
-                                .isUnalignedCheckpointsEnabled(),
+                                .getJobConfiguration()
+                                .get(CheckpointingOptions.ENABLE_UNALIGNED),
                         
operator.getContainingTask().getConfiguration().getConfiguration(),
                         savepointPath);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0f856cebcfe..8e475f883b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.attribute.Attribute;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -47,7 +46,6 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -68,6 +66,11 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Internal configuration for a {@link StreamOperator}. This is created and 
populated by the {@link
  * StreamingJobGraphGenerator}.
+ *
+ * <p>NOTE TO IMPLEMENTERS: Please do not set public ConfigOption to this 
class. Use the job
+ * Configuration instead! See {@link
+ * org.apache.flink.configuration.CheckpointingOptions#ENABLE_UNALIGNED} for a 
reference
+ * implementation.
  */
 @Internal
 public class StreamConfig implements Serializable {
@@ -555,54 +558,10 @@ public class StreamConfig implements Serializable {
         }
     }
 
-    public void setUnalignedCheckpointsEnabled(boolean enabled) {
-        config.set(CheckpointingOptions.ENABLE_UNALIGNED, enabled);
-    }
-
-    public boolean isUnalignedCheckpointsEnabled() {
-        return config.get(CheckpointingOptions.ENABLE_UNALIGNED, false);
-    }
-
-    public void setUnalignedCheckpointsSplittableTimersEnabled(boolean 
enabled) {
-        config.set(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, 
enabled);
-    }
-
-    public boolean isUnalignedCheckpointsSplittableTimersEnabled() {
-        return 
config.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
-    }
-
     public boolean isExactlyOnceCheckpointMode() {
         return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE;
     }
 
-    public Duration getAlignedCheckpointTimeout() {
-        return config.get(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
-    }
-
-    public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) 
{
-        config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, 
alignedCheckpointTimeout);
-    }
-
-    public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
-        config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 
maxConcurrentCheckpoints);
-    }
-
-    public int getMaxConcurrentCheckpoints() {
-        return config.get(
-                CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
-                
CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue());
-    }
-
-    public int getMaxSubtasksPerChannelStateFile() {
-        return 
config.get(CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE);
-    }
-
-    public void setMaxSubtasksPerChannelStateFile(int 
maxSubtasksPerChannelStateFile) {
-        config.set(
-                
CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE,
-                maxSubtasksPerChannelStateFile);
-    }
-
     /**
      * Sets the job vertex level non-chained outputs. The given output list 
must have the same order
      * with {@link JobVertex#getProducedDataSets()}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 2cc54a7496e..767ecf639b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -531,6 +531,7 @@ public class StreamingJobGraphGenerator {
         if (checkpointConfig.isUnalignedCheckpointsEnabled()
                 && streamGraph.getCheckpointingMode() != 
CheckpointingMode.EXACTLY_ONCE) {
             LOG.warn("Unaligned checkpoints can only be used with 
checkpointing mode EXACTLY_ONCE");
+            
streamGraph.getJobConfiguration().set(CheckpointingOptions.ENABLE_UNALIGNED, 
false);
             checkpointConfig.enableUnalignedCheckpoints(false);
         }
     }
@@ -1257,12 +1258,6 @@ public class StreamingJobGraphGenerator {
                         
CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
                         streamGraph.isEnableCheckpointsAfterTasksFinish());
         
config.setCheckpointMode(StreamGraph.getCheckpointingMode(checkpointCfg));
-        
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
-        config.setUnalignedCheckpointsSplittableTimersEnabled(
-                
checkpointCfg.isUnalignedCheckpointsInterruptibleTimersEnabled());
-        
config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout());
-        
config.setMaxSubtasksPerChannelStateFile(checkpointCfg.getMaxSubtasksPerChannelStateFile());
-        
config.setMaxConcurrentCheckpoints(checkpointCfg.getMaxConcurrentCheckpoints());
 
         for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
             config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 5978da122d2..f6244752710 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -348,9 +349,9 @@ public abstract class AbstractStreamOperator<OUT>
      * option is enabled. By default, splittable timers are disabled.
      *
      * @return {@code true} if splittable timers should be used (subject to 
{@link
-     *     StreamConfig#isUnalignedCheckpointsEnabled()} and {@link
-     *     StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. 
{@code false} if
-     *     splittable timers should never be used.
+     *     CheckpointingOptions#ENABLE_UNALIGNED} and {@link
+     *     CheckpointingOptions#ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS}. {@code 
false} if splittable
+     *     timers should never be used.
      */
     @Internal
     public boolean useSplittableTimers() {
@@ -359,13 +360,13 @@ public abstract class AbstractStreamOperator<OUT>
 
     @Internal
     private boolean areSplittableTimersConfigured() {
-        return areSplittableTimersConfigured(config);
+        return areSplittableTimersConfigured(config, 
getContainingTask().getJobConfiguration());
     }
 
-    static boolean areSplittableTimersConfigured(StreamConfig config) {
+    static boolean areSplittableTimersConfigured(StreamConfig config, 
Configuration conf) {
         return config.isCheckpointingEnabled()
-                && config.isUnalignedCheckpointsEnabled()
-                && config.isUnalignedCheckpointsSplittableTimersEnabled();
+                && conf.get(CheckpointingOptions.ENABLE_UNALIGNED)
+                && 
conf.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 31da01700f5..174c7ad744c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -246,9 +247,9 @@ public abstract class AbstractStreamOperatorV2<OUT>
      * option is enabled. By default, splittable timers are disabled.
      *
      * @return {@code true} if splittable timers should be used (subject to 
{@link
-     *     StreamConfig#isUnalignedCheckpointsEnabled()} and {@link
-     *     StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. 
{@code false} if
-     *     splittable timers should never be used.
+     *     CheckpointingOptions#ENABLE_UNALIGNED} and {@link
+     *     CheckpointingOptions#ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS}. {@code 
false} if splittable
+     *     timers should never be used.
      */
     @Internal
     public boolean useSplittableTimers() {
@@ -257,7 +258,8 @@ public abstract class AbstractStreamOperatorV2<OUT>
 
     @Internal
     private boolean areSplittableTimersConfigured() {
-        return AbstractStreamOperator.areSplittableTimersConfigured(config);
+        return AbstractStreamOperator.areSplittableTimersConfigured(
+                config, runtimeContext.getJobConfiguration());
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
index 434cab13684..0b82e8fd959 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -78,6 +79,7 @@ public class InputProcessorUtil {
 
     public static CheckpointBarrierHandler createCheckpointBarrierHandler(
             CheckpointableTask toNotifyOnCheckpoint,
+            Configuration jobConf,
             StreamConfig config,
             SubtaskCheckpointCoordinator checkpointCoordinator,
             String taskName,
@@ -103,6 +105,7 @@ public class InputProcessorUtil {
                                         .sum();
                 return createBarrierHandler(
                         toNotifyOnCheckpoint,
+                        jobConf,
                         config,
                         checkpointCoordinator,
                         taskName,
@@ -112,7 +115,7 @@ public class InputProcessorUtil {
                         clock,
                         numberOfChannels);
             case AT_LEAST_ONCE:
-                if (config.isUnalignedCheckpointsEnabled()) {
+                if (jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED)) {
                     throw new IllegalStateException(
                             "Cannot use unaligned checkpoints with 
AT_LEAST_ONCE "
                                     + "checkpointing mode");
@@ -135,6 +138,7 @@ public class InputProcessorUtil {
 
     private static SingleCheckpointBarrierHandler createBarrierHandler(
             CheckpointableTask toNotifyOnCheckpoint,
+            Configuration jobConf,
             StreamConfig config,
             SubtaskCheckpointCoordinator checkpointCoordinator,
             String taskName,
@@ -146,7 +150,7 @@ public class InputProcessorUtil {
         boolean enableCheckpointAfterTasksFinished =
                 config.getConfiguration()
                         
.get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH);
-        if (config.isUnalignedCheckpointsEnabled()) {
+        if (jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED)) {
             return SingleCheckpointBarrierHandler.alternating(
                     taskName,
                     toNotifyOnCheckpoint,
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 390de7f5e0a..6e45024e1ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -142,6 +142,7 @@ public class MultipleInputStreamTask<OUT>
         checkpointBarrierHandler =
                 InputProcessorUtil.createCheckpointBarrierHandler(
                         this,
+                        getJobConfiguration(),
                         getConfiguration(),
                         getCheckpointCoordinator(),
                         getTaskNameWithSubtaskAndId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 9025210b8e0..68f49f4fce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -166,6 +166,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
         checkpointBarrierHandler =
                 InputProcessorUtil.createCheckpointBarrierHandler(
                         this,
+                        getJobConfiguration(),
                         configuration,
                         getCheckpointCoordinator(),
                         getTaskNameWithSubtaskAndId(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 753dae5c964..343870cde42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
@@ -134,13 +136,17 @@ public class SourceStreamTask<
                             // between the trigger
                             // TODO -   message from the master, and the 
source's trigger
                             // notification
+                            Configuration jobConf = getJobConfiguration();
                             final CheckpointOptions checkpointOptions =
                                     CheckpointOptions.forConfig(
                                             CheckpointType.CHECKPOINT,
                                             
CheckpointStorageLocationReference.getDefault(),
                                             
configuration.isExactlyOnceCheckpointMode(),
-                                            
configuration.isUnalignedCheckpointsEnabled(),
-                                            
configuration.getAlignedCheckpointTimeout().toMillis());
+                                            
jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED),
+                                            jobConf.get(
+                                                            
CheckpointingOptions
+                                                                    
.ALIGNED_CHECKPOINT_TIMEOUT)
+                                                    .toMillis());
                             final long timestamp = System.currentTimeMillis();
 
                             final CheckpointMetaData checkpointMetaData =
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8457d1a9817..444acb3ef1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -445,7 +445,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                             getEnvironment().getJobID(),
                             new ThreadPoolExecutor(
                                     0,
-                                    
configuration.getMaxConcurrentCheckpoints() + 1,
+                                    getJobConfiguration()
+                                                    .get(
+                                                            
CheckpointingOptions
+                                                                    
.MAX_CONCURRENT_CHECKPOINTS)
+                                            + 1,
                                     60L,
                                     TimeUnit.SECONDS,
                                     new LinkedBlockingQueue<>(),
@@ -488,7 +492,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
             final CheckpointStorageAccess finalCheckpointStorageAccess = 
checkpointStorageAccess;
 
             ChannelStateWriter channelStateWriter =
-                    configuration.isUnalignedCheckpointsEnabled()
+                    
getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED)
                             ? openChannelStateWriter(
                                     getName(),
                                     // Note: don't pass 
checkpointStorageAccess directly to channel
@@ -513,7 +517,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                         }
                                     },
                                     environment,
-                                    
configuration.getMaxSubtasksPerChannelStateFile())
+                                    getJobConfiguration()
+                                            .get(
+                                                    CheckpointingOptions
+                                                            
.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE))
                             : ChannelStateWriter.NO_OP;
             this.subtaskCheckpointCoordinator =
                     new SubtaskCheckpointCoordinatorImpl(
@@ -524,7 +531,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                             environment,
                             this,
                             this::prepareInputSnapshot,
-                            configuration.getMaxConcurrentCheckpoints(),
+                            getJobConfiguration()
+                                    
.get(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS),
                             channelStateWriter,
                             configuration
                                     .getConfiguration()
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index e64870c1c75..f933d5069d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -65,6 +65,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
AbstractTwoInputStreamTas
         checkpointBarrierHandler =
                 InputProcessorUtil.createCheckpointBarrierHandler(
                         this,
+                        getJobConfiguration(),
                         configuration,
                         getCheckpointCoordinator(),
                         getTaskNameWithSubtaskAndId(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
index 60046ec4fad..74038bbb413 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.checkpointing;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -59,9 +60,9 @@ class InputProcessorUtilTest {
         try (CloseableRegistry registry = new CloseableRegistry()) {
             MockEnvironment environment = new MockEnvironmentBuilder().build();
             MockStreamTask streamTask = new 
MockStreamTaskBuilder(environment).build();
+            
streamTask.getJobConfiguration().set(CheckpointingOptions.ENABLE_UNALIGNED, 
true);
             StreamConfig streamConfig = new 
StreamConfig(environment.getJobConfiguration());
             streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);
-            streamConfig.setUnalignedCheckpointsEnabled(true);
 
             // First input gate has index larger than the second
             List<IndexedInputGate>[] inputGates =
@@ -73,6 +74,7 @@ class InputProcessorUtilTest {
             CheckpointBarrierHandler barrierHandler =
                     InputProcessorUtil.createCheckpointBarrierHandler(
                             streamTask,
+                            streamTask.getJobConfiguration(),
                             streamConfig,
                             new TestSubtaskCheckpointCoordinator(new 
MockChannelStateWriter()),
                             streamTask.getName(),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
index dd29081dd6e..d8c7fc3cd13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -256,19 +257,21 @@ abstract class JobGraphGeneratorTestBase {
     @Test
     void testEnabledUnalignedCheckAndDisabledCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
         env.fromData(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
         assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
                 .withFailMessage("Checkpointing enabled")
                 .isFalse();
-        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
 
         JobGraph jobGraph = createJobGraph(streamGraph);
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
         assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamGraph.getCheckpointConfig().isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(jobGraph.getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED))
+                .isFalse();
     }
 
     @Test
@@ -417,17 +420,19 @@ abstract class JobGraphGeneratorTestBase {
     @Test
     void testUnalignedCheckAndAtLeastOnce() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.fromData(0).print();
-        StreamGraph streamGraph = env.getStreamGraph();
         env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
         env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+        env.fromData(0).print();
+        StreamGraph streamGraph = env.getStreamGraph();
 
         JobGraph jobGraph = createJobGraph(streamGraph);
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
         assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamGraph.getCheckpointConfig().isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(jobGraph.getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED))
+                .isFalse();
     }
 
     @Test
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
index a5e3712ec76..140e6b62e6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -67,6 +68,9 @@ class UnalignedCheckpointsInterruptibleTimersTest {
 
         try (final StreamTaskMailboxTestHarness<String> harness =
                 new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING)
+                        .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+                        .addJobConfig(
+                                CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true)
                         .modifyStreamConfig(
                                 UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig)
                         .addInput(Types.STRING)
@@ -106,6 +110,9 @@ class UnalignedCheckpointsInterruptibleTimersTest {
 
         try (final StreamTaskMailboxTestHarness<String> harness =
                 new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING)
+                        .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+                        .addJobConfig(
+                                CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true)
                         .modifyStreamConfig(
                                 UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig)
                         .addInput(Types.STRING)
@@ -148,8 +155,6 @@ class UnalignedCheckpointsInterruptibleTimersTest {
     }
 
     private static void setupStreamConfig(StreamConfig cfg) {
-        cfg.setUnalignedCheckpointsEnabled(true);
-        cfg.setUnalignedCheckpointsSplittableTimersEnabled(true);
         cfg.setStateKeySerializer(StringSerializer.INSTANCE);
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
index 92ab98e32b6..689233b4e4f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -384,13 +386,11 @@ class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
             try (StreamTaskMailboxTestHarness<String> testHarness =
                     new StreamTaskMailboxTestHarnessBuilder<>(
                                     MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
-                            .modifyStreamConfig(
-                                    config -> {
-                                        config.setCheckpointingEnabled(true);
-                                        config.setUnalignedCheckpointsEnabled(
-                                                checkpointOptions.isUnalignedCheckpoint()
-                                                        || checkpointOptions.isTimeoutable());
-                                    })
+                            .addJobConfig(
+                                    CheckpointingOptions.ENABLE_UNALIGNED,
+                                    checkpointOptions.isUnalignedCheckpoint()
+                                            || checkpointOptions.isTimeoutable())
+                            .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                             .modifyExecutionConfig(applyObjectReuse(objectReuse))
                             .setCheckpointResponder(checkpointResponder)
                             .addInput(BasicTypeInfo.INT_TYPE_INFO)
@@ -548,13 +548,14 @@ class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
 
     private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) {
         StreamConfig config = testHarness.getStreamTask().getConfiguration();
+        Configuration jobConf = testHarness.getStreamTask().getJobConfiguration();
         CheckpointOptions checkpointOptions =
                 CheckpointOptions.forConfig(
                         CheckpointType.CHECKPOINT,
                         CheckpointStorageLocationReference.getDefault(),
                         config.isExactlyOnceCheckpointMode(),
-                        config.isUnalignedCheckpointsEnabled(),
-                        config.getAlignedCheckpointTimeout().toMillis());
+                        jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED),
+                        jobConf.get(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT).toMillis());
 
         return new CheckpointBarrier(
                 metaData.getCheckpointId(), metaData.getTimestamp(), checkpointOptions);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 479cf3d7b6b..f63788aabc2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -1007,13 +1008,11 @@ class MultipleInputStreamTaskTest {
                             .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
                             .addAdditionalOutput(partitionWriters)
                             .setCheckpointResponder(checkpointResponder)
-                            .modifyStreamConfig(
-                                    config -> {
-                                        config.setCheckpointingEnabled(true);
-                                        config.setUnalignedCheckpointsEnabled(
-                                                checkpointOptions.isUnalignedCheckpoint()
-                                                        || checkpointOptions.isTimeoutable());
-                                    })
+                            .addJobConfig(
+                                    CheckpointingOptions.ENABLE_UNALIGNED,
+                                    checkpointOptions.isUnalignedCheckpoint()
+                                            || checkpointOptions.isTimeoutable())
+                            .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                             .setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3))
                             .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                             .build()) {
@@ -1293,8 +1292,8 @@ class MultipleInputStreamTaskTest {
         return new StreamTaskMailboxTestHarnessBuilder<>(
                         MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                 .modifyExecutionConfig(applyObjectReuse(objectReuse))
-                .modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned))
-                .modifyStreamConfig(config -> config.setAlignedCheckpointTimeout(Duration.ZERO))
+                .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, unaligned)
+                .addJobConfig(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO)
                 .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                 .addSourceInput(
                         new SourceOperatorFactory<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index a90802bd661..5046d3f4930 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -228,11 +229,11 @@ class StreamTaskFinalCheckpointsTest {
         StreamTaskMailboxTestHarness<String> testHarness =
                 testHarnessBuilder
                         .addInput(STRING_TYPE_INFO, 3)
+                        .addJobConfig(
+                                CheckpointingOptions.ENABLE_UNALIGNED, enableUnalignedCheckpoint)
                         .modifyStreamConfig(
                                 config -> {
                                     config.setCheckpointingEnabled(true);
-                                    config.setUnalignedCheckpointsEnabled(
-                                            enableUnalignedCheckpoint);
                                 })
                         .setCheckpointResponder(checkpointResponder)
                         .setupOperatorChain(new EmptyOperator())
@@ -848,8 +849,7 @@ class StreamTaskFinalCheckpointsTest {
         try (StreamTaskMailboxTestHarness<String> harness =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
-                        .modifyStreamConfig(
-                                streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true))
+                        .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
                         .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
                         .setCollectNetworkEvents()
                         .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index f5098c5b971..287c0f4ca75 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
@@ -130,6 +131,12 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         return this;
     }
 
+    public <T> StreamTaskMailboxTestHarnessBuilder<OUT> addJobConfig(
+            ConfigOption<T> option, T value) {
+        jobConfig.set(option, value);
+        return this;
+    }
+
     public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCheckpointResponder(
             CheckpointResponder checkpointResponder) {
         this.checkpointResponder = checkpointResponder;


Reply via email to