This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3190b7bce5 [FLINK-37434][state/forst] Make incremental checkpoint 
configurable in ForStSyncKeyedStateBackend (#26267)
c3190b7bce5 is described below

commit c3190b7bce51dc1137ecefaa90e0039fb4465347
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Fri Mar 7 15:38:58 2025 +0800

    [FLINK-37434][state/forst] Make incremental checkpoint configurable in 
ForStSyncKeyedStateBackend (#26267)
---
 .../org/apache/flink/state/forst/ForStStateBackend.java   | 15 +++++++++++++++
 .../forst/sync/ForStSyncKeyedStateBackendBuilder.java     |  6 ++++++
 .../apache/flink/state/forst/ForStStateBackendTest.java   | 13 +++++++++++++
 3 files changed, 34 insertions(+)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 5d9b3d69ab7..6dc4dba156d 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DescribedEnum;
@@ -115,6 +116,9 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
     // -- configuration values, set in the application / configuration
 
+    /** This determines if incremental checkpointing is enabled. */
+    private final TernaryBoolean enableIncrementalCheckpointing;
+
     /**
      * Base paths for ForSt remote directory, as configured. Null if not yet 
set, in which case the
      * configuration values will be used. The configuration will fallback to 
local directory by
@@ -190,6 +194,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
     /** Creates a new {@code ForStStateBackend} for storing state. */
     public ForStStateBackend() {
+        this.enableIncrementalCheckpointing = TernaryBoolean.UNDEFINED;
         this.nativeMetricOptions = new ForStNativeMetricOptions();
         this.memoryConfiguration = new ForStMemoryConfiguration();
         this.priorityQueueConfig = new ForStPriorityQueueConfig();
@@ -209,6 +214,9 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
      */
     private ForStStateBackend(
             ForStStateBackend original, ReadableConfig config, ClassLoader 
classLoader) {
+        this.enableIncrementalCheckpointing =
+                original.enableIncrementalCheckpointing.resolveUndefined(
+                        
config.get(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
         this.memoryConfiguration =
                 ForStMemoryConfiguration.fromOtherAndConfiguration(
                         original.memoryConfiguration, config);
@@ -558,6 +566,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                                 parameters.getStateHandles(),
                                 keyGroupCompressionDecorator,
                                 parameters.getCancelStreamRegistry())
+                        
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
                         .setNativeMetricOptions(
                                 
resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
                         .setOverlapFractionThreshold(
@@ -637,6 +646,12 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         return optionsFactory;
     }
 
+    /** Gets whether incremental checkpoints are enabled for this state 
backend. */
+    public boolean isIncrementalCheckpointsEnabled() {
+        return enableIncrementalCheckpointing.getOrDefault(
+                CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue());
+    }
+
     @Override
     public boolean supportsNoClaimRestoreMode() {
         // Both ForStSyncKeyedStateBackend and ForStKeyedStateBackend support 
no claim mode.
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 92bcb0bb6fe..54ced97fa22 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -245,6 +245,12 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         this.injectedDefaultColumnFamilyHandle = 
injectedDefaultColumnFamilyHandle;
     }
 
+    public ForStSyncKeyedStateBackendBuilder<K> 
setEnableIncrementalCheckpointing(
+            boolean enableIncrementalCheckpointing) {
+        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+        return this;
+    }
+
     public ForStSyncKeyedStateBackendBuilder<K> setNativeMetricOptions(
             ForStNativeMetricOptions nativeMetricOptions) {
         this.nativeMetricOptions = nativeMetricOptions;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 39820215491..4c4d853d70a 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -31,6 +32,7 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -40,6 +42,7 @@ import java.util.List;
 
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the partitioned state part of {@link ForStStateBackendTest}. */
 @ExtendWith(ParameterizedTestExtension.class)
@@ -99,4 +102,14 @@ class ForStStateBackendTest extends 
StateBackendTestBase<ForStStateBackend> {
     protected boolean isSafeToReuseKVState() {
         return true;
     }
+
+    @TestTemplate
+    void testConfiguration() throws Exception {
+        ForStStateBackend backend = new ForStStateBackend();
+        assertThat(backend.isIncrementalCheckpointsEnabled()).isFalse();
+        Configuration config = new Configuration();
+        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+        backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
+        assertThat(backend.isIncrementalCheckpointsEnabled()).isTrue();
+    }
 }

Reply via email to