This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c3190b7bce5 [FLINK-37434][state/forst] Make incremental checkpoint configurable in ForStSyncKeyedStateBackend (#26267) c3190b7bce5 is described below commit c3190b7bce51dc1137ecefaa90e0039fb4465347 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Fri Mar 7 15:38:58 2025 +0800 [FLINK-37434][state/forst] Make incremental checkpoint configurable in ForStSyncKeyedStateBackend (#26267) --- .../org/apache/flink/state/forst/ForStStateBackend.java | 15 +++++++++++++++ .../forst/sync/ForStSyncKeyedStateBackendBuilder.java | 6 ++++++ .../apache/flink/state/forst/ForStStateBackendTest.java | 13 +++++++++++++ 3 files changed, 34 insertions(+) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 5d9b3d69ab7..6dc4dba156d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DescribedEnum; @@ -115,6 +116,9 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend // -- configuration values, set in the application / configuration + /** This determines if incremental checkpointing is enabled. */ + private final TernaryBoolean enableIncrementalCheckpointing; + /** * Base paths for ForSt remote directory, as configured. Null if not yet set, in which case the * configuration values will be used. The configuration will fallback to local directory by @@ -190,6 +194,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend /** Creates a new {@code ForStStateBackend} for storing state. */ public ForStStateBackend() { + this.enableIncrementalCheckpointing = TernaryBoolean.UNDEFINED; this.nativeMetricOptions = new ForStNativeMetricOptions(); this.memoryConfiguration = new ForStMemoryConfiguration(); this.priorityQueueConfig = new ForStPriorityQueueConfig(); @@ -209,6 +214,9 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend */ private ForStStateBackend( ForStStateBackend original, ReadableConfig config, ClassLoader classLoader) { + this.enableIncrementalCheckpointing = + original.enableIncrementalCheckpointing.resolveUndefined( + config.get(CheckpointingOptions.INCREMENTAL_CHECKPOINTS)); this.memoryConfiguration = ForStMemoryConfiguration.fromOtherAndConfiguration( original.memoryConfiguration, config); @@ -558,6 +566,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend parameters.getStateHandles(), keyGroupCompressionDecorator, parameters.getCancelStreamRegistry()) + .setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled()) .setNativeMetricOptions( resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) .setOverlapFractionThreshold( @@ -637,6 +646,12 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend return optionsFactory; } + /** Gets whether incremental checkpoints are enabled for this state backend. */ + public boolean isIncrementalCheckpointsEnabled() { + return enableIncrementalCheckpointing.getOrDefault( + CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()); + } + @Override public boolean supportsNoClaimRestoreMode() { // Both ForStSyncKeyedStateBackend and ForStKeyedStateBackend support no claim mode. diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 92bcb0bb6fe..54ced97fa22 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -245,6 +245,12 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle; } + public ForStSyncKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing( + boolean enableIncrementalCheckpointing) { + this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + return this; + } + public ForStSyncKeyedStateBackendBuilder<K> setNativeMetricOptions( ForStNativeMetricOptions nativeMetricOptions) { this.nativeMetricOptions = nativeMetricOptions; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java index 39820215491..4c4d853d70a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointStorage; @@ -31,6 +32,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.SupplierWithException; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -40,6 +42,7 @@ import java.util.List; import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the partitioned state part of {@link ForStStateBackendTest}. */ @ExtendWith(ParameterizedTestExtension.class) @@ -99,4 +102,14 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> { protected boolean isSafeToReuseKVState() { return true; } + + @TestTemplate + void testConfiguration() throws Exception { + ForStStateBackend backend = new ForStStateBackend(); + assertThat(backend.isIncrementalCheckpointsEnabled()).isFalse(); + Configuration config = new Configuration(); + config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); + backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); + assertThat(backend.isIncrementalCheckpointsEnabled()).isTrue(); + } }