This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64f686c705d [FLINK-37021][state/forst] Forst remote dir share checkpoint dir (#26006)
64f686c705d is described below

commit 64f686c705db06a7ae498e472b5fe9ceaaa99c61
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Sun Jan 19 23:51:35 2025 +0800

    [FLINK-37021][state/forst] Forst remote dir share checkpoint dir (#26006)
---
 .../filesystem/FsCheckpointStorageAccess.java      |  4 ++
 .../org/apache/flink/state/forst/ForStOptions.java |  6 +-
 .../flink/state/forst/ForStStateBackend.java       | 34 +++++++++--
 .../flink/state/forst/ForStStateBackendV2Test.java | 67 ++++++++++++++++++++++
 4 files changed, 104 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index 56f4205d0b1..bb79d5a139e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -128,6 +128,10 @@ public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess
         return checkpointsDirectory;
     }
 
+    public Path getSharedStateDirectory() {
+        return sharedStateDirectory;
+    }
+
     // ------------------------------------------------------------------------
     //  CheckpointStorage implementation
     // ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
index 5ae74ef7034..73e43443332 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.TextElement;
 
 import static org.apache.flink.state.forst.ForStStateBackend.PriorityQueueStateType.ForStDB;
+import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT;
 
 /** Configuration options for the ForStStateBackend. */
 @Experimental
@@ -55,8 +56,9 @@ public class ForStOptions {
                     .noDefaultValue()
                     .withDescription(
                             String.format(
-                                    "The remote directory where ForSt puts its SST files, fallback to %s if not configured.",
-                                    LOCAL_DIRECTORIES.key()));
+                                    "The remote directory where ForSt puts its SST files, fallback to %s if not configured."
+                                            + " Recognized shortcut name is '%s', which means that forst shares the directory with checkpoint.",
+                                    LOCAL_DIRECTORIES.key(), REMOTE_SHORTCUT_CHECKPOINT));
 
     public static final ConfigOption<String> CACHE_DIRECTORY =
             ConfigOptions.key("state.backend.forst.cache.dir")
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 47faf014ec4..49d2f288141 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
 import org.apache.flink.state.forst.ForStMemoryControllerUtils.ForStMemoryFactory;
 import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
@@ -91,6 +92,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class ForStStateBackend extends AbstractManagedMemoryStateBackend
         implements ConfigurableStateBackend {
 
+    public static final String REMOTE_SHORTCUT_CHECKPOINT = "checkpoint-dir";
+
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(ForStStateBackend.class);
@@ -173,6 +176,8 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
      */
     private final TernaryBoolean rescalingUseDeleteFilesInRange;
 
+    private boolean remoteShareWithCheckpoint;
+
     // ------------------------------------------------------------------------
 
     /** Creates a new {@code ForStStateBackend} for storing state. */
@@ -184,6 +189,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
         this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
         this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
         this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
+        this.remoteShareWithCheckpoint = false;
     }
 
     /**
@@ -200,11 +206,17 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
                         original.memoryConfiguration, config);
         this.memoryConfiguration.validate();
 
+        this.remoteShareWithCheckpoint = false;
         if (original.remoteForStDirectory != null) {
             this.remoteForStDirectory = original.remoteForStDirectory;
         } else {
             String remoteDirStr = config.get(ForStOptions.REMOTE_DIRECTORY);
-            this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr);
+            if (REMOTE_SHORTCUT_CHECKPOINT.equals(remoteDirStr)) {
+                this.remoteForStDirectory = null;
+                this.remoteShareWithCheckpoint = true;
+            } else {
+                this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr);
+            }
+            }
         }
 
         this.priorityQueueConfig =
@@ -383,10 +395,22 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend
                 new Path(
                         new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
                                 .getAbsolutePath());
-        Path remoteBasePath =
-                remoteForStDirectory != null
-                        ? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath)
-                        : null;
+        Path remoteBasePath = null;
+        if (remoteForStDirectory != null) {
+            remoteBasePath =
+                    new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
+        } else if (remoteShareWithCheckpoint) {
+            if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
+                Path sharedStateDirectory =
+                        ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
+                                .getSharedStateDirectory();
+                remoteBasePath = new Path(sharedStateDirectory, opChildPath);
+                LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath);
+            } else {
+                LOG.warn(
+                        "Remote ForSt directory can't be set, because checkpoint directory isn't on file system.");
+            }
+        }
 
         final OpaqueMemoryResource<ForStSharedResources> sharedResources =
                 ForStOperationUtils.allocateSharedCachesIfConfigured(
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java
index 86af6bec570..7bd4ab539fd 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendV2Test.java
@@ -18,12 +18,20 @@
 
 package org.apache.flink.state.forst;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.v2.StateBackendTestV2Base;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
@@ -31,17 +39,21 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.state.forst.ForStOptions.LOCAL_DIRECTORIES;
 import static org.apache.flink.state.forst.ForStOptions.REMOTE_DIRECTORY;
+import static org.apache.flink.state.forst.ForStStateBackend.REMOTE_SHORTCUT_CHECKPOINT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Tests for the async keyed state backend part of {@link ForStStateBackend}. */
 @ExtendWith(ParameterizedTestExtension.class)
@@ -110,4 +122,59 @@ class ForStStateBackendV2Test extends StateBackendTestV2Base<ForStStateBackend>
         config.set(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, useDeleteFileInRange);
         return backend.configure(config, Thread.currentThread().getContextClassLoader());
     }
+
+    @TestTemplate
+    void testRemoteDirShareCheckpointDirWithJobId() throws Exception {
+        testRemoteDirShareCheckpointDir(true);
+    }
+
+    @TestTemplate
+    void testRemoteDirShareCheckpointDirWOJob() throws Exception {
+        testRemoteDirShareCheckpointDir(false);
+    }
+
+    void testRemoteDirShareCheckpointDir(boolean createJob) throws Exception {
+        JobID jobID = new JobID();
+        String checkpointPath = TempDirUtils.newFolder(tempFolder).toURI().toString();
+        FileSystemCheckpointStorage checkpointStorage =
+                new FileSystemCheckpointStorage(new Path(checkpointPath), 0, -1);
+
+        Configuration config = new Configuration();
+        config.set(LOCAL_DIRECTORIES, tempFolderForForStLocal.toString());
+        config.set(REMOTE_DIRECTORY, REMOTE_SHORTCUT_CHECKPOINT);
+        config.set(CheckpointingOptions.CREATE_CHECKPOINT_SUB_DIR, createJob);
+
+        checkpointStorage =
+                checkpointStorage.configure(config, Thread.currentThread().getContextClassLoader());
+        MockEnvironment mockEnvironment =
+                MockEnvironment.builder().setTaskStateManager(getTestTaskStateManager()).build();
+        mockEnvironment.setCheckpointStorageAccess(
+                checkpointStorage.createCheckpointStorage(jobID));
+
+        ForStStateBackend backend = new ForStStateBackend();
+        backend = backend.configure(config, Thread.currentThread().getContextClassLoader());
+        KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 127);
+        ForStKeyedStateBackend<Integer> keyedBackend =
+                backend.createAsyncKeyedStateBackend(
+                        new KeyedStateBackendParametersImpl<>(
+                                mockEnvironment,
+                                jobID,
+                                "test_op",
+                                IntSerializer.INSTANCE,
+                                keyGroupRange.getNumberOfKeyGroups(),
+                                keyGroupRange,
+                                env.getTaskKvStateRegistry(),
+                                TtlTimeProvider.DEFAULT,
+                                getMetricGroup(),
+                                getCustomInitializationMetrics(),
+                                Collections.emptyList(),
+                                new CloseableRegistry(),
+                                1.0d));
+
+        assertThat(keyedBackend.getRemoteBasePath().getParent())
+                .isEqualTo(
+                        new Path(
+                                checkpointStorage.getCheckpointPath(),
+                                createJob ? jobID + "/shared" : "/shared"));
+    }
 }

Reply via email to