This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3bc96001656 [FLINK-37620][state/forst] ForSt Sync mode support remote storage (#26412) 3bc96001656 is described below commit 3bc96001656cda6ccaeda22741c75e035505ec1b Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Wed Apr 9 10:32:51 2025 +0800 [FLINK-37620][state/forst] ForSt Sync mode support remote storage (#26412) --- .../docs/ops/state/disaggregated_state.md | 14 +++ docs/content/docs/ops/state/disaggregated_state.md | 14 +++ .../shortcodes/generated/forst_configuration.html | 6 ++ .../generated/state_backend_forst_section.html | 6 ++ .../state/forst/ForStKeyedStateBackendBuilder.java | 1 - .../org/apache/flink/state/forst/ForStOptions.java | 11 ++ .../flink/state/forst/ForStStateBackend.java | 85 +++++++-------- .../state/forst/sync/ForStPriorityQueueConfig.java | 18 ++-- .../forst/sync/ForStSyncKeyedStateBackend.java | 115 +++++++++------------ .../sync/ForStSyncKeyedStateBackendBuilder.java | 72 +++++-------- .../flink/state/forst/ForStStateBackendTest.java | 41 ++++++++ 11 files changed, 219 insertions(+), 164 deletions(-) diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md b/docs/content.zh/docs/ops/state/disaggregated_state.md index e59a765766b..d18458cef71 100644 --- a/docs/content.zh/docs/ops/state/disaggregated_state.md +++ b/docs/content.zh/docs/ops/state/disaggregated_state.md @@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state checkpoint and fast recovery, since the ForSt will perform file copy between the primary storage location and the checkpoint directory during checkpointing and recovery. +#### ForSt Local Storage Location + +By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When +using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**. +Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access +along with asynchronous remote state access could help achieve better overall throughput. +If you want the operators with synchronous state APIs to store state in remote, the following configuration will help: +```yaml +state.backend.forst.sync.enforce-local: false +``` +And you can specify the local storage location via: +```yaml +state.backend.forst.local-dir: path-to-local-dir +``` #### ForSt File Cache diff --git a/docs/content/docs/ops/state/disaggregated_state.md b/docs/content/docs/ops/state/disaggregated_state.md index e59a765766b..5d2a757f186 100644 --- a/docs/content/docs/ops/state/disaggregated_state.md +++ b/docs/content/docs/ops/state/disaggregated_state.md @@ -150,6 +150,20 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state checkpoint and fast recovery, since the ForSt will perform file copy between the primary storage location and the checkpoint directory during checkpointing and recovery. +#### ForSt Local Storage Location + +By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When +using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**. +Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access +along with asynchronous remote state access could help achieve better overall throughput. +If you want the operators with synchronous state APIs to store state in remote, the following configuration will help: +```yaml +state.backend.forst.sync.enforce-local: false +``` +And you can specify the local storage location via: +```yaml +state.backend.forst.local-dir: path-to-local-dir +``` #### ForSt File Cache diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html b/docs/layouts/shortcodes/generated/forst_configuration.html index 5b5b50ac970..f66c1af4d8b 100644 --- a/docs/layouts/shortcodes/generated/forst_configuration.html +++ b/docs/layouts/shortcodes/generated/forst_configuration.html @@ -116,6 +116,12 @@ <td>String</td> <td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td> </tr> + <tr> + <td><h5>state.backend.forst.sync.enforce-local</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td> + </tr> <tr> <td><h5>state.backend.forst.timer-service.cache-size</h5></td> <td style="word-wrap: break-word;">128</td> diff --git a/docs/layouts/shortcodes/generated/state_backend_forst_section.html b/docs/layouts/shortcodes/generated/state_backend_forst_section.html index f4782714f39..6a7ebbfbd93 100644 --- a/docs/layouts/shortcodes/generated/state_backend_forst_section.html +++ b/docs/layouts/shortcodes/generated/state_backend_forst_section.html @@ -50,6 +50,12 @@ <td>String</td> <td>The primary directory where ForSt puts its SST files. By default, it will be the same as the checkpoint directory. Recognized shortcut name is 'checkpoint-dir', which means that ForSt shares the directory with checkpoint, and 'local-dir', which means that ForSt will use the local directory of TaskManager.</td> </tr> + <tr> + <td><h5>state.backend.forst.sync.enforce-local</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to enforce local state for operators in synchronous mode when enabling disaggregated state. This is useful in cases where both synchronous operators and asynchronous operators are used in the same job.</td> + </tr> <tr> <td><h5>state.backend.forst.timer-service.cache-size</h5></td> <td style="word-wrap: break-word;">128</td> diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 30b37ed82b2..bfb2291be32 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -359,7 +359,6 @@ public class ForStKeyedStateBackendBuilder<K> // env. We expect to directly use the dfs directory in flink env or local directory as // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. - // TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir Path instanceForStPath = optionsContainer.getRemoteForStPath() == null ? optionsContainer.getLocalForStPath() diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 6d27d9b2a7b..8a816e3e1b1 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -66,6 +66,17 @@ public class ForStOptions { CHECKPOINT_DIR_AS_PRIMARY_SHORTCUT, LOCAL_DIR_AS_PRIMARY_SHORTCUT)); + @Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST) + public static final ConfigOption<Boolean> SYNC_ENFORCE_LOCAL = + ConfigOptions.key("state.backend.forst.sync.enforce-local") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to enforce local state for operators in synchronous mode when" + + " enabling disaggregated state. This is useful in cases where " + + "both synchronous operators and asynchronous operators are used " + + "in the same job."); + @Documentation.Section(Documentation.Sections.STATE_BACKEND_FORST) public static final ConfigOption<String> CACHE_DIRECTORY = ConfigOptions.key("state.backend.forst.cache.dir") diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 6dc4dba156d..57a8a729f45 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -188,8 +189,12 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend /** The recovery claim mode. */ private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT; + /** Whether to share the ForSt remote directory with checkpoint directory. */ private boolean remoteShareWithCheckpoint; + /** Whether to use local directory as primary directory in synchronous mode. */ + private boolean forceSyncLocal; + // ------------------------------------------------------------------------ /** Creates a new {@code ForStStateBackend} for storing state. */ @@ -203,6 +208,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED; this.remoteShareWithCheckpoint = false; + this.forceSyncLocal = true; } /** @@ -237,6 +243,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend : new Path(remoteDirStr); } } + this.forceSyncLocal = config.get(ForStOptions.SYNC_ENFORCE_LOCAL); this.priorityQueueConfig = ForStPriorityQueueConfig.fromOtherAndConfiguration( @@ -409,31 +416,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend lazyInitializeForJob(env, fileCompatibleIdentifier); - String opChildPath = - String.format( - "op_%s_attempt_%s", - fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber()); - - Path localBasePath = - new Path( - new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath) - .getAbsolutePath()); - Path remoteBasePath = null; - if (remoteForStDirectory != null) { - remoteBasePath = - new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath); - } else if (remoteShareWithCheckpoint) { - if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) { - Path sharedStateDirectory = - ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess()) - .getSharedStateDirectory(); - remoteBasePath = new Path(sharedStateDirectory, opChildPath); - LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath); - } else { - LOG.warn( - "Remote ForSt directory can't be set, because checkpoint directory isn't on file system."); - } - } + Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env); final OpaqueMemoryResource<ForStSharedResources> sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured( @@ -448,8 +431,8 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend final ForStResourceContainer resourceContainer = createOptionsAndResourceContainer( sharedResources, - localBasePath, - remoteBasePath, + localAndRemoteBasePath.f0, + localAndRemoteBasePath.f1, env.getCheckpointStorageAccess(), parameters.getMetricGroup(), nativeMetricOptions.isStatisticsEnabled()); @@ -505,17 +488,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend lazyInitializeForJob(env, fileCompatibleIdentifier); - Path instanceBasePath = - new Path( - new File( - getNextStoragePath(), - "job_" - + jobId - + "_op_" - + fileCompatibleIdentifier - + "_uuid_" - + UUID.randomUUID()) - .getAbsolutePath()); + Tuple2<Path, Path> localAndRemoteBasePath = getForStBasePath(fileCompatibleIdentifier, env); LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig(); @@ -533,10 +506,10 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend final ForStResourceContainer resourceContainer = createOptionsAndResourceContainer( sharedResources, - instanceBasePath, - null, + localAndRemoteBasePath.f0, + forceSyncLocal ? null : localAndRemoteBasePath.f1, env.getCheckpointStorageAccess(), - null, + parameters.getMetricGroup(), nativeMetricOptions.isStatisticsEnabled()); ExecutionConfig executionConfig = env.getExecutionConfig(); @@ -549,7 +522,6 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend new ForStSyncKeyedStateBackendBuilder<>( parameters.getOperatorIdentifier(), env.getUserCodeClassLoader().asClassLoader(), - instanceBasePath, resourceContainer, stateName -> resourceContainer.getColumnOptions(), parameters.getKvStateRegistry(), @@ -818,6 +790,35 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend return configuration; } + Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment env) { + String opChildPath = + String.format( + "op_%s_attempt_%s", + operatorIdentifier, env.getTaskInfo().getAttemptNumber()); + + Path localBasePath = + new Path( + new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath) + .getAbsolutePath()); + Path remoteBasePath = null; + if (remoteForStDirectory != null) { + remoteBasePath = + new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath); + } else if (remoteShareWithCheckpoint) { + if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) { + Path sharedStateDirectory = + ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess()) + .getSharedStateDirectory(); + remoteBasePath = new Path(sharedStateDirectory, opChildPath); + LOG.info("Set remote ForSt directory to checkpoint directory {}", remoteBasePath); + } else { + LOG.warn( + "Remote ForSt directory can't be set, because checkpoint directory isn't on file system."); + } + } + return Tuple2.of(localBasePath, remoteBasePath); + } + @VisibleForTesting ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) { return createOptionsAndResourceContainer(null, localBasePath, null, null, null, false); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java index a60907ed7af..afbd3bfe6a6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStPriorityQueueConfig.java @@ -28,7 +28,7 @@ import static org.apache.flink.state.forst.ForStOptions.FORST_TIMER_SERVICE_FACT import static org.apache.flink.state.forst.ForStOptions.TIMER_SERVICE_FACTORY; import static org.apache.flink.util.Preconditions.checkNotNull; -/** The configuration of rocksDB priority queue state implementation. */ +/** The configuration of ForSt priority queue state implementation. */ public class ForStPriorityQueueConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -39,7 +39,7 @@ public class ForStPriorityQueueConfig implements Serializable { private @Nullable ForStStateBackend.PriorityQueueStateType priorityQueueStateType; /** cache size per keyGroup for rocksDB priority queue state. */ - private int rocksDBPriorityQueueSetCacheSize; + private int forStDBPriorityQueueSetCacheSize; public ForStPriorityQueueConfig() { this(null, UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE); @@ -47,9 +47,9 @@ public class ForStPriorityQueueConfig implements Serializable { public ForStPriorityQueueConfig( ForStStateBackend.PriorityQueueStateType priorityQueueStateType, - int rocksDBPriorityQueueSetCacheSize) { + int forStDBPriorityQueueSetCacheSize) { this.priorityQueueStateType = priorityQueueStateType; - this.rocksDBPriorityQueueSetCacheSize = rocksDBPriorityQueueSetCacheSize; + this.forStDBPriorityQueueSetCacheSize = forStDBPriorityQueueSetCacheSize; } /** @@ -70,10 +70,10 @@ public class ForStPriorityQueueConfig implements Serializable { * Gets the cache size of rocksDB priority queue set. It will fall back to the default value if * it is not explicitly set. */ - public int getRocksDBPriorityQueueSetCacheSize() { - return rocksDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE + public int getForStDBPriorityQueueSetCacheSize() { + return forStDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE ? FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE.defaultValue() - : rocksDBPriorityQueueSetCacheSize; + : forStDBPriorityQueueSetCacheSize; } public static ForStPriorityQueueConfig fromOtherAndConfiguration( @@ -83,10 +83,10 @@ public class ForStPriorityQueueConfig implements Serializable { ? config.get(TIMER_SERVICE_FACTORY) : other.priorityQueueStateType; int cacheSize = - (other.rocksDBPriorityQueueSetCacheSize + (other.forStDBPriorityQueueSetCacheSize == UNDEFINED_ROCKSDB_PRIORITY_QUEUE_SET_CACHE_SIZE) ? config.get(FORST_TIMER_SERVICE_FACTORY_CACHE_SIZE) - : other.rocksDBPriorityQueueSetCacheSize; + : other.forStDBPriorityQueueSetCacheSize; return new ForStPriorityQueueConfig(priorityQueueType, cacheSize); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index da26ae0d937..783031c1386 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.ICloseableRegistry; -import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -64,7 +63,6 @@ import org.apache.flink.state.forst.ForStNativeMetricMonitor; import org.apache.flink.state.forst.ForStOperationUtils; import org.apache.flink.state.forst.ForStResourceContainer; import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -85,7 +83,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Closeable; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -105,7 +102,7 @@ import static org.apache.flink.runtime.state.SnapshotExecutionType.ASYNCHRONOUS; import static org.apache.flink.util.Preconditions.checkState; /** - * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes + * An {@link AbstractKeyedStateBackend} that stores its state in {@code ForStDB} and serializes * state to streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} * upon checkpointing. This state backend can store very large state that exceeds memory and spills * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe. @@ -178,17 +175,14 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> /** Factory function to create column family options from state name. */ private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; - /** The container of RocksDB option factory and predefined options. */ + /** The container of ForSt option factory and predefined options. */ private final ForStResourceContainer optionsContainer; - /** Path where this configured instance stores its data directory. */ - private final Path instanceBasePath; - /** - * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call - * that disposes the RocksDB object. + * Protects access to ForSt in other threads, like the checkpointing thread from parallel call + * that disposes the ForSt object. */ - private final ResourceGuard rocksDBResourceGuard; + private final ResourceGuard forstResourceGuard; /** The write options to use in the states. We disable write ahead logging. */ private final WriteOptions writeOptions; @@ -228,7 +222,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> */ private final ColumnFamilyHandle defaultColumnFamily; - /** Shared wrapper for batch writes to the RocksDB instance. */ + /** Shared wrapper for batch writes to the ForSt instance. */ private final ForStDBWriteBatchWrapper writeBatchWrapper; /** @@ -244,14 +238,14 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> private final PriorityQueueSetFactory priorityQueueFactory; /** - * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across - * all states. + * Helper to build the byte arrays of composite keys to address data in forst. Shared across all + * states. */ private final SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; /** - * Our RocksDB database, this is used by the actual subclasses of {@link AbstractForStSyncState} - * to store state. The different k/v states that we have don't each have their own RocksDB + * Our ForSt database, this is used by the actual subclasses of {@link AbstractForStSyncState} + * to store state. The different k/v states that we have don't each have their own ForSt * instance. They all write to this instance but to their own column family. */ protected final RocksDB db; @@ -263,7 +257,6 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> public ForStSyncKeyedStateBackend( ClassLoader userCodeClassLoader, - Path instanceBasePath, ForStResourceContainer optionsContainer, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -277,7 +270,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> int keyGroupPrefixBytes, CloseableRegistry cancelStreamRegistry, StreamCompressionDecorator keyGroupCompressionDecorator, - ResourceGuard rocksDBResourceGuard, + ResourceGuard forstResourceGuard, ForStSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy, ForStDBWriteBatchWrapper writeBatchWrapper, ColumnFamilyHandle defaultColumnFamilyHandle, @@ -307,8 +300,6 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> this.optionsContainer = Preconditions.checkNotNull(optionsContainer); - this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); - this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.kvStateInformation = kvStateInformation; this.createdKVStates = new HashMap<>(); @@ -317,7 +308,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> this.readOptions = optionsContainer.getReadOptions(); this.writeBatchSize = writeBatchSize; this.db = db; - this.rocksDBResourceGuard = rocksDBResourceGuard; + this.forstResourceGuard = forstResourceGuard; this.checkpointSnapshotStrategy = checkpointSnapshotStrategy; this.writeBatchWrapper = writeBatchWrapper; this.defaultColumnFamily = defaultColumnFamilyHandle; @@ -360,7 +351,8 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> namespace, namespaceSerializer, namespaceOutputView, ambiguousKeyPossible); nameSpaceBytes = namespaceOutputView.getCopyOfBuffer(); } catch (IOException ex) { - throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + throw new FlinkRuntimeException( + "Failed to get keys from ForSt sync state backend.", ex); } ForStIteratorWrapper iterator = @@ -398,6 +390,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> return Stream.empty(); } + @SuppressWarnings("unchecked") RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.metaInfo; @@ -428,12 +421,6 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> return targetStream.onClose(iteratorWrapper::close); } - @VisibleForTesting - ColumnFamilyHandle getColumnFamilyHandle(String state) { - ForStOperationUtils.ForStKvStateInfo columnInfo = kvStateInformation.get(state); - return columnInfo != null ? columnInfo.columnFamilyHandle : null; - } - @Override public void setCurrentKey(K newKey) { super.setCurrentKey(newKey); @@ -454,11 +441,11 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> } super.dispose(); - // This call will block until all clients that still acquire access to the RocksDB instance + // This call will block until all clients that still acquire access to the ForSt instance // have released it, // so that we cannot release the native resources while clients are still working with it in // parallel. - rocksDBResourceGuard.close(); + forstResourceGuard.close(); // IMPORTANT: null reference to signal potential async checkpoint workers that the db was // disposed, as @@ -467,7 +454,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> IOUtils.closeQuietly(writeBatchWrapper); // Metric collection occurs on a background thread. When this method returns - // it is guaranteed that thr RocksDB reference has been invalidated + // it is guaranteed that thr ForSt reference has been invalidated // and no more metric collection will be attempted against the database. if (nativeMetricMonitor != null) { nativeMetricMonitor.close(); @@ -497,11 +484,24 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> columnFamilyOptions.forEach(IOUtils::closeQuietly); + LOG.info( + "Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.", + optionsContainer.getLocalBasePath(), + optionsContainer.getRemoteBasePath()); + + try { + optionsContainer.clearDirectories(); + } catch (Exception ex) { + LOG.warn( + "Could not delete ForSt local working directory {}, remote working directory {}.", + optionsContainer.getLocalBasePath(), + optionsContainer.getRemoteBasePath(), + ex); + } + IOUtils.closeQuietly(optionsContainer); kvStateInformation.clear(); - - cleanInstanceBasePath(); } IOUtils.closeQuietly(checkpointSnapshotStrategy); this.disposed = true; @@ -531,18 +531,6 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> } } - private void cleanInstanceBasePath() { - LOG.info( - "Closed RocksDB State Backend. Cleaning up RocksDB working directory {}.", - instanceBasePath); - - try { - FileUtils.deleteDirectory(new File(instanceBasePath.getPath())); - } catch (IOException ex) { - LOG.warn("Could not delete RocksDB working directory: {}", instanceBasePath, ex); - } - } - // ------------------------------------------------------------------------ // Getters and Setters // ------------------------------------------------------------------------ @@ -568,15 +556,10 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> return sharedRocksKeyBuilder; } - @VisibleForTesting - boolean isDisposed() { - return this.disposed; - } - /** - * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can - * be canceled and is also stopped when the backend is closed through {@link #dispose()}. For - * each backend, this method must always be called by the same thread. + * Triggers an asynchronous snapshot of the keyed state backend from ForSt. This snapshot can be + * canceled and is also stopped when the backend is closed through {@link #dispose()}. For each + * backend, this method must always be called by the same thread. * * @param checkpointId The Id of the checkpoint. * @param timestamp The timestamp of the checkpoint. @@ -627,11 +610,11 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> } /** - * Registers a k/v state information, which includes its state id, type, RocksDB column family + * Registers a k/v state information, which includes its state id, type, ForSt column family * handle, and serializers. * * <p>When restoring from a snapshot, we don’t restore the individual k/v states, just the - * global RocksDB database and the list of k/v state information. When a k/v state is first + * global ForSt database and the list of k/v state information. When a k/v state is first * requested we check here whether we already have a registered entry for that and return it * (after some necessary state compatibility checks) or create a new one if it does not exist. */ @@ -757,7 +740,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> } /** - * Migrate only the state value, that is the "value" that is stored in RocksDB. We don't migrate + * Migrate only the state value, that is the "value" that is stored in ForSt. We don't migrate * the key here, which is made up of key group, key, namespace and map key (in case of * MapState). */ @@ -805,9 +788,9 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> } @SuppressWarnings("unchecked") - AbstractForStSyncState<?, ?, SV> rocksDBState = (AbstractForStSyncState<?, ?, SV>) state; + AbstractForStSyncState<?, ?, SV> forStState = (AbstractForStSyncState<?, ?, SV>) state; - Snapshot rocksDBSnapshot = db.getSnapshot(); + Snapshot forstSnapshot = db.getSnapshot(); try (ForStIteratorWrapper iterator = ForStOperationUtils.getForStIterator(db, stateMetaInfo.f0, readOptions); ForStDBWriteBatchWrapper batchWriter = @@ -822,7 +805,7 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> while (iterator.isValid()) { serializedValueInput.setBuffer(iterator.value()); - rocksDBState.migrateSerializedValue( + forStState.migrateSerializedValue( serializedValueInput, migratedSerializedValueOutput, stateMetaInfo.f1.getPreviousStateSerializer(), @@ -837,8 +820,8 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> iterator.next(); } } finally { - db.releaseSnapshot(rocksDBSnapshot); - rocksDBSnapshot.close(); + db.releaseSnapshot(forstSnapshot); + forstSnapshot.close(); } } @@ -925,11 +908,6 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> "State %s is not supported by %s", stateDesc.getClass(), this.getClass()); } - /** Only visible for testing, DO NOT USE. */ - Path getInstanceBasePath() { - return instanceBasePath; - } - @VisibleForTesting @Override public int numKeyValueStateEntries() { @@ -967,4 +945,9 @@ public class ForStSyncKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> long getWriteBatchSize() { return writeBatchSize; } + + @VisibleForTesting + public ForStResourceContainer getOptionsContainer() { + return optionsContainer; + } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 54ced97fa22..64ea50ab850 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -104,12 +104,10 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> { - private static final String DB_INSTANCE_DIR_STRING = "db"; - /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; - /** The configuration of rocksDB priorityQueue state. */ + /** The configuration of ForSt priorityQueue state. */ private final ForStPriorityQueueConfig priorityQueueConfig; /** The configuration of local recovery. */ @@ -118,22 +116,16 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack /** Factory function to create column family options from state name. */ private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; - /** The container of RocksDB option factory and predefined options. */ + /** The container of ForSt option factory and predefined options. */ private final ForStResourceContainer optionsContainer; - /** Path where this configured instance stores its data directory. */ - private final Path instanceBasePath; - - /** Path where this configured instance stores its RocksDB database. */ - private final Path instanceForStDBPath; - private final MetricGroup metricGroup; private final StateBackend.CustomInitializationMetrics customInitializationMetrics; /** True if incremental checkpointing is enabled. */ private boolean enableIncrementalCheckpointing; - /** RocksDB property-based and statistics-based native metrics options. */ + /** ForSt property-based and statistics-based native metrics options. */ private ForStNativeMetricOptions nativeMetricOptions; private long writeBatchSize = @@ -154,7 +146,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack public ForStSyncKeyedStateBackendBuilder( String operatorIdentifier, ClassLoader userCodeClassLoader, - Path instanceBasePath, ForStResourceContainer optionsContainer, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -190,8 +181,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack // ensure that we use the right merge operator, because other code relies on this this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory); this.optionsContainer = optionsContainer; - this.instanceBasePath = instanceBasePath; - this.instanceForStDBPath = getInstanceRocksDBPath(instanceBasePath); this.metricGroup = metricGroup; this.customInitializationMetrics = customInitializationMetrics; this.enableIncrementalCheckpointing = false; @@ -203,7 +192,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack ForStSyncKeyedStateBackendBuilder( String operatorIdentifier, ClassLoader userCodeClassLoader, - Path instanceBasePath, ForStResourceContainer optionsContainer, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -212,7 +200,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, - ForStPriorityQueueConfig rocksDBPriorityQueueConfig, + ForStPriorityQueueConfig forStPriorityQueueConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, MetricGroup metricGroup, @@ -224,7 +212,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack this( operatorIdentifier, userCodeClassLoader, - instanceBasePath, optionsContainer, columnFamilyOptionsFactory, kvStateRegistry, @@ -233,7 +220,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack keyGroupRange, executionConfig, localRecoveryConfig, - rocksDBPriorityQueueConfig, + forStPriorityQueueConfig, ttlTimeProvider, latencyTrackingStateConfig, metricGroup, @@ -263,10 +250,6 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack return this; } - public static Path getInstanceRocksDBPath(Path instanceBasePath) { - return new Path(instanceBasePath, DB_INSTANCE_DIR_STRING); - } - private static void checkAndCreateDirectory(File directory) throws IOException { if (directory.exists()) { if (!directory.isDirectory()) { @@ -274,7 +257,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack } } else if (!directory.mkdirs()) { throw new IOException( - String.format("Could not create RocksDB data directory at %s.", directory)); + String.format("Could not create ForSt data directory at %s.", directory)); } } @@ -300,7 +283,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack ForStSnapshotStrategyBase<K, ?> checkpointStrategy = null; - ResourceGuard rocksDBResourceGuard = new ResourceGuard(); + ResourceGuard forStResourceGuard = new ResourceGuard(); PriorityQueueSetFactory priorityQueueFactory; SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder; // Number of bytes required to prefix the key groups. @@ -316,7 +299,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack UUID backendUID = UUID.randomUUID(); SortedMap<Long, Collection<HandleAndLocalPath>> materializedSstFiles = new TreeMap<>(); long lastCompletedCheckpointId = -1L; - prepareDirectories(); + optionsContainer.prepareDirectories(); restoreOperation = getForStDBRestoreOperation( keyGroupPrefixBytes, @@ -346,7 +329,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack checkpointStrategy = initializeSnapshotStrategy( db, - rocksDBResourceGuard, + forStResourceGuard, keySerializerProvider.currentSchemaSerializer(), kvStateInformation, keyGroupRange, @@ -369,7 +352,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack new ArrayList<>(kvStateInformation.values().size()); IOUtils.closeQuietly(cancelRegistryForBackend); IOUtils.closeQuietly(writeBatchWrapper); - IOUtils.closeQuietly(rocksDBResourceGuard); + IOUtils.closeQuietly(forStResourceGuard); ForStOperationUtils.addColumnFamilyOptionsToCloseLater( columnFamilyOptions, defaultColumnFamilyHandle); IOUtils.closeQuietly(defaultColumnFamilyHandle); @@ -388,9 +371,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack kvStateInformation.clear(); try { - FileUtils.deleteDirectory(new File(instanceBasePath.getPath())); + FileUtils.deleteDirectory(new File(optionsContainer.getBasePath().getPath())); } catch (Exception ex) { - logger.warn("Failed to delete base path for RocksDB: " + instanceBasePath, ex); + logger.warn( + "Failed to delete base path for ForSt: " + optionsContainer.getBasePath(), + ex); } // Log and rethrow if (e instanceof BackendBuildingException) { @@ -403,10 +388,11 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack } InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups); - logger.info("Finished building RocksDB keyed state-backend at {}.", instanceBasePath); + logger.info( + "Finished building ForSt keyed state-backend at {}.", + optionsContainer.getBasePath()); return new ForStSyncKeyedStateBackend<>( this.userCodeClassLoader, - this.instanceBasePath, this.optionsContainer, columnFamilyOptionsFactory, this.kvStateRegistry, @@ -420,7 +406,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack keyGroupPrefixBytes, cancelRegistryForBackend, this.keyGroupCompressionDecorator, - rocksDBResourceGuard, + forStResourceGuard, checkpointStrategy, writeBatchWrapper, defaultColumnFamilyHandle, @@ -513,11 +499,15 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack // env. We expect to directly use the dfs directory in flink env or local directory as // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. + Path instanceForStPath = + optionsContainer.getRemoteForStPath() == null + ? optionsContainer.getLocalForStPath() + : new Path("/db"); if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new ForStNoneRestoreOperation( Collections.emptyMap(), - instanceForStDBPath, + instanceForStPath, optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, @@ -538,7 +528,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack keySerializerProvider, optionsContainer, optionsContainer.getBasePath(), - instanceForStDBPath, + instanceForStPath, optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, @@ -565,7 +555,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack registeredPQStates, createHeapQueueFactory(), keySerializerProvider, - instanceForStDBPath, + instanceForStPath, optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, @@ -605,7 +595,7 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack nativeMetricMonitor, columnFamilyOptionsFactory, optionsContainer.getWriteBufferManagerCapacity(), - priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize()); + priorityQueueConfig.getForStDBPriorityQueueSetCacheSize()); break; default: throw new IllegalArgumentException( @@ -618,14 +608,4 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends AbstractKeyedStateBack private HeapPriorityQueueSetFactory createHeapQueueFactory() { return new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128); } - - private void prepareDirectories() throws IOException { - File baseFile = new File(instanceBasePath.getPath()); - checkAndCreateDirectory(baseFile); - if (new File(instanceForStDBPath.getPath()).exists()) { - // Clear the base directory when the backend is created - // in case something crashed and the backend never reached dispose() - FileUtils.deleteDirectory(baseFile); - } - } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java index 4c4d853d70a..7c4e58e1ce6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java @@ -18,26 +18,35 @@ package org.apache.flink.state.forst; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.ConfigurableStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend; import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.SupplierWithException; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.apache.flink.state.forst.ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; @@ -112,4 +121,36 @@ class ForStStateBackendTest extends StateBackendTestBase<ForStStateBackend> { backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); assertThat(backend.isIncrementalCheckpointsEnabled()).isTrue(); } + + @TestTemplate + void testCreateKeyedStateBackend() throws Exception { + Assumptions.assumeFalse( + getCheckpointStorage() instanceof JobManagerCheckpointStorage, + "Skip JM checkpoint storage"); + ForStStateBackend backend = new ForStStateBackend(); + ForStSyncKeyedStateBackend keyedStateBackend1 = + (ForStSyncKeyedStateBackend) createKeyedBackend(IntSerializer.INSTANCE); + assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull(); + Configuration config = new Configuration(); + config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false); + backend = backend.configure(config, Thread.currentThread().getContextClassLoader()); + ForStSyncKeyedStateBackend keyedStateBackend2 = + (ForStSyncKeyedStateBackend) + backend.createKeyedStateBackend( + new KeyedStateBackendParametersImpl<>( + env, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 10, + KeyGroupRange.of(0, 9), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + getMetricGroup(), + getCustomInitializationMetrics(), + Collections.emptyList(), + new CloseableRegistry(), + 1.0d)); + assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull(); + } }