This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push: new 4f71f44145b [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshot quit before ForSt disposal 4f71f44145b is described below commit 4f71f44145b0ab7ab3cc23783cd4d5108efafb2e Author: fredia <fredia...@gmail.com> AuthorDate: Tue Apr 1 14:42:23 2025 +0800 [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshot quit before ForSt disposal --- .../flink/state/forst/ForStKeyedStateBackend.java | 17 +++++ .../state/forst/ForStKeyedStateBackendBuilder.java | 1 + .../snapshot/ForStNativeFullSnapshotStrategy.java | 7 +- .../forst/snapshot/ForStSnapshotStrategyBase.java | 87 ---------------------- 4 files changed, 23 insertions(+), 89 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 5cc7e2f3d2b..4b40f89fb2a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -60,6 +60,7 @@ import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.StateMigrationException; import org.forstdb.ColumnFamilyHandle; @@ -120,6 +121,12 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { /** The container of ForSt options. */ private final ForStResourceContainer optionsContainer; + /** + * Protects access to ForSt in other threads, like the checkpointing thread from parallel call + * that disposes the ForSt object. + */ + private final ResourceGuard resourceGuard; + /** Factory function to create column family options from state name. */ private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; @@ -183,6 +190,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { UUID backendUID, ExecutionConfig executionConfig, ForStResourceContainer optionsContainer, + ResourceGuard resourceGuard, int keyGroupPrefixBytes, TypeSerializer<K> keySerializer, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, @@ -203,6 +211,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { this.backendUID = backendUID; this.executionConfig = executionConfig; this.optionsContainer = Preconditions.checkNotNull(optionsContainer); + this.resourceGuard = resourceGuard; this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keyGroupRange = keyContext.getKeyGroupRange(); this.keySerializer = keySerializer; @@ -510,6 +519,14 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { if (this.disposed) { return; } + // This call will block until all clients that still acquire access to the ForSt + // instance + // have released it, + // so that we cannot release the native resources while clients are still working with + // it in + // parallel. + resourceGuard.close(); + for (StateExecutor executor : managedStateExecutors) { executor.shutdown(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 74a128c4dfd..30b37ed82b2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -329,6 +329,7 @@ public class ForStKeyedStateBackendBuilder<K> backendUID, executionConfig, this.optionsContainer, + forstResourceGuard, keyGroupPrefixBytes, this.keySerializerProvider.currentSchemaSerializer(), serializedKeyBuilder, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java index 37674de9b86..e4ba427f389 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java @@ -152,11 +152,12 @@ public class ForStNativeFullSnapshotStrategy<K> final PreviousSnapshot previousSnapshot = snapshotMetaData(checkpointId, stateMetaInfoSnapshots); - // Disable file deletion for file transformation. ForSt will decide whether to allow file + ResourceGuard.Lease lease = resourceGuard.acquireResource(); + // Disable file deletion for file transformation. ForSt will decide whether to allow + // file // deletion based on the number of calls to disableFileDeletions() and // enableFileDeletions(), so disableFileDeletions() should be call only once. db.disableFileDeletions(); - try { // get live files with flush memtable RocksDB.LiveFiles liveFiles = db.getLiveFiles(true); @@ -185,6 +186,7 @@ public class ForStNativeFullSnapshotStrategy<K> () -> { try { db.enableFileDeletions(false); + lease.close(); LOG.info( "Release one file deletion lock with ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.", backendUID, @@ -203,6 +205,7 @@ public class ForStNativeFullSnapshotStrategy<K> backendUID, checkpointId); db.enableFileDeletions(false); + lease.close(); throw e; } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java index 4a9ddcdd49e..7350e8aaf6c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java @@ -28,15 +28,11 @@ import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.DirectoryStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; -import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; -import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategy; @@ -50,8 +46,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.ResourceGuard; import org.forstdb.RocksDB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; @@ -76,8 +70,6 @@ import java.util.stream.Collectors; public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class); - @Nonnull private final String description; /** ForSt instance from the backend. */ @@ -194,28 +186,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> @Override public abstract void close(); - protected void cleanupIncompleteSnapshot( - @Nonnull CloseableRegistry tmpResourcesRegistry, - @Nonnull SnapshotDirectory localBackupDirectory) { - try { - tmpResourcesRegistry.close(); - } catch (Exception e) { - LOG.warn("Could not properly clean tmp resources.", e); - } - - if (localBackupDirectory.isSnapshotCompleted()) { - try { - DirectoryStateHandle directoryStateHandle = - localBackupDirectory.completeSnapshotAndGetHandle(); - if (directoryStateHandle != null) { - directoryStateHandle.discardState(); - } - } catch (Exception e) { - LOG.warn("Could not properly discard local state.", e); - } - } - } - /** Common operation in native ForSt snapshot result supplier. */ protected abstract class ForStSnapshotOperation implements SnapshotResultSupplier<KeyedStateHandle> { @@ -230,27 +200,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> this.checkpointStreamFactory = checkpointStreamFactory; this.tmpResourcesRegistry = new CloseableRegistry(); } - - protected Optional<KeyedStateHandle> getLocalSnapshot( - SnapshotDirectory localBackupDirectory, - @Nullable StreamStateHandle localStreamStateHandle, - List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedState) - throws IOException { - final DirectoryStateHandle directoryStateHandle = - localBackupDirectory.completeSnapshotAndGetHandle(); - if (directoryStateHandle != null && localStreamStateHandle != null) { - return Optional.of( - new IncrementalLocalKeyedStateHandle( - backendUID, - checkpointId, - directoryStateHandle, - keyGroupRange, - localStreamStateHandle, - sharedState)); - } else { - return Optional.empty(); - } - } } /** A {@link SnapshotResources} for native ForSt snapshot. */ @@ -301,42 +250,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> } } - /** A {@link SnapshotResources} for forst sync snapshot. */ - protected static class ForStSyncSnapshotResources implements SnapshotResources { - @Nonnull protected final SnapshotDirectory snapshotDirectory; - - @Nonnull protected final PreviousSnapshot previousSnapshot; - - @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - - public ForStSyncSnapshotResources( - SnapshotDirectory snapshotDirectory, - PreviousSnapshot previousSnapshot, - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) { - this.snapshotDirectory = snapshotDirectory; - this.previousSnapshot = previousSnapshot; - this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; - } - - @Override - public void release() { - try { - if (snapshotDirectory.exists()) { - LOG.trace( - "Running cleanup for local RocksDB backup directory {}.", - snapshotDirectory); - boolean cleanupOk = snapshotDirectory.cleanup(); - - if (!cleanupOk) { - LOG.debug("Could not properly cleanup local RocksDB backup directory."); - } - } - } catch (IOException e) { - LOG.warn("Could not properly cleanup local RocksDB backup directory.", e); - } - } - } - protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(Collections.emptyList());