This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36eb336e4a623400a6bfdf1aab29723038e472a1
Author: fredia <fredia...@gmail.com>
AuthorDate: Tue Apr 1 14:42:23 2025 +0800

    [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshots quit before ForSt disposal
---
 .../flink/state/forst/ForStKeyedStateBackend.java  | 17 +++++
 .../state/forst/ForStKeyedStateBackendBuilder.java |  1 +
 .../snapshot/ForStNativeFullSnapshotStrategy.java  | 17 +++--
 .../forst/snapshot/ForStSnapshotStrategyBase.java  | 87 ----------------------
 4 files changed, 28 insertions(+), 94 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index b443a766b40..1d78c5bd1a5 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
 
 import org.forstdb.ColumnFamilyHandle;
@@ -121,6 +122,12 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
     /** The container of ForSt options. */
     private final ForStResourceContainer optionsContainer;
 
+    /**
+     * Protects access to ForSt from other threads, such as the checkpointing thread, against a
+     * parallel call that disposes the ForSt object.
+     */
+    private final ResourceGuard resourceGuard;
+
     /** Factory function to create column family options from state name. */
     private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
 
@@ -184,6 +191,7 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
             UUID backendUID,
             ExecutionConfig executionConfig,
             ForStResourceContainer optionsContainer,
+            ResourceGuard resourceGuard,
             int keyGroupPrefixBytes,
             TypeSerializer<K> keySerializer,
             Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
@@ -204,6 +212,7 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
         this.backendUID = backendUID;
         this.executionConfig = executionConfig;
         this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
+        this.resourceGuard = resourceGuard;
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keyGroupRange = keyContext.getKeyGroupRange();
         this.keySerializer = keySerializer;
@@ -538,6 +547,14 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
             if (this.disposed) {
                 return;
             }
+            // This call will block until all clients that still hold access to the ForSt
+            // instance
+            // have released it,
+            // so that we cannot release the native resources while clients are still working with
+            // it in
+            // parallel.
+            resourceGuard.close();
+
             for (StateExecutor executor : managedStateExecutors) {
                 executor.shutdown();
             }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 74a128c4dfd..30b37ed82b2 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -329,6 +329,7 @@ public class ForStKeyedStateBackendBuilder<K>
                 backendUID,
                 executionConfig,
                 this.optionsContainer,
+                forstResourceGuard,
                 keyGroupPrefixBytes,
                 this.keySerializerProvider.currentSchemaSerializer(),
                 serializedKeyBuilder,
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
index 37674de9b86..a23293568f7 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -152,12 +153,14 @@ public class ForStNativeFullSnapshotStrategy<K>
         final PreviousSnapshot previousSnapshot =
                 snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
 
-        // Disable file deletion for file transformation. ForSt will decide 
whether to allow file
-        // deletion based on the number of calls to disableFileDeletions() and
-        // enableFileDeletions(), so disableFileDeletions() should be call 
only once.
-        db.disableFileDeletions();
+        try (ResourceGuard.Lease ignoredLease = 
resourceGuard.acquireResource()) {
+
+            // Disable file deletion for file transformation.
+            // ForSt decides whether to allow file deletion based on the number of calls to
+            // disableFileDeletions() and enableFileDeletions(), so disableFileDeletions()
+            // should be called only once.
+            db.disableFileDeletions();
 
-        try {
             // get live files with flush memtable
             RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
             List<Path> liveFilesPath =
@@ -183,13 +186,13 @@ public class ForStNativeFullSnapshotStrategy<K>
                     manifestFile,
                     previousSnapshot,
                     () -> {
-                        try {
+                        try (ResourceGuard.Lease lease = 
resourceGuard.acquireResource()) {
                             db.enableFileDeletions(false);
                             LOG.info(
                                     "Release one file deletion lock with 
ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.",
                                     backendUID,
                                     checkpointId);
-                        } catch (RocksDBException e) {
+                        } catch (RocksDBException | IOException e) {
                             LOG.error(
                                     "Enable file deletion failed, 
backendUID:{}, checkpointId:{}.",
                                     backendUID,
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
index 4a9ddcdd49e..7350e8aaf6c 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
@@ -28,15 +28,11 @@ import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
@@ -50,8 +46,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 
 import org.forstdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -76,8 +70,6 @@ import java.util.stream.Collectors;
 public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
         implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, 
AutoCloseable {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
-
     @Nonnull private final String description;
 
     /** ForSt instance from the backend. */
@@ -194,28 +186,6 @@ public abstract class ForStSnapshotStrategyBase<K, R 
extends SnapshotResources>
     @Override
     public abstract void close();
 
-    protected void cleanupIncompleteSnapshot(
-            @Nonnull CloseableRegistry tmpResourcesRegistry,
-            @Nonnull SnapshotDirectory localBackupDirectory) {
-        try {
-            tmpResourcesRegistry.close();
-        } catch (Exception e) {
-            LOG.warn("Could not properly clean tmp resources.", e);
-        }
-
-        if (localBackupDirectory.isSnapshotCompleted()) {
-            try {
-                DirectoryStateHandle directoryStateHandle =
-                        localBackupDirectory.completeSnapshotAndGetHandle();
-                if (directoryStateHandle != null) {
-                    directoryStateHandle.discardState();
-                }
-            } catch (Exception e) {
-                LOG.warn("Could not properly discard local state.", e);
-            }
-        }
-    }
-
     /** Common operation in native ForSt snapshot result supplier. */
     protected abstract class ForStSnapshotOperation
             implements SnapshotResultSupplier<KeyedStateHandle> {
@@ -230,27 +200,6 @@ public abstract class ForStSnapshotStrategyBase<K, R 
extends SnapshotResources>
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.tmpResourcesRegistry = new CloseableRegistry();
         }
-
-        protected Optional<KeyedStateHandle> getLocalSnapshot(
-                SnapshotDirectory localBackupDirectory,
-                @Nullable StreamStateHandle localStreamStateHandle,
-                List<IncrementalKeyedStateHandle.HandleAndLocalPath> 
sharedState)
-                throws IOException {
-            final DirectoryStateHandle directoryStateHandle =
-                    localBackupDirectory.completeSnapshotAndGetHandle();
-            if (directoryStateHandle != null && localStreamStateHandle != 
null) {
-                return Optional.of(
-                        new IncrementalLocalKeyedStateHandle(
-                                backendUID,
-                                checkpointId,
-                                directoryStateHandle,
-                                keyGroupRange,
-                                localStreamStateHandle,
-                                sharedState));
-            } else {
-                return Optional.empty();
-            }
-        }
     }
 
     /** A {@link SnapshotResources} for native ForSt snapshot. */
@@ -301,42 +250,6 @@ public abstract class ForStSnapshotStrategyBase<K, R 
extends SnapshotResources>
         }
     }
 
-    /** A {@link SnapshotResources} for forst sync snapshot. */
-    protected static class ForStSyncSnapshotResources implements 
SnapshotResources {
-        @Nonnull protected final SnapshotDirectory snapshotDirectory;
-
-        @Nonnull protected final PreviousSnapshot previousSnapshot;
-
-        @Nonnull protected final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
-
-        public ForStSyncSnapshotResources(
-                SnapshotDirectory snapshotDirectory,
-                PreviousSnapshot previousSnapshot,
-                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
-            this.snapshotDirectory = snapshotDirectory;
-            this.previousSnapshot = previousSnapshot;
-            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-        }
-
-        @Override
-        public void release() {
-            try {
-                if (snapshotDirectory.exists()) {
-                    LOG.trace(
-                            "Running cleanup for local RocksDB backup 
directory {}.",
-                            snapshotDirectory);
-                    boolean cleanupOk = snapshotDirectory.cleanup();
-
-                    if (!cleanupOk) {
-                        LOG.debug("Could not properly cleanup local RocksDB 
backup directory.");
-                    }
-                }
-            } catch (IOException e) {
-                LOG.warn("Could not properly cleanup local RocksDB backup 
directory.", e);
-            }
-        }
-    }
-
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
             new PreviousSnapshot(Collections.emptyList());
 

Reply via email to