This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36eb336e4a623400a6bfdf1aab29723038e472a1 Author: fredia <fredia...@gmail.com> AuthorDate: Tue Apr 1 14:42:23 2025 +0800 [FLINK-37597][state/forst] Use ResourceGuard to make sure snapshot quit before ForSt disposal --- .../flink/state/forst/ForStKeyedStateBackend.java | 17 +++++ .../state/forst/ForStKeyedStateBackendBuilder.java | 1 + .../snapshot/ForStNativeFullSnapshotStrategy.java | 17 +++-- .../forst/snapshot/ForStSnapshotStrategyBase.java | 87 ---------------------- 4 files changed, 28 insertions(+), 94 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index b443a766b40..1d78c5bd1a5 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -61,6 +61,7 @@ import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.StateMigrationException; import org.forstdb.ColumnFamilyHandle; @@ -121,6 +122,12 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { /** The container of ForSt options. */ private final ForStResourceContainer optionsContainer; + /** + * Protects access to ForSt in other threads, like the checkpointing thread from parallel call + * that disposes the ForSt object. + */ + private final ResourceGuard resourceGuard; + /** Factory function to create column family options from state name. 
*/ private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; @@ -184,6 +191,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { UUID backendUID, ExecutionConfig executionConfig, ForStResourceContainer optionsContainer, + ResourceGuard resourceGuard, int keyGroupPrefixBytes, TypeSerializer<K> keySerializer, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, @@ -204,6 +212,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { this.backendUID = backendUID; this.executionConfig = executionConfig; this.optionsContainer = Preconditions.checkNotNull(optionsContainer); + this.resourceGuard = resourceGuard; this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keyGroupRange = keyContext.getKeyGroupRange(); this.keySerializer = keySerializer; @@ -538,6 +547,14 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { if (this.disposed) { return; } + // This call will block until all clients that still acquire access to the ForSt + // instance + // have released it, + // so that we cannot release the native resources while clients are still working with + // it in + // parallel. 
+ resourceGuard.close(); + for (StateExecutor executor : managedStateExecutors) { executor.shutdown(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 74a128c4dfd..30b37ed82b2 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -329,6 +329,7 @@ public class ForStKeyedStateBackendBuilder<K> backendUID, executionConfig, this.optionsContainer, + forstResourceGuard, keyGroupPrefixBytes, this.keySerializerProvider.currentSchemaSerializer(), serializedKeyBuilder, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java index 37674de9b86..a23293568f7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStNativeFullSnapshotStrategy.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -152,12 +153,14 @@ public class ForStNativeFullSnapshotStrategy<K> final PreviousSnapshot previousSnapshot = snapshotMetaData(checkpointId, stateMetaInfoSnapshots); - // Disable file deletion for file transformation. 
ForSt will decide whether to allow file - // deletion based on the number of calls to disableFileDeletions() and - // enableFileDeletions(), so disableFileDeletions() should be call only once. - db.disableFileDeletions(); + try (ResourceGuard.Lease ignoredLease = resourceGuard.acquireResource()) { + + // Disable file deletion for file transformation. ForSt will decide whether to allow + // file + // deletion based on the number of calls to disableFileDeletions() and + // enableFileDeletions(), so disableFileDeletions() should be called only once. + db.disableFileDeletions(); - try { // get live files with flush memtable RocksDB.LiveFiles liveFiles = db.getLiveFiles(true); List<Path> liveFilesPath = @@ -183,13 +186,13 @@ public class ForStNativeFullSnapshotStrategy<K> manifestFile, previousSnapshot, () -> { - try { + try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) { db.enableFileDeletions(false); LOG.info( "Release one file deletion lock with ForStNativeSnapshotResources, backendUID:{}, checkpointId:{}.", backendUID, checkpointId); - } catch (RocksDBException e) { + } catch (RocksDBException | IOException e) { LOG.error( "Enable file deletion failed, backendUID:{}, checkpointId:{}.", backendUID, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java index 4a9ddcdd49e..7350e8aaf6c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java @@ -28,15 +28,11 @@ import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage import org.apache.flink.runtime.state.CheckpointStreamFactory; import 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.DirectoryStateHandle; -import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; -import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; -import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategy; @@ -50,8 +46,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.ResourceGuard; import org.forstdb.RocksDB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; @@ -76,8 +70,6 @@ import java.util.stream.Collectors; public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ForStSnapshotStrategyBase.class); - @Nonnull private final String description; /** ForSt instance from the backend. 
*/ @@ -194,28 +186,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> @Override public abstract void close(); - protected void cleanupIncompleteSnapshot( - @Nonnull CloseableRegistry tmpResourcesRegistry, - @Nonnull SnapshotDirectory localBackupDirectory) { - try { - tmpResourcesRegistry.close(); - } catch (Exception e) { - LOG.warn("Could not properly clean tmp resources.", e); - } - - if (localBackupDirectory.isSnapshotCompleted()) { - try { - DirectoryStateHandle directoryStateHandle = - localBackupDirectory.completeSnapshotAndGetHandle(); - if (directoryStateHandle != null) { - directoryStateHandle.discardState(); - } - } catch (Exception e) { - LOG.warn("Could not properly discard local state.", e); - } - } - } - /** Common operation in native ForSt snapshot result supplier. */ protected abstract class ForStSnapshotOperation implements SnapshotResultSupplier<KeyedStateHandle> { @@ -230,27 +200,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> this.checkpointStreamFactory = checkpointStreamFactory; this.tmpResourcesRegistry = new CloseableRegistry(); } - - protected Optional<KeyedStateHandle> getLocalSnapshot( - SnapshotDirectory localBackupDirectory, - @Nullable StreamStateHandle localStreamStateHandle, - List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedState) - throws IOException { - final DirectoryStateHandle directoryStateHandle = - localBackupDirectory.completeSnapshotAndGetHandle(); - if (directoryStateHandle != null && localStreamStateHandle != null) { - return Optional.of( - new IncrementalLocalKeyedStateHandle( - backendUID, - checkpointId, - directoryStateHandle, - keyGroupRange, - localStreamStateHandle, - sharedState)); - } else { - return Optional.empty(); - } - } } /** A {@link SnapshotResources} for native ForSt snapshot. 
*/ @@ -301,42 +250,6 @@ public abstract class ForStSnapshotStrategyBase<K, R extends SnapshotResources> } } - /** A {@link SnapshotResources} for forst sync snapshot. */ - protected static class ForStSyncSnapshotResources implements SnapshotResources { - @Nonnull protected final SnapshotDirectory snapshotDirectory; - - @Nonnull protected final PreviousSnapshot previousSnapshot; - - @Nonnull protected final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - - public ForStSyncSnapshotResources( - SnapshotDirectory snapshotDirectory, - PreviousSnapshot previousSnapshot, - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) { - this.snapshotDirectory = snapshotDirectory; - this.previousSnapshot = previousSnapshot; - this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; - } - - @Override - public void release() { - try { - if (snapshotDirectory.exists()) { - LOG.trace( - "Running cleanup for local RocksDB backup directory {}.", - snapshotDirectory); - boolean cleanupOk = snapshotDirectory.cleanup(); - - if (!cleanupOk) { - LOG.debug("Could not properly cleanup local RocksDB backup directory."); - } - } - } catch (IOException e) { - LOG.warn("Could not properly cleanup local RocksDB backup directory.", e); - } - } - } - protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(Collections.emptyList());