This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c5012c1e7670471f2844c6b96a85a1ab0d8c6263 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Tue Jun 24 10:52:06 2025 +0200 [FLINK-37985][conf] Refactor public config options from StreamConfig to JobConfiguration --- .../operators/BroadcastStateBootstrapOperator.java | 5 ++- .../operators/KeyedStateBootstrapOperator.java | 5 ++- .../output/operators/StateBootstrapOperator.java | 5 ++- .../operators/StateBootstrapWrapperOperator.java | 5 ++- .../flink/streaming/api/graph/StreamConfig.java | 51 +++------------------- .../api/graph/StreamingJobGraphGenerator.java | 7 +-- .../api/operators/AbstractStreamOperator.java | 15 ++++--- .../api/operators/AbstractStreamOperatorV2.java | 10 +++-- .../io/checkpointing/InputProcessorUtil.java | 8 +++- .../runtime/tasks/MultipleInputStreamTask.java | 1 + .../runtime/tasks/OneInputStreamTask.java | 1 + .../streaming/runtime/tasks/SourceStreamTask.java | 10 ++++- .../flink/streaming/runtime/tasks/StreamTask.java | 16 +++++-- .../runtime/tasks/TwoInputStreamTask.java | 1 + .../io/checkpointing/InputProcessorUtilTest.java | 4 +- .../api/graph/JobGraphGeneratorTestBase.java | 15 ++++--- ...nalignedCheckpointsInterruptibleTimersTest.java | 9 +++- ...tStreamTaskChainedSourcesCheckpointingTest.java | 19 ++++---- .../runtime/tasks/MultipleInputStreamTaskTest.java | 17 ++++---- .../tasks/StreamTaskFinalCheckpointsTest.java | 8 ++-- .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 7 +++ 21 files changed, 113 insertions(+), 106 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java index 8dfb6d3e124..94641b4b601 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java @@ -21,6 +21,7 @@ package org.apache.flink.state.api.output.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction; import org.apache.flink.state.api.output.SnapshotUtils; @@ -84,7 +85,9 @@ public class BroadcastStateBootstrapOperator<IN> getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), - getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), + getContainingTask() + .getJobConfiguration() + .get(CheckpointingOptions.ENABLE_UNALIGNED), getContainingTask().getConfiguration().getConfiguration(), savepointPath); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java index 6f4b4a1c223..52755d76e57 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.state.api.output.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -99,7 +100,9 @@ public class KeyedStateBootstrapOperator<K, IN> getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), - getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), + getContainingTask() + .getJobConfiguration() + .get(CheckpointingOptions.ENABLE_UNALIGNED), getContainingTask().getConfiguration().getConfiguration(), savepointPath); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java index 149aa938b02..07e609ce8fa 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.state.api.output.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.state.api.functions.StateBootstrapFunction; import org.apache.flink.state.api.output.SnapshotUtils; @@ -80,7 +81,9 @@ public class StateBootstrapOperator<IN> getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), timestamp, getContainingTask().getConfiguration().isExactlyOnceCheckpointMode(), - getContainingTask().getConfiguration().isUnalignedCheckpointsEnabled(), + getContainingTask() + .getJobConfiguration() + .get(CheckpointingOptions.ENABLE_UNALIGNED), getContainingTask().getConfiguration().getConfiguration(), savepointPath); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java index 4170c8f66f6..8c7e68f8bce 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.state.api.output.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -189,8 +190,8 @@ public final class StateBootstrapWrapperOperator< .getConfiguration() .isExactlyOnceCheckpointMode(), operator.getContainingTask() - .getConfiguration() - .isUnalignedCheckpointsEnabled(), + .getJobConfiguration() + .get(CheckpointingOptions.ENABLE_UNALIGNED), operator.getContainingTask().getConfiguration().getConfiguration(), savepointPath); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 0f856cebcfe..8e475f883b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.attribute.Attribute; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -47,7 +46,6 @@ import org.apache.flink.util.concurrent.FutureUtils; import java.io.IOException; import java.io.Serializable; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -68,6 +66,11 @@ import static org.apache.flink.util.Preconditions.checkState; /** * Internal configuration for a {@link StreamOperator}. This is created and populated by the {@link * StreamingJobGraphGenerator}. + * + * <p>NOTE TO IMPLEMENTERS: Please do not set public ConfigOption to this class. Use the job + * Configuration instead! See {@link + * org.apache.flink.configuration.CheckpointingOptions#ENABLE_UNALIGNED} for a reference + * implementation. */ @Internal public class StreamConfig implements Serializable { @@ -555,54 +558,10 @@ public class StreamConfig implements Serializable { } } - public void setUnalignedCheckpointsEnabled(boolean enabled) { - config.set(CheckpointingOptions.ENABLE_UNALIGNED, enabled); - } - - public boolean isUnalignedCheckpointsEnabled() { - return config.get(CheckpointingOptions.ENABLE_UNALIGNED, false); - } - - public void setUnalignedCheckpointsSplittableTimersEnabled(boolean enabled) { - config.set(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, enabled); - } - - public boolean isUnalignedCheckpointsSplittableTimersEnabled() { - return config.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS); - } - public boolean isExactlyOnceCheckpointMode() { return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE; } - public Duration getAlignedCheckpointTimeout() { - return config.get(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT); - } - - public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) { - config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout); - } - - public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { - config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, maxConcurrentCheckpoints); - } - - public int getMaxConcurrentCheckpoints() { - return config.get( - CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, - CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue()); - } - - public int getMaxSubtasksPerChannelStateFile() { - return config.get(CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE); - } - - public void setMaxSubtasksPerChannelStateFile(int maxSubtasksPerChannelStateFile) { - config.set( - CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE, - maxSubtasksPerChannelStateFile); - } - /** * Sets the job vertex level non-chained outputs. The given output list must have the same order * with {@link JobVertex#getProducedDataSets()}. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 2cc54a7496e..767ecf639b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -531,6 +531,7 @@ public class StreamingJobGraphGenerator { if (checkpointConfig.isUnalignedCheckpointsEnabled() && streamGraph.getCheckpointingMode() != CheckpointingMode.EXACTLY_ONCE) { LOG.warn("Unaligned checkpoints can only be used with checkpointing mode EXACTLY_ONCE"); + streamGraph.getJobConfiguration().set(CheckpointingOptions.ENABLE_UNALIGNED, false); checkpointConfig.enableUnalignedCheckpoints(false); } } @@ -1257,12 +1258,6 @@ public class StreamingJobGraphGenerator { CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, streamGraph.isEnableCheckpointsAfterTasksFinish()); config.setCheckpointMode(StreamGraph.getCheckpointingMode(checkpointCfg)); - config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled()); - config.setUnalignedCheckpointsSplittableTimersEnabled( - checkpointCfg.isUnalignedCheckpointsInterruptibleTimersEnabled()); - config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout()); - config.setMaxSubtasksPerChannelStateFile(checkpointCfg.getMaxSubtasksPerChannelStateFile()); - config.setMaxConcurrentCheckpoints(checkpointCfg.getMaxConcurrentCheckpoints()); for (int i = 0; i < vertex.getStatePartitioners().length; i++) { config.setStatePartitioner(i, vertex.getStatePartitioners()[i]); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 5978da122d2..f6244752710 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.fs.CloseableRegistry; @@ -348,9 +349,9 @@ public abstract class AbstractStreamOperator<OUT> * option is enabled. By default, splittable timers are disabled. * * @return {@code true} if splittable timers should be used (subject to {@link - * StreamConfig#isUnalignedCheckpointsEnabled()} and {@link - * StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. {@code false} if - * splittable timers should never be used. + * CheckpointingOptions#ENABLE_UNALIGNED} and {@link + * CheckpointingOptions#ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS}. {@code false} if splittable + * timers should never be used. */ @Internal public boolean useSplittableTimers() { @@ -359,13 +360,13 @@ public abstract class AbstractStreamOperator<OUT> @Internal private boolean areSplittableTimersConfigured() { - return areSplittableTimersConfigured(config); + return areSplittableTimersConfigured(config, getContainingTask().getJobConfiguration()); } - static boolean areSplittableTimersConfigured(StreamConfig config) { + static boolean areSplittableTimersConfigured(StreamConfig config, Configuration conf) { return config.isCheckpointingEnabled() - && config.isUnalignedCheckpointsEnabled() - && config.isUnalignedCheckpointsSplittableTimersEnabled(); + && conf.get(CheckpointingOptions.ENABLE_UNALIGNED) + && conf.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 31da01700f5..174c7ad744c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.core.fs.CloseableRegistry; @@ -246,9 +247,9 @@ public abstract class AbstractStreamOperatorV2<OUT> * option is enabled. By default, splittable timers are disabled. * * @return {@code true} if splittable timers should be used (subject to {@link - * StreamConfig#isUnalignedCheckpointsEnabled()} and {@link - * StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. {@code false} if - * splittable timers should never be used. + * CheckpointingOptions#ENABLE_UNALIGNED} and {@link + * CheckpointingOptions#ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS}. {@code false} if splittable + * timers should never be used. */ @Internal public boolean useSplittableTimers() { @@ -257,7 +258,8 @@ public abstract class AbstractStreamOperatorV2<OUT> @Internal private boolean areSplittableTimersConfigured() { - return AbstractStreamOperator.areSplittableTimersConfigured(config); + return AbstractStreamOperator.areSplittableTimersConfigured( + config, runtimeContext.getJobConfiguration()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java index 434cab13684..0b82e8fd959 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtil.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -78,6 +79,7 @@ public class InputProcessorUtil { public static CheckpointBarrierHandler createCheckpointBarrierHandler( CheckpointableTask toNotifyOnCheckpoint, + Configuration jobConf, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, String taskName, @@ -103,6 +105,7 @@ public class InputProcessorUtil { .sum(); return createBarrierHandler( toNotifyOnCheckpoint, + jobConf, config, checkpointCoordinator, taskName, @@ -112,7 +115,7 @@ public class InputProcessorUtil { clock, numberOfChannels); case AT_LEAST_ONCE: - if (config.isUnalignedCheckpointsEnabled()) { + if (jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED)) { throw new IllegalStateException( "Cannot use unaligned checkpoints with AT_LEAST_ONCE " + "checkpointing mode"); @@ -135,6 +138,7 @@ public class InputProcessorUtil { private static SingleCheckpointBarrierHandler createBarrierHandler( CheckpointableTask toNotifyOnCheckpoint, + Configuration jobConf, StreamConfig config, SubtaskCheckpointCoordinator checkpointCoordinator, String taskName, @@ -146,7 +150,7 @@ public class InputProcessorUtil { boolean enableCheckpointAfterTasksFinished = config.getConfiguration() .get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH); - if (config.isUnalignedCheckpointsEnabled()) { + if (jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED)) { return SingleCheckpointBarrierHandler.alternating( taskName, toNotifyOnCheckpoint, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 390de7f5e0a..6e45024e1ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -142,6 +142,7 @@ public class MultipleInputStreamTask<OUT> checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( this, + getJobConfiguration(), getConfiguration(), getCheckpointCoordinator(), getTaskNameWithSubtaskAndId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 9025210b8e0..68f49f4fce0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -166,6 +166,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( this, + getJobConfiguration(), configuration, getCheckpointCoordinator(), getTaskNameWithSubtaskAndId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 753dae5c964..343870cde42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; @@ -134,13 +136,17 @@ public class SourceStreamTask< // between the trigger // TODO - message from the master, and the source's trigger // notification + Configuration jobConf = getJobConfiguration(); final CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), configuration.isExactlyOnceCheckpointMode(), - configuration.isUnalignedCheckpointsEnabled(), - configuration.getAlignedCheckpointTimeout().toMillis()); + jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED), + jobConf.get( + CheckpointingOptions + .ALIGNED_CHECKPOINT_TIMEOUT) + .toMillis()); final long timestamp = System.currentTimeMillis(); final CheckpointMetaData checkpointMetaData = diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 8457d1a9817..444acb3ef1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -445,7 +445,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> getEnvironment().getJobID(), new ThreadPoolExecutor( 0, - configuration.getMaxConcurrentCheckpoints() + 1, + getJobConfiguration() + .get( + CheckpointingOptions + .MAX_CONCURRENT_CHECKPOINTS) + + 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), @@ -488,7 +492,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> final CheckpointStorageAccess finalCheckpointStorageAccess = checkpointStorageAccess; ChannelStateWriter channelStateWriter = - configuration.isUnalignedCheckpointsEnabled() + getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED) ? openChannelStateWriter( getName(), // Note: don't pass checkpointStorageAccess directly to channel @@ -513,7 +517,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } }, environment, - configuration.getMaxSubtasksPerChannelStateFile()) + getJobConfiguration() + .get( + CheckpointingOptions + .UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE)) : ChannelStateWriter.NO_OP; this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl( @@ -524,7 +531,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> environment, this, this::prepareInputSnapshot, - configuration.getMaxConcurrentCheckpoints(), + getJobConfiguration() + .get(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS), channelStateWriter, configuration .getConfiguration() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index e64870c1c75..f933d5069d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -65,6 +65,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTas checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( this, + getJobConfiguration(), configuration, getCheckpointCoordinator(), getTaskNameWithSubtaskAndId(), diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java index 60046ec4fad..74038bbb413 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -59,9 +60,9 @@ class InputProcessorUtilTest { try (CloseableRegistry registry = new CloseableRegistry()) { MockEnvironment environment = new MockEnvironmentBuilder().build(); MockStreamTask streamTask = new MockStreamTaskBuilder(environment).build(); + streamTask.getJobConfiguration().set(CheckpointingOptions.ENABLE_UNALIGNED, true); StreamConfig streamConfig = new StreamConfig(environment.getJobConfiguration()); streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE); - streamConfig.setUnalignedCheckpointsEnabled(true); // First input gate has index larger than the second List<IndexedInputGate>[] inputGates = @@ -73,6 +74,7 @@ class InputProcessorUtilTest { CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( streamTask, + streamTask.getJobConfiguration(), streamConfig, new TestSubtaskCheckpointCoordinator(new MockChannelStateWriter()), streamTask.getName(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java index dd29081dd6e..d8c7fc3cd13 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java @@ -48,6 +48,7 @@ import org.apache.flink.api.connector.source.lib.NumberSequenceSource; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; @@ -256,19 +257,21 @@ abstract class JobGraphGeneratorTestBase { @Test void testEnabledUnalignedCheckAndDisabledCheckpointing() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getCheckpointConfig().enableUnalignedCheckpoints(true); env.fromData(0).print(); StreamGraph streamGraph = env.getStreamGraph(); assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled()) .withFailMessage("Checkpointing enabled") .isFalse(); - env.getCheckpointConfig().enableUnalignedCheckpoints(true); JobGraph jobGraph = createJobGraph(streamGraph); List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration()); assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE); - assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse(); + assertThat(streamGraph.getCheckpointConfig().isUnalignedCheckpointsEnabled()).isFalse(); + assertThat(jobGraph.getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED)) + .isFalse(); } @Test @@ -417,17 +420,19 @@ abstract class JobGraphGeneratorTestBase { @Test void testUnalignedCheckAndAtLeastOnce() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromData(0).print(); - StreamGraph streamGraph = env.getStreamGraph(); env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); env.getCheckpointConfig().enableUnalignedCheckpoints(true); + env.fromData(0).print(); + StreamGraph streamGraph = env.getStreamGraph(); JobGraph jobGraph = createJobGraph(streamGraph); List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration()); assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE); - assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse(); + assertThat(streamGraph.getCheckpointConfig().isUnalignedCheckpointsEnabled()).isFalse(); + assertThat(jobGraph.getJobConfiguration().get(CheckpointingOptions.ENABLE_UNALIGNED)) + .isFalse(); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java index a5e3712ec76..140e6b62e6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -67,6 +68,9 @@ class UnalignedCheckpointsInterruptibleTimersTest { try (final StreamTaskMailboxTestHarness<String> harness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true) .modifyStreamConfig( UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig) .addInput(Types.STRING) @@ -106,6 +110,9 @@ class UnalignedCheckpointsInterruptibleTimersTest { try (final StreamTaskMailboxTestHarness<String> harness = new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true) .modifyStreamConfig( UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig) .addInput(Types.STRING) @@ -148,8 +155,6 @@ class UnalignedCheckpointsInterruptibleTimersTest { } private static void setupStreamConfig(StreamConfig cfg) { - cfg.setUnalignedCheckpointsEnabled(true); - cfg.setUnalignedCheckpointsSplittableTimersEnabled(true); cfg.setStateKeySerializer(StringSerializer.INSTANCE); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java index 92ab98e32b6..689233b4e4f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -384,13 +386,11 @@ class MultipleInputStreamTaskChainedSourcesCheckpointingTest { try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder<>( MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.setUnalignedCheckpointsEnabled( - checkpointOptions.isUnalignedCheckpoint() - || checkpointOptions.isTimeoutable()); - }) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED, + checkpointOptions.isUnalignedCheckpoint() + || checkpointOptions.isTimeoutable()) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .modifyExecutionConfig(applyObjectReuse(objectReuse)) .setCheckpointResponder(checkpointResponder) .addInput(BasicTypeInfo.INT_TYPE_INFO) @@ -548,13 +548,14 @@ class MultipleInputStreamTaskChainedSourcesCheckpointingTest { private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) { StreamConfig config = testHarness.getStreamTask().getConfiguration(); + Configuration jobConf = testHarness.getStreamTask().getJobConfiguration(); CheckpointOptions checkpointOptions = CheckpointOptions.forConfig( CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), config.isExactlyOnceCheckpointMode(), - config.isUnalignedCheckpointsEnabled(), - config.getAlignedCheckpointTimeout().toMillis()); + jobConf.get(CheckpointingOptions.ENABLE_UNALIGNED), + jobConf.get(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT).toMillis()); return new CheckpointBarrier( metaData.getCheckpointId(), metaData.getTimestamp(), checkpointOptions); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 479cf3d7b6b..f63788aabc2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -37,6 +37,7 @@ import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -1007,13 +1008,11 @@ class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO) .addAdditionalOutput(partitionWriters) .setCheckpointResponder(checkpointResponder) - .modifyStreamConfig( - config -> { - config.setCheckpointingEnabled(true); - config.setUnalignedCheckpointsEnabled( - checkpointOptions.isUnalignedCheckpoint() - || checkpointOptions.isTimeoutable()); - }) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED, + checkpointOptions.isUnalignedCheckpoint() + || checkpointOptions.isTimeoutable()) + .modifyStreamConfig(config -> config.setCheckpointingEnabled(true)) .setupOperatorChain(new MapToStringMultipleInputOperatorFactory(3)) .finishForSingletonOperatorChain(StringSerializer.INSTANCE) .build()) { @@ -1293,8 +1292,8 @@ class MultipleInputStreamTaskTest { return new StreamTaskMailboxTestHarnessBuilder<>( MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) .modifyExecutionConfig(applyObjectReuse(objectReuse)) - .modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned)) - .modifyStreamConfig(config -> config.setAlignedCheckpointTimeout(Duration.ZERO)) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, unaligned) + .addJobConfig(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addSourceInput( new SourceOperatorFactory<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java index a90802bd661..5046d3f4930 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -228,11 +229,11 @@ class StreamTaskFinalCheckpointsTest { StreamTaskMailboxTestHarness<String> testHarness = testHarnessBuilder .addInput(STRING_TYPE_INFO, 3) + .addJobConfig( + CheckpointingOptions.ENABLE_UNALIGNED, enableUnalignedCheckpoint) .modifyStreamConfig( config -> { config.setCheckpointingEnabled(true); - config.setUnalignedCheckpointsEnabled( - enableUnalignedCheckpoint); }) .setCheckpointResponder(checkpointResponder) .setupOperatorChain(new EmptyOperator()) @@ -848,8 +849,7 @@ class StreamTaskFinalCheckpointsTest { try (StreamTaskMailboxTestHarness<String> harness = new StreamTaskMailboxTestHarnessBuilder<>( OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) - .modifyStreamConfig( - streamConfig -> streamConfig.setUnalignedCheckpointsEnabled(true)) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true) .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3) .setCollectNetworkEvents() .setTaskStateSnapshot(1, TaskStateSnapshot.FINISHED_ON_RESTORE) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index f5098c5b971..287c0f4ca75 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; @@ -130,6 +131,12 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { return this; } + public <T> StreamTaskMailboxTestHarnessBuilder<OUT> addJobConfig( + ConfigOption<T> option, T value) { + jobConfig.set(option, value); + return this; + } + public <T> StreamTaskMailboxTestHarnessBuilder<OUT> setCheckpointResponder( CheckpointResponder checkpointResponder) { this.checkpointResponder = checkpointResponder;