This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 4f71f44145b [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshot quit before ForSt disposal
4f71f44145b is described below

commit 4f71f44145b0ab7ab3cc23783cd4d5108efafb2e
Author: fredia <fredia...@gmail.com>
AuthorDate: Tue Apr 1 14:42:23 2025 +0800

    [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshot quit before ForSt disposal
---
 .../flink/state/forst/ForStKeyedStateBackend.java  | 17 +++++
 .../state/forst/ForStKeyedStateBackendBuilder.java |  1 +
 .../snapshot/ForStNativeFullSnapshotStrategy.java  |  7 +-
 .../forst/snapshot/ForStSnapshotStrategyBase.java  | 87 ----------------------
 4 files changed, 23 insertions(+), 89 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 5cc7e2f3d2b..4b40f89fb2a 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -60,6 +60,7 @@ import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
 
 import org.forstdb.ColumnFamilyHandle;
@@ -120,6 +121,12 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
     /** The container of ForSt options. */
     private final ForStResourceContainer optionsContainer;
 
+    /**
+     * Protects access to ForSt in other threads, like the checkpointing thread from parallel call
+     * that disposes the ForSt object.
+     */
+    private final ResourceGuard resourceGuard;
+
     /** Factory function to create column family options from state name. */
     private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
 
@@ -183,6 +190,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
             UUID backendUID,
             ExecutionConfig executionConfig,
             ForStResourceContainer optionsContainer,
+            ResourceGuard resourceGuard,
             int keyGroupPrefixBytes,
             TypeSerializer<K> keySerializer,
             Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
@@ -203,6 +211,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
         this.backendUID = backendUID;
         this.executionConfig = executionConfig;
         this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
+        this.resourceGuard = resourceGuard;
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keyGroupRange = keyContext.getKeyGroupRange();
         this.keySerializer = keySerializer;
@@ -510,6 +519,14 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
             if (this.disposed) {
                 return;
             }
+            // This call will block until all clients that still acquire access to the ForSt
+            // instance
+            // have released it,
+            // so that we cannot release the native resources while clients are still working with
+            // it in
+            // parallel.
+            resourceGuard.close();
+
             for (StateExecutor executor : managedStateExecutors) {
                 executor.shutdown();
             }
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 74a128c4dfd..30b37ed82b2 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -329,6 +329,7 @@ public class ForStKeyedStateBackendBuilder<K>
                 backendUID,
                 executionConfig,
                 this.optionsContainer,
+                forstResourceGuard,
                 keyGroupPrefixBytes,
                 this.keySerializerProvider.currentSchemaSerializer(),
                 serializedKeyBuilder,
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
index 37674de9b86..e4ba427f389 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java
@@ -152,11 +152,12 @@ public class ForStNativeFullSnapshotStrategy<K>
         final PreviousSnapshot previousSnapshot =
                 snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
 
-        // Disable file deletion for file transformation. ForSt will decide whether to allow file
+        ResourceGuard.Lease lease = resourceGuard.acquireResource();
+        // Disable file deletion for file transformation. ForSt will decide whether to allow
+        // file
         // deletion based on the number of calls to disableFileDeletions() and
         // enableFileDeletions(), so disableFileDeletions() should be call only once.
         db.disableFileDeletions();
-
         try {
             // get live files with flush memtable
             RocksDB.LiveFiles liveFiles = db.getLiveFiles(true);
@@ -185,6 +186,7 @@ public class ForStNativeFullSnapshotStrategy<K>
                     () -> {
                         try {
                             db.enableFileDeletions(false);
+                            lease.close();
                             LOG.info(
                                     "Release one file deletion lock with 
ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.",
                                     backendUID,
@@ -203,6 +205,7 @@ public class ForStNativeFullSnapshotStrategy<K>
                     backendUID,
                     checkpointId);
             db.enableFileDeletions(false);
+            lease.close();
             throw e;
         }
     }
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
index 4a9ddcdd49e..7350e8aaf6c 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java
@@ -28,15 +28,11 @@ import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
-import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
@@ -50,8 +46,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 
 import org.forstdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -76,8 +70,6 @@ import java.util.stream.Collectors;
 public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
         implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class);
-
     @Nonnull private final String description;
 
     /** ForSt instance from the backend. */
@@ -194,28 +186,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
     @Override
     public abstract void close();
 
-    protected void cleanupIncompleteSnapshot(
-            @Nonnull CloseableRegistry tmpResourcesRegistry,
-            @Nonnull SnapshotDirectory localBackupDirectory) {
-        try {
-            tmpResourcesRegistry.close();
-        } catch (Exception e) {
-            LOG.warn("Could not properly clean tmp resources.", e);
-        }
-
-        if (localBackupDirectory.isSnapshotCompleted()) {
-            try {
-                DirectoryStateHandle directoryStateHandle =
-                        localBackupDirectory.completeSnapshotAndGetHandle();
-                if (directoryStateHandle != null) {
-                    directoryStateHandle.discardState();
-                }
-            } catch (Exception e) {
-                LOG.warn("Could not properly discard local state.", e);
-            }
-        }
-    }
-
     /** Common operation in native ForSt snapshot result supplier. */
     protected abstract class ForStSnapshotOperation
             implements SnapshotResultSupplier<KeyedStateHandle> {
@@ -230,27 +200,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
             this.checkpointStreamFactory = checkpointStreamFactory;
             this.tmpResourcesRegistry = new CloseableRegistry();
         }
-
-        protected Optional<KeyedStateHandle> getLocalSnapshot(
-                SnapshotDirectory localBackupDirectory,
-                @Nullable StreamStateHandle localStreamStateHandle,
-                List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedState)
-                throws IOException {
-            final DirectoryStateHandle directoryStateHandle =
-                    localBackupDirectory.completeSnapshotAndGetHandle();
-            if (directoryStateHandle != null && localStreamStateHandle != null) {
-                return Optional.of(
-                        new IncrementalLocalKeyedStateHandle(
-                                backendUID,
-                                checkpointId,
-                                directoryStateHandle,
-                                keyGroupRange,
-                                localStreamStateHandle,
-                                sharedState));
-            } else {
-                return Optional.empty();
-            }
-        }
     }
 
     /** A {@link SnapshotResources} for native ForSt snapshot. */
@@ -301,42 +250,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources>
         }
     }
 
-    /** A {@link SnapshotResources} for forst sync snapshot. */
-    protected static class ForStSyncSnapshotResources implements SnapshotResources {
-        @Nonnull protected final SnapshotDirectory snapshotDirectory;
-
-        @Nonnull protected final PreviousSnapshot previousSnapshot;
-
-        @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
-        public ForStSyncSnapshotResources(
-                SnapshotDirectory snapshotDirectory,
-                PreviousSnapshot previousSnapshot,
-                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
-            this.snapshotDirectory = snapshotDirectory;
-            this.previousSnapshot = previousSnapshot;
-            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-        }
-
-        @Override
-        public void release() {
-            try {
-                if (snapshotDirectory.exists()) {
-                    LOG.trace(
-                            "Running cleanup for local RocksDB backup 
directory {}.",
-                            snapshotDirectory);
-                    boolean cleanupOk = snapshotDirectory.cleanup();
-
-                    if (!cleanupOk) {
-                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
-                    }
-                }
-            } catch (IOException e) {
-                LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
-            }
-        }
-    }
-
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
             new PreviousSnapshot(Collections.emptyList());
 

Reply via email to