This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bc96001656 [FLINK-37620][state/forst] ForSt Sync mode support remote 
storage (#26412)
3bc96001656 is described below

commit 3bc96001656cda6ccaeda22741c75e035505ec1b
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Wed Apr 9 10:32:51 2025 +0800

    [FLINK-37620][state/forst] ForSt Sync mode support remote storage (#26412)
---
 .../docs/ops/state/disaggregated_state.md          |  14 +++
 docs/content/docs/ops/state/disaggregated_state.md |  14 +++
 .../shortcodes/generated/forst_configuration.html  |   6 ++
 .../generated/state_backend_forst_section.html     |   6 ++
 .../state/forst/ForStKeyedStateBackendBuilder.java |   1 -
 .../org/apache/flink/state/forst/ForStOptions.java |  11 ++
 .../flink/state/forst/ForStStateBackend.java       |  85 +++++++--------
 .../state/forst/sync/ForStPriorityQueueConfig.java |  18 ++--
 .../forst/sync/ForStSyncKeyedStateBackend.java     | 115 +++++++++------------
 .../sync/ForStSyncKeyedStateBackendBuilder.java    |  72 +++++--------
 .../flink/state/forst/ForStStateBackendTest.java   |  41 ++++++++
 11 files changed, 219 insertions(+), 164 deletions(-)

diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md 
b/docs/content.zh/docs/ops/state/disaggregated_state.md
index e59a765766b..d18458cef71 100644
--- a/docs/content.zh/docs/ops/state/disaggregated_state.md
+++ b/docs/content.zh/docs/ops/state/disaggregated_state.md
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir: 
s3://your-bucket/forst-state
 checkpoint and fast recovery, since the ForSt will perform file copy between 
the primary
 storage location and the checkpoint directory during checkpointing and 
recovery.
 
+#### ForSt Local Storage Location
+
+By default, ForSt will **ONLY** disaggregate state when asynchronous APIs 
(State V2) are used. When 
+using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve 
as a **local state store**. 
+Since a job may contain multiple ForSt instances with mixed API usage, 
synchronous local state access 
+along with asynchronous remote state access could help achieve better overall 
throughput.
+If you want operators that use synchronous state APIs to store their state 
remotely, set the following configuration:
+```yaml
+state.backend.forst.sync.enforce-local: false
+```
+And you can specify the local storage location via:
+```yaml
+state.backend.forst.local-dir: path-to-local-dir
+```
 
 #### ForSt File Cache
 
diff --git a/docs/content/docs/ops/state/disaggregated_state.md 
b/docs/content/docs/ops/state/disaggregated_state.md
index e59a765766b..5d2a757f186 100644
--- a/docs/content/docs/ops/state/disaggregated_state.md
+++ b/docs/content/docs/ops/state/disaggregated_state.md
@@ -150,6 +150,20 @@ state.backend.forst.primary-dir: 
s3://your-bucket/forst-state
 checkpoint and fast recovery, since the ForSt will perform file copy between 
the primary
 storage location and the checkpoint directory during checkpointing and 
recovery.
 
+#### ForSt Local Storage Location
+
+By default, ForSt will **ONLY** disaggregate state when asynchronous APIs 
(State V2) are used. When
+using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve 
as a **local state store**.
+Since a job may contain multiple ForSt instances with mixed API usage, 
synchronous local state access
+along with asynchronous remote state access could help achieve better overall 
throughput.
+If you want operators that use synchronous state APIs to store their state 
remotely, set the following configuration:
+```yaml
+state.backend.forst.sync.enforce-local: false
+```
+And you can specify the local storage location via:
+```yaml
+state.backend.forst.local-dir: path-to-local-dir
+```
 
 #### ForSt File Cache
 
diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html 
b/docs/layouts/shortcodes/generated/forst_configuration.html
index 5b5b50ac970..f66c1af4d8b 100644
--- a/docs/layouts/shortcodes/generated/forst_configuration.html
+++ b/docs/layouts/shortcodes/generated/forst_configuration.html
@@ -116,6 +116,12 @@
             <td>String</td>
             <td>The primary directory where ForSt puts its SST files. By 
default, it will be the same as the checkpoint directory. Recognized shortcut 
name is 'checkpoint-dir', which means that ForSt shares the directory with 
checkpoint, and 'local-dir', which means that ForSt will use the local 
directory of TaskManager.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.forst.sync.enforce-local</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enforce local state for operators in synchronous 
mode when enabling disaggregated state. This is useful in cases where both 
synchronous operators and asynchronous operators are used in the same job.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.forst.timer-service.cache-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
diff --git a/docs/layouts/shortcodes/generated/state_backend_forst_section.html 
b/docs/layouts/shortcodes/generated/state_backend_forst_section.html
index f4782714f39..6a7ebbfbd93 100644
--- a/docs/layouts/shortcodes/generated/state_backend_forst_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_forst_section.html
@@ -50,6 +50,12 @@
             <td>String</td>
             <td>The primary directory where ForSt puts its SST files. By 
default, it will be the same as the checkpoint directory. Recognized shortcut 
name is 'checkpoint-dir', which means that ForSt shares the directory with 
checkpoint, and 'local-dir', which means that ForSt will use the local 
directory of TaskManager.</td>
         </tr>
+        <tr>
+            <td><h5>state.backend.forst.sync.enforce-local</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enforce local state for operators in synchronous 
mode when enabling disaggregated state. This is useful in cases where both 
synchronous operators and asynchronous operators are used in the same job.</td>
+        </tr>
         <tr>
             <td><h5>state.backend.forst.timer-service.cache-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 30b37ed82b2..bfb2291be32 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -359,7 +359,6 @@ public class ForStKeyedStateBackendBuilder<K>
         // env. We expect to directly use the dfs directory in flink env or 
local directory as
         // working dir. We will implement this in ForStDB later, but before 
that, we achieved this
         // by setting the dbPath to "/" when the dfs directory existed.
-        // TODO: use localForStPath as dbPath after ForSt Support mixing 
local-dir and remote-dir
         Path instanceForStPath =
                 optionsContainer.getRemoteForStPath() == null
                         ? optionsContainer.getLocalForStPath()
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
index 6d27d9b2a7b..8a816e3e1b1 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
@@ -66,6 +66,17 @@ public class ForStOptions {
                                     CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT,
                                     LOCAL_DIR_AS_PRIMARY_SHORTCUT));
 
+    @Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
+    public static final ConfigOption<Boolean> SYNC_ENFORCE_LOCAL =
+            ConfigOptions.key("state.backend.forst.sync.enforce-local")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enforce local state for operators in 
synchronous mode when"
+                                    + " enabling disaggregated state. This is 
useful in cases where "
+                                    + "both synchronous operators and 
asynchronous operators are used "
+                                    + "in the same job.");
+
     @Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST)
     public static final ConfigOption<String> CACHE_DIRECTORY =
             ConfigOptions.key("state.backend.forst.cache.dir")
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 6dc4dba156d..57a8a729f45 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -188,8 +189,12 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
     /** The recovery claim mode. */
     private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;
 
+    /** Whether to share the ForSt remote directory with checkpoint directory. 
*/
     private boolean remoteShareWithCheckpoint;
 
+    /** Whether to use local directory as primary directory in synchronous 
mode. */
+    private boolean forceSyncLocal;
+
     // ------------------------------------------------------------------------
 
     /** Creates a new {@code ForStStateBackend} for storing state. */
@@ -203,6 +208,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
         this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
         this.remoteShareWithCheckpoint = false;
+        this.forceSyncLocal = true;
     }
 
     /**
@@ -237,6 +243,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                                 : new Path(remoteDirStr);
             }
         }
+        this.forceSyncLocal = config.get(ForStOptions.SYNC_ENFORCE_LOCAL);
 
         this.priorityQueueConfig =
                 ForStPriorityQueueConfig.fromOtherAndConfiguration(
@@ -409,31 +416,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
         lazyInitializeForJob(env, fileCompatibleIdentifier);
 
-        String opChildPath =
-                String.format(
-                        "op_%s_attempt_%s",
-                        fileCompatibleIdentifier, 
env.getTaskInfo().getAttemptNumber());
-
-        Path localBasePath =
-                new Path(
-                        new File(new File(getNextStoragePath(), 
jobId.toHexString()), opChildPath)
-                                .getAbsolutePath());
-        Path remoteBasePath = null;
-        if (remoteForStDirectory != null) {
-            remoteBasePath =
-                    new Path(new Path(remoteForStDirectory, 
jobId.toHexString()), opChildPath);
-        } else if (remoteShareWithCheckpoint) {
-            if (env.getCheckpointStorageAccess() instanceof 
FsCheckpointStorageAccess) {
-                Path sharedStateDirectory =
-                        ((FsCheckpointStorageAccess) 
env.getCheckpointStorageAccess())
-                                .getSharedStateDirectory();
-                remoteBasePath = new Path(sharedStateDirectory, opChildPath);
-                LOG.info("Set remote ForSt directory to checkpoint directory 
{}", remoteBasePath);
-            } else {
-                LOG.warn(
-                        "Remote ForSt directory can't be set, because 
checkpoint directory isn't on file system.");
-            }
-        }
+        Tuple2<Path, Path> localAndRemoteBasePath = 
getForStBasePath(fileCompatibleIdentifier, env);
 
         final OpaqueMemoryResource<ForStSharedResources> sharedResources =
                 ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -448,8 +431,8 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         final ForStResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(
                         sharedResources,
-                        localBasePath,
-                        remoteBasePath,
+                        localAndRemoteBasePath.f0,
+                        localAndRemoteBasePath.f1,
                         env.getCheckpointStorageAccess(),
                         parameters.getMetricGroup(),
                         nativeMetricOptions.isStatisticsEnabled());
@@ -505,17 +488,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
         lazyInitializeForJob(env, fileCompatibleIdentifier);
 
-        Path instanceBasePath =
-                new Path(
-                        new File(
-                                        getNextStoragePath(),
-                                        "job_"
-                                                + jobId
-                                                + "_op_"
-                                                + fileCompatibleIdentifier
-                                                + "_uuid_"
-                                                + UUID.randomUUID())
-                                .getAbsolutePath());
+        Tuple2<Path, Path> localAndRemoteBasePath = 
getForStBasePath(fileCompatibleIdentifier, env);
 
         LocalRecoveryConfig localRecoveryConfig =
                 env.getTaskStateManager().createLocalRecoveryConfig();
@@ -533,10 +506,10 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         final ForStResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(
                         sharedResources,
-                        instanceBasePath,
-                        null,
+                        localAndRemoteBasePath.f0,
+                        forceSyncLocal ? null : localAndRemoteBasePath.f1,
                         env.getCheckpointStorageAccess(),
-                        null,
+                        parameters.getMetricGroup(),
                         nativeMetricOptions.isStatisticsEnabled());
 
         ExecutionConfig executionConfig = env.getExecutionConfig();
@@ -549,7 +522,6 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                 new ForStSyncKeyedStateBackendBuilder<>(
                                 parameters.getOperatorIdentifier(),
                                 env.getUserCodeClassLoader().asClassLoader(),
-                                instanceBasePath,
                                 resourceContainer,
                                 stateName -> 
resourceContainer.getColumnOptions(),
                                 parameters.getKvStateRegistry(),
@@ -818,6 +790,35 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         return configuration;
     }
 
+    Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment 
env) {
+        String opChildPath =
+                String.format(
+                        "op_%s_attempt_%s",
+                        operatorIdentifier, 
env.getTaskInfo().getAttemptNumber());
+
+        Path localBasePath =
+                new Path(
+                        new File(new File(getNextStoragePath(), 
jobId.toHexString()), opChildPath)
+                                .getAbsolutePath());
+        Path remoteBasePath = null;
+        if (remoteForStDirectory != null) {
+            remoteBasePath =
+                    new Path(new Path(remoteForStDirectory, 
jobId.toHexString()), opChildPath);
+        } else if (remoteShareWithCheckpoint) {
+            if (env.getCheckpointStorageAccess() instanceof 
FsCheckpointStorageAccess) {
+                Path sharedStateDirectory =
+                        ((FsCheckpointStorageAccess) 
env.getCheckpointStorageAccess())
+                                .getSharedStateDirectory();
+                remoteBasePath = new Path(sharedStateDirectory, opChildPath);
+                LOG.info("Set remote ForSt directory to checkpoint directory 
{}", remoteBasePath);
+            } else {
+                LOG.warn(
+                        "Remote ForSt directory can't be set, because 
checkpoint directory isn't on file system.");
+            }
+        }
+        return Tuple2.of(localBasePath, remoteBasePath);
+    }
+
     @VisibleForTesting
     ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path 
localBasePath) {
         return createOptionsAndResourceContainer(null, localBasePath, null, 
null, null, false);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
index a60907ed7af..afbd3bfe6a6 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java
@@ -28,7 +28,7 @@ import static 
org.apache.flink.state.forst.ForStOptions.FORST_TIMER_SERVICE_FACT
 import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** The configuration of rocksDB priority queue state implementation. */
+/** The configuration of ForSt priority queue state implementation. */
 public class ForStPriorityQueueConfig implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -39,7 +39,7 @@ public class ForStPriorityQueueConfig implements Serializable 
{
     private @Nullable ForStStateBackend.PriorityQueueStateType 
priorityQueueStateType;
 
     /** cache size per keyGroup for rocksDB priority queue state. */
-    private int rocksDBPriorityQueueSetCacheSize;
+    private int forStDBPriorityQueueSetCacheSize;
 
     public ForStPriorityQueueConfig() {
         this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE);
@@ -47,9 +47,9 @@ public class ForStPriorityQueueConfig implements Serializable 
{
 
     public ForStPriorityQueueConfig(
             ForStStateBackend.PriorityQueueStateType priorityQueueStateType,
-            int rocksDBPriorityQueueSetCacheSize) {
+            int forStDBPriorityQueueSetCacheSize) {
         this.priorityQueueStateType = priorityQueueStateType;
-        this.rocksDBPriorityQueueSetCacheSize = 
rocksDBPriorityQueueSetCacheSize;
+        this.forStDBPriorityQueueSetCacheSize = 
forStDBPriorityQueueSetCacheSize;
     }
 
     /**
@@ -70,10 +70,10 @@ public class ForStPriorityQueueConfig implements 
Serializable {
      * Gets the cache size of rocksDB priority queue set. It will fall back to 
the default value if
      * it is not explicitly set.
      */
-    public int getRocksDBPriorityQueueSetCacheSize() {
-        return rocksDBPriorityQueueSetCacheSize == 
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
+    public int getForStDBPriorityQueueSetCacheSize() {
+        return forStDBPriorityQueueSetCacheSize == 
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE
                 ? FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue()
-                : rocksDBPriorityQueueSetCacheSize;
+                : forStDBPriorityQueueSetCacheSize;
     }
 
     public static ForStPriorityQueueConfig fromOtherAndConfiguration(
@@ -83,10 +83,10 @@ public class ForStPriorityQueueConfig implements 
Serializable {
                         ? config.get(TIMER_SERVICE_FACTORY)
                         : other.priorityQueueStateType;
         int cacheSize =
-                (other.rocksDBPriorityQueueSetCacheSize
+                (other.forStDBPriorityQueueSetCacheSize
                                 == 
UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE)
                         ? config.get(FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE)
-                        : other.rocksDBPriorityQueueSetCacheSize;
+                        : other.forStDBPriorityQueueSetCacheSize;
         return new ForStPriorityQueueConfig(priorityQueueType, cacheSize);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index da26ae0d937..783031c1386 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.ICloseableRegistry;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -64,7 +63,6 @@ import org.apache.flink.state.forst.ForStNativeMetricMonitor;
 import org.apache.flink.state.forst.ForStOperationUtils;
 import org.apache.flink.state.forst.ForStResourceContainer;
 import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -85,7 +83,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -105,7 +102,7 @@ import static 
org.apache.flink.runtime.state.SnapshotExecutionType.ASYNCHRONOUS;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * An {@link AbstractKeyedStateBackend} that stores its state in {@code 
RocksDB} and serializes
+ * An {@link AbstractKeyedStateBackend} that stores its state in {@code 
ForStDB} and serializes
  * state to streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory}
  * upon checkpointing. This state backend can store very large state that 
exceeds memory and spills
  * to disk. Except for the snapshotting, this class should be accessed as if 
it is not threadsafe.
@@ -178,17 +175,14 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     /** Factory function to create column family options from state name. */
     private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
 
-    /** The container of RocksDB option factory and predefined options. */
+    /** The container of ForSt option factory and predefined options. */
     private final ForStResourceContainer optionsContainer;
 
-    /** Path where this configured instance stores its data directory. */
-    private final Path instanceBasePath;
-
     /**
-     * Protects access to RocksDB in other threads, like the checkpointing 
thread from parallel call
-     * that disposes the RocksDB object.
+     * Protects access to ForSt in other threads, like the checkpointing 
thread from parallel call
+     * that disposes the ForSt object.
      */
-    private final ResourceGuard rocksDBResourceGuard;
+    private final ResourceGuard forstResourceGuard;
 
     /** The write options to use in the states. We disable write ahead 
logging. */
     private final WriteOptions writeOptions;
@@ -228,7 +222,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
      */
     private final ColumnFamilyHandle defaultColumnFamily;
 
-    /** Shared wrapper for batch writes to the RocksDB instance. */
+    /** Shared wrapper for batch writes to the ForSt instance. */
     private final ForStDBWriteBatchWrapper writeBatchWrapper;
 
     /**
@@ -244,14 +238,14 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     private final PriorityQueueSetFactory priorityQueueFactory;
 
     /**
-     * Helper to build the byte arrays of composite keys to address data in 
RocksDB. Shared across
-     * all states.
+     * Helper to build the byte arrays of composite keys to address data in 
ForSt. Shared across all
+     * states.
      */
     private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
 
     /**
-     * Our RocksDB database, this is used by the actual subclasses of {@link 
AbstractForStSyncState}
-     * to store state. The different k/v states that we have don't each have 
their own RocksDB
+     * Our ForSt database, this is used by the actual subclasses of {@link 
AbstractForStSyncState}
+     * to store state. The different k/v states that we have don't each have 
their own ForSt
      * instance. They all write to this instance but to their own column 
family.
      */
     protected final RocksDB db;
@@ -263,7 +257,6 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
 
     public ForStSyncKeyedStateBackend(
             ClassLoader userCodeClassLoader,
-            Path instanceBasePath,
             ForStResourceContainer optionsContainer,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             TaskKvStateRegistry kvStateRegistry,
@@ -277,7 +270,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
             int keyGroupPrefixBytes,
             CloseableRegistry cancelStreamRegistry,
             StreamCompressionDecorator keyGroupCompressionDecorator,
-            ResourceGuard rocksDBResourceGuard,
+            ResourceGuard forstResourceGuard,
             ForStSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy,
             ForStDBWriteBatchWrapper writeBatchWrapper,
             ColumnFamilyHandle defaultColumnFamilyHandle,
@@ -307,8 +300,6 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
 
         this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
 
-        this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
-
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.kvStateInformation = kvStateInformation;
         this.createdKVStates = new HashMap<>();
@@ -317,7 +308,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         this.readOptions = optionsContainer.getReadOptions();
         this.writeBatchSize = writeBatchSize;
         this.db = db;
-        this.rocksDBResourceGuard = rocksDBResourceGuard;
+        this.forstResourceGuard = forstResourceGuard;
         this.checkpointSnapshotStrategy = checkpointSnapshotStrategy;
         this.writeBatchWrapper = writeBatchWrapper;
         this.defaultColumnFamily = defaultColumnFamilyHandle;
@@ -360,7 +351,8 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
                     namespace, namespaceSerializer, namespaceOutputView, 
ambiguousKeyPossible);
             nameSpaceBytes = namespaceOutputView.getCopyOfBuffer();
         } catch (IOException ex) {
-            throw new FlinkRuntimeException("Failed to get keys from RocksDB 
state backend.", ex);
+            throw new FlinkRuntimeException(
+                    "Failed to get keys from ForSt sync state backend.", ex);
         }
 
         ForStIteratorWrapper iterator =
@@ -398,6 +390,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
             return Stream.empty();
         }
 
+        @SuppressWarnings("unchecked")
         RegisteredKeyValueStateBackendMetaInfo<N, ?> 
registeredKeyValueStateBackendMetaInfo =
                 (RegisteredKeyValueStateBackendMetaInfo<N, ?>) 
columnInfo.metaInfo;
 
@@ -428,12 +421,6 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         return targetStream.onClose(iteratorWrapper::close);
     }
 
-    @VisibleForTesting
-    ColumnFamilyHandle getColumnFamilyHandle(String state) {
-        ForStOperationUtils.ForStKvStateInfo columnInfo = 
kvStateInformation.get(state);
-        return columnInfo != null ? columnInfo.columnFamilyHandle : null;
-    }
-
     @Override
     public void setCurrentKey(K newKey) {
         super.setCurrentKey(newKey);
@@ -454,11 +441,11 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         }
         super.dispose();
 
-        // This call will block until all clients that still acquire access to 
the RocksDB instance
+        // This call will block until all clients that still acquire access to 
the ForSt instance
         // have released it,
         // so that we cannot release the native resources while clients are 
still working with it in
         // parallel.
-        rocksDBResourceGuard.close();
+        forstResourceGuard.close();
 
         // IMPORTANT: null reference to signal potential async checkpoint 
workers that the db was
         // disposed, as
@@ -467,7 +454,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
             IOUtils.closeQuietly(writeBatchWrapper);
 
             // Metric collection occurs on a background thread. When this 
method returns
-            // it is guaranteed that thr RocksDB reference has been invalidated
+            // it is guaranteed that the ForSt reference has been invalidated
             // and no more metric collection will be attempted against the 
database.
             if (nativeMetricMonitor != null) {
                 nativeMetricMonitor.close();
@@ -497,11 +484,24 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
 
             columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
+            LOG.info(
+                    "Closed ForSt State Backend. Cleaning up ForSt local 
working directory {}, remote working directory {}.",
+                    optionsContainer.getLocalBasePath(),
+                    optionsContainer.getRemoteBasePath());
+
+            try {
+                optionsContainer.clearDirectories();
+            } catch (Exception ex) {
+                LOG.warn(
+                        "Could not delete ForSt local working directory {}, 
remote working directory {}.",
+                        optionsContainer.getLocalBasePath(),
+                        optionsContainer.getRemoteBasePath(),
+                        ex);
+            }
+
             IOUtils.closeQuietly(optionsContainer);
 
             kvStateInformation.clear();
-
-            cleanInstanceBasePath();
         }
         IOUtils.closeQuietly(checkpointSnapshotStrategy);
         this.disposed = true;
@@ -531,18 +531,6 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         }
     }
 
-    private void cleanInstanceBasePath() {
-        LOG.info(
-                "Closed RocksDB State Backend. Cleaning up RocksDB working 
directory {}.",
-                instanceBasePath);
-
-        try {
-            FileUtils.deleteDirectory(new File(instanceBasePath.getPath()));
-        } catch (IOException ex) {
-            LOG.warn("Could not delete RocksDB working directory: {}", 
instanceBasePath, ex);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Getters and Setters
     // ------------------------------------------------------------------------
@@ -568,15 +556,10 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         return sharedRocksKeyBuilder;
     }
 
-    @VisibleForTesting
-    boolean isDisposed() {
-        return this.disposed;
-    }
-
     /**
-     * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can
-     * be canceled and is also stopped when the backend is closed through 
{@link #dispose()}. For
-     * each backend, this method must always be called by the same thread.
+     * Triggers an asynchronous snapshot of the keyed state backend from 
ForSt. This snapshot can be
+     * canceled and is also stopped when the backend is closed through {@link 
#dispose()}. For each
+     * backend, this method must always be called by the same thread.
      *
      * @param checkpointId The Id of the checkpoint.
      * @param timestamp The timestamp of the checkpoint.
@@ -627,11 +610,11 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     }
 
     /**
-     * Registers a k/v state information, which includes its state id, type, 
RocksDB column family
+     * Registers a k/v state information, which includes its state id, type, 
ForSt column family
      * handle, and serializers.
      *
      * <p>When restoring from a snapshot, we don’t restore the individual k/v 
states, just the
-     * global RocksDB database and the list of k/v state information. When a 
k/v state is first
+     * global ForSt database and the list of k/v state information. When a k/v 
state is first
      * requested we check here whether we already have a registered entry for 
that and return it
      * (after some necessary state compatibility checks) or create a new one 
if it does not exist.
      */
@@ -757,7 +740,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     }
 
     /**
-     * Migrate only the state value, that is the "value" that is stored in 
RocksDB. We don't migrate
+     * Migrate only the state value, that is the "value" that is stored in 
ForSt. We don't migrate
      * the key here, which is made up of key group, key, namespace and map key 
(in case of
      * MapState).
      */
@@ -805,9 +788,9 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
         }
 
         @SuppressWarnings("unchecked")
-        AbstractForStSyncState<?, ?, SV> rocksDBState = 
(AbstractForStSyncState<?, ?, SV>) state;
+        AbstractForStSyncState<?, ?, SV> forStState = 
(AbstractForStSyncState<?, ?, SV>) state;
 
-        Snapshot rocksDBSnapshot = db.getSnapshot();
+        Snapshot forstSnapshot = db.getSnapshot();
         try (ForStIteratorWrapper iterator =
                         ForStOperationUtils.getForStIterator(db, 
stateMetaInfo.f0, readOptions);
                 ForStDBWriteBatchWrapper batchWriter =
@@ -822,7 +805,7 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
             while (iterator.isValid()) {
                 serializedValueInput.setBuffer(iterator.value());
 
-                rocksDBState.migrateSerializedValue(
+                forStState.migrateSerializedValue(
                         serializedValueInput,
                         migratedSerializedValueOutput,
                         stateMetaInfo.f1.getPreviousStateSerializer(),
@@ -837,8 +820,8 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
                 iterator.next();
             }
         } finally {
-            db.releaseSnapshot(rocksDBSnapshot);
-            rocksDBSnapshot.close();
+            db.releaseSnapshot(forstSnapshot);
+            forstSnapshot.close();
         }
     }
 
@@ -925,11 +908,6 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
                 "State %s is not supported by %s", stateDesc.getClass(), 
this.getClass());
     }
 
-    /** Only visible for testing, DO NOT USE. */
-    Path getInstanceBasePath() {
-        return instanceBasePath;
-    }
-
     @VisibleForTesting
     @Override
     public int numKeyValueStateEntries() {
@@ -967,4 +945,9 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
     long getWriteBatchSize() {
         return writeBatchSize;
     }
+
+    @VisibleForTesting
+    public ForStResourceContainer getOptionsContainer() {
+        return optionsContainer;
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 54ced97fa22..64ea50ab850 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -104,12 +104,10 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBuilder<K> {
 
-    private static final String DB_INSTANCE_DIR_STRING = "db";
-
     /** String that identifies the operator that owns this backend. */
     private final String operatorIdentifier;
 
-    /** The configuration of rocksDB priorityQueue state. */
+    /** The configuration of ForSt priorityQueue state. */
     private final ForStPriorityQueueConfig priorityQueueConfig;
 
     /** The configuration of local recovery. */
@@ -118,22 +116,16 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
     /** Factory function to create column family options from state name. */
     private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
 
-    /** The container of RocksDB option factory and predefined options. */
+    /** The container of ForSt option factory and predefined options. */
     private final ForStResourceContainer optionsContainer;
 
-    /** Path where this configured instance stores its data directory. */
-    private final Path instanceBasePath;
-
-    /** Path where this configured instance stores its RocksDB database. */
-    private final Path instanceForStDBPath;
-
     private final MetricGroup metricGroup;
     private final StateBackend.CustomInitializationMetrics 
customInitializationMetrics;
 
     /** True if incremental checkpointing is enabled. */
     private boolean enableIncrementalCheckpointing;
 
-    /** RocksDB property-based and statistics-based native metrics options. */
+    /** ForSt property-based and statistics-based native metrics options. */
     private ForStNativeMetricOptions nativeMetricOptions;
 
     private long writeBatchSize =
@@ -154,7 +146,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
     public ForStSyncKeyedStateBackendBuilder(
             String operatorIdentifier,
             ClassLoader userCodeClassLoader,
-            Path instanceBasePath,
             ForStResourceContainer optionsContainer,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             TaskKvStateRegistry kvStateRegistry,
@@ -190,8 +181,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         // ensure that we use the right merge operator, because other code 
relies on this
         this.columnFamilyOptionsFactory = 
Preconditions.checkNotNull(columnFamilyOptionsFactory);
         this.optionsContainer = optionsContainer;
-        this.instanceBasePath = instanceBasePath;
-        this.instanceForStDBPath = getInstanceRocksDBPath(instanceBasePath);
         this.metricGroup = metricGroup;
         this.customInitializationMetrics = customInitializationMetrics;
         this.enableIncrementalCheckpointing = false;
@@ -203,7 +192,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
     ForStSyncKeyedStateBackendBuilder(
             String operatorIdentifier,
             ClassLoader userCodeClassLoader,
-            Path instanceBasePath,
             ForStResourceContainer optionsContainer,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             TaskKvStateRegistry kvStateRegistry,
@@ -212,7 +200,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
             KeyGroupRange keyGroupRange,
             ExecutionConfig executionConfig,
             LocalRecoveryConfig localRecoveryConfig,
-            ForStPriorityQueueConfig rocksDBPriorityQueueConfig,
+            ForStPriorityQueueConfig forStPriorityQueueConfig,
             TtlTimeProvider ttlTimeProvider,
             LatencyTrackingStateConfig latencyTrackingStateConfig,
             MetricGroup metricGroup,
@@ -224,7 +212,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         this(
                 operatorIdentifier,
                 userCodeClassLoader,
-                instanceBasePath,
                 optionsContainer,
                 columnFamilyOptionsFactory,
                 kvStateRegistry,
@@ -233,7 +220,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                 keyGroupRange,
                 executionConfig,
                 localRecoveryConfig,
-                rocksDBPriorityQueueConfig,
+                forStPriorityQueueConfig,
                 ttlTimeProvider,
                 latencyTrackingStateConfig,
                 metricGroup,
@@ -263,10 +250,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         return this;
     }
 
-    public static Path getInstanceRocksDBPath(Path instanceBasePath) {
-        return new Path(instanceBasePath, DB_INSTANCE_DIR_STRING);
-    }
-
     private static void checkAndCreateDirectory(File directory) throws 
IOException {
         if (directory.exists()) {
             if (!directory.isDirectory()) {
@@ -274,7 +257,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
             }
         } else if (!directory.mkdirs()) {
             throw new IOException(
-                    String.format("Could not create RocksDB data directory at 
%s.", directory));
+                    String.format("Could not create ForSt data directory at 
%s.", directory));
         }
     }
 
@@ -300,7 +283,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
 
         ForStSnapshotStrategyBase<K, ?> checkpointStrategy = null;
 
-        ResourceGuard rocksDBResourceGuard = new ResourceGuard();
+        ResourceGuard forStResourceGuard = new ResourceGuard();
         PriorityQueueSetFactory priorityQueueFactory;
         SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
         // Number of bytes required to prefix the key groups.
@@ -316,7 +299,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
             UUID backendUID = UUID.randomUUID();
             SortedMap<Long, Collection<HandleAndLocalPath>> 
materializedSstFiles = new TreeMap<>();
             long lastCompletedCheckpointId = -1L;
-            prepareDirectories();
+            optionsContainer.prepareDirectories();
             restoreOperation =
                     getForStDBRestoreOperation(
                             keyGroupPrefixBytes,
@@ -346,7 +329,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
             checkpointStrategy =
                     initializeSnapshotStrategy(
                             db,
-                            rocksDBResourceGuard,
+                            forStResourceGuard,
                             keySerializerProvider.currentSchemaSerializer(),
                             kvStateInformation,
                             keyGroupRange,
@@ -369,7 +352,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                     new ArrayList<>(kvStateInformation.values().size());
             IOUtils.closeQuietly(cancelRegistryForBackend);
             IOUtils.closeQuietly(writeBatchWrapper);
-            IOUtils.closeQuietly(rocksDBResourceGuard);
+            IOUtils.closeQuietly(forStResourceGuard);
             ForStOperationUtils.addColumnFamilyOptionsToCloseLater(
                     columnFamilyOptions, defaultColumnFamilyHandle);
             IOUtils.closeQuietly(defaultColumnFamilyHandle);
@@ -388,9 +371,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
             kvStateInformation.clear();
 
             try {
-                FileUtils.deleteDirectory(new 
File(instanceBasePath.getPath()));
+                FileUtils.deleteDirectory(new 
File(optionsContainer.getBasePath().getPath()));
             } catch (Exception ex) {
-                logger.warn("Failed to delete base path for RocksDB: " + 
instanceBasePath, ex);
+                logger.warn(
+                        "Failed to delete base path for ForSt: " + 
optionsContainer.getBasePath(),
+                        ex);
             }
             // Log and rethrow
             if (e instanceof BackendBuildingException) {
@@ -403,10 +388,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         }
         InternalKeyContext<K> keyContext =
                 new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
-        logger.info("Finished building RocksDB keyed state-backend at {}.", 
instanceBasePath);
+        logger.info(
+                "Finished building ForSt keyed state-backend at {}.",
+                optionsContainer.getBasePath());
         return new ForStSyncKeyedStateBackend<>(
                 this.userCodeClassLoader,
-                this.instanceBasePath,
                 this.optionsContainer,
                 columnFamilyOptionsFactory,
                 this.kvStateRegistry,
@@ -420,7 +406,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                 keyGroupPrefixBytes,
                 cancelRegistryForBackend,
                 this.keyGroupCompressionDecorator,
-                rocksDBResourceGuard,
+                forStResourceGuard,
                 checkpointStrategy,
                 writeBatchWrapper,
                 defaultColumnFamilyHandle,
@@ -513,11 +499,15 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         // env. We expect to directly use the dfs directory in flink env or 
local directory as
         // working dir. We will implement this in ForStDB later, but before 
that, we achieved this
         // by setting the dbPath to "/" when the dfs directory existed.
+        Path instanceForStPath =
+                optionsContainer.getRemoteForStPath() == null
+                        ? optionsContainer.getLocalForStPath()
+                        : new Path("/db");
 
         if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
             return new ForStNoneRestoreOperation(
                     Collections.emptyMap(),
-                    instanceForStDBPath,
+                    instanceForStPath,
                     optionsContainer.getDbOptions(),
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
@@ -538,7 +528,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                     keySerializerProvider,
                     optionsContainer,
                     optionsContainer.getBasePath(),
-                    instanceForStDBPath,
+                    instanceForStPath,
                     optionsContainer.getDbOptions(),
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
@@ -565,7 +555,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                     registeredPQStates,
                     createHeapQueueFactory(),
                     keySerializerProvider,
-                    instanceForStDBPath,
+                    instanceForStPath,
                     optionsContainer.getDbOptions(),
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
@@ -605,7 +595,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
                                 nativeMetricMonitor,
                                 columnFamilyOptionsFactory,
                                 
optionsContainer.getWriteBufferManagerCapacity(),
-                                
priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize());
+                                
priorityQueueConfig.getForStDBPriorityQueueSetCacheSize());
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -618,14 +608,4 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
     private HeapPriorityQueueSetFactory createHeapQueueFactory() {
         return new HeapPriorityQueueSetFactory(keyGroupRange, 
numberOfKeyGroups, 128);
     }
-
-    private void prepareDirectories() throws IOException {
-        File baseFile = new File(instanceBasePath.getPath());
-        checkAndCreateDirectory(baseFile);
-        if (new File(instanceForStDBPath.getPath()).exists()) {
-            // Clear the base directory when the backend is created
-            // in case something crashed and the backend never reached 
dispose()
-            FileUtils.deleteDirectory(baseFile);
-        }
-    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 4c4d853d70a..7c4e58e1ce6 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -18,26 +18,35 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
@@ -112,4 +121,36 @@ class ForStStateBackendTest extends 
StateBackendTestBase<ForStStateBackend> {
         backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
         assertThat(backend.isIncrementalCheckpointsEnabled()).isTrue();
     }
+
+    @TestTemplate
+    void testCreateKeyedStateBackend() throws Exception {
+        Assumptions.assumeFalse(
+                getCheckpointStorage() instanceof JobManagerCheckpointStorage,
+                "Skip JM checkpoint storage");
+        ForStStateBackend backend = new ForStStateBackend();
+        ForStSyncKeyedStateBackend keyedStateBackend1 =
+                (ForStSyncKeyedStateBackend) 
createKeyedBackend(IntSerializer.INSTANCE);
+        
assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull();
+        Configuration config = new Configuration();
+        config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false);
+        backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
+        ForStSyncKeyedStateBackend keyedStateBackend2 =
+                (ForStSyncKeyedStateBackend)
+                        backend.createKeyedStateBackend(
+                                new KeyedStateBackendParametersImpl<>(
+                                        env,
+                                        new JobID(),
+                                        "test_op",
+                                        IntSerializer.INSTANCE,
+                                        10,
+                                        KeyGroupRange.of(0, 9),
+                                        env.getTaskKvStateRegistry(),
+                                        TtlTimeProvider.DEFAULT,
+                                        getMetricGroup(),
+                                        getCustomInitializationMetrics(),
+                                        Collections.emptyList(),
+                                        new CloseableRegistry(),
+                                        1.0d));
+        
assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull();
+    }
 }

Reply via email to