carp84 commented on a change in pull request #7674: [FLINK-10043] [State
Backends] Refactor RocksDBKeyedStateBackend object
construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257724111
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -510,831 +505,92 @@ public WriteOptions getWriteOptions() {
return snapshotRunner;
}
- @Override
- public void restore(Collection<KeyedStateHandle> restoreState) throws
Exception {
-
- LOG.info("Initializing RocksDB keyed state backend.");
-
- // clear all meta data
- kvStateInformation.clear();
-
- try {
- RocksDBIncrementalRestoreOperation<K>
incrementalRestoreOperation = null;
- if (restoreState == null || restoreState.isEmpty()) {
- createDB();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Restoring snapshot from
state handles: {}, will use {} thread(s) to download files from DFS.",
restoreState, numberOfTransferingThreads);
- }
-
- KeyedStateHandle firstStateHandle =
restoreState.iterator().next();
- if (firstStateHandle instanceof
IncrementalKeyedStateHandle
- || firstStateHandle instanceof
IncrementalLocalKeyedStateHandle) {
- incrementalRestoreOperation = new
RocksDBIncrementalRestoreOperation<>(this);
-
incrementalRestoreOperation.restore(restoreState);
- } else {
- RocksDBFullRestoreOperation<K>
fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
-
fullRestoreOperation.doRestore(restoreState);
- }
- }
-
- // it is important that we only create the key builder
after the restore, and not before;
- // restore operations may reconfigure the key
serializer, so accessing the key serializer
- // only now we can be certain that the key serializer
used in the builder is final.
- this.sharedRocksKeyBuilder = new
RocksDBSerializedCompositeKeyBuilder<>(
- getKeySerializer(),
+ private RocksDBRestoreOperation<K> getRocksDBRestoreOperation(int
keyGroupPrefixBytes,
+
CloseableRegistry
cancelStreamRegistry,
+
LinkedHashMap<String,
StateColumnFamilyHandle> kvStateInformation,
+
RocksDBNativeMetricMonitor
nativeMetricMonitor,
+
Collection<KeyedStateHandle>
restoreState) {
+ File instanceRocksDBPath = new File(instanceBasePath,
DB_INSTANCE_DIR_STRING);
+ if (restoreState == null || restoreState.isEmpty()) {
+ return new RocksDBNoneRestoreOperation<>(
+ keyGroupRange,
keyGroupPrefixBytes,
- 32);
-
- initializeSnapshotStrategy(incrementalRestoreOperation);
- } catch (Exception ex) {
- dispose();
- throw ex;
- }
- }
-
- @VisibleForTesting
- void initializeSnapshotStrategy(@Nullable
RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
-
- this.savepointSnapshotStrategy =
- new RocksFullSnapshotStrategy<>(
- db,
- rocksDBResourceGuard,
- getKeySerializer(),
+ numberOfTransferingThreads,
+ cancelStreamRegistry,
+ userCodeClassLoader,
kvStateInformation,
+ keySerializerProvider,
+ instanceBasePath,
+ instanceRocksDBPath,
+ dbOptions,
+ columnOptions,
+ nativeMetricMonitor);
+ }
+ KeyedStateHandle firstStateHandle =
restoreState.iterator().next();
+ boolean isIncrementalStateHandle = (firstStateHandle instanceof
IncrementalKeyedStateHandle)
+ || (firstStateHandle instanceof
IncrementalLocalKeyedStateHandle);
+ if (isIncrementalStateHandle) {
+ return new RocksDBIncrementalRestoreOperation<>(
+ operatorIdentifier,
keyGroupRange,
keyGroupPrefixBytes,
- localRecoveryConfig,
+ numberOfTransferingThreads,
cancelStreamRegistry,
- keyGroupCompressionDecorator);
-
- if (enableIncrementalCheckpointing) {
- final UUID backendUID;
- final SortedMap<Long, Set<StateHandleID>>
materializedSstFiles;
- final long lastCompletedCheckpointId;
-
- if (incrementalRestoreOperation == null) {
- backendUID = UUID.randomUUID();
- materializedSstFiles = new TreeMap<>();
- lastCompletedCheckpointId = -1L;
- } else {
- backendUID =
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
- materializedSstFiles =
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
- lastCompletedCheckpointId =
incrementalRestoreOperation.getLastCompletedCheckpointId();
-
Preconditions.checkState(lastCompletedCheckpointId >= 0L);
- }
- // TODO eventually we might want to separate savepoint
and snapshot strategy, i.e. having 2 strategies.
- this.checkpointSnapshotStrategy = new
RocksIncrementalSnapshotStrategy<>(
- db,
- rocksDBResourceGuard,
- getKeySerializer(),
+ userCodeClassLoader,
kvStateInformation,
+ keySerializerProvider,
+ instanceBasePath,
+ instanceRocksDBPath,
+ dbOptions,
+ columnOptions,
+ nativeMetricMonitor);
+ } else {
+ return new RocksDBFullRestoreOperation<>(
keyGroupRange,
keyGroupPrefixBytes,
- localRecoveryConfig,
+ numberOfTransferingThreads,
cancelStreamRegistry,
+ userCodeClassLoader,
+ kvStateInformation,
+ keySerializerProvider,
instanceBasePath,
- backendUID,
- materializedSstFiles,
- lastCompletedCheckpointId,
- numberOfTransferingThreads);
- } else {
- this.checkpointSnapshotStrategy =
savepointSnapshotStrategy;
+ instanceRocksDBPath,
+ dbOptions,
+ columnOptions,
+ nativeMetricMonitor);
}
}
@Override
- public void notifyCheckpointComplete(long completedCheckpointId) throws
Exception {
-
- if (checkpointSnapshotStrategy != null) {
-
checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
- }
+ public void restore(Collection<KeyedStateHandle> restoreState) throws
Exception {
- if (savepointSnapshotStrategy != null) {
-
savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
+ if (restoreState == null) {
+ return;
}
- }
-
- private void createDB() throws IOException {
- List<ColumnFamilyHandle> columnFamilyHandles = new
ArrayList<>(1);
- this.db = openDB(instanceRocksDBPath.getAbsolutePath(),
Collections.emptyList(), columnFamilyHandles);
- this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db,
writeOptions);
- this.defaultColumnFamily = columnFamilyHandles.get(0);
- }
-
- private RocksDB openDB(
- String path,
- List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
- List<ColumnFamilyHandle> stateColumnFamilyHandles) throws
IOException {
-
- List<ColumnFamilyDescriptor> columnFamilyDescriptors =
- new ArrayList<>(1 +
stateColumnFamilyDescriptors.size());
-
- // we add the required descriptor for the default CF in FIRST
position, see
- //
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
- columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
- columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
- RocksDB dbRef;
+ LOG.info("Restoring RocksDB keyed state backend.");
try {
- dbRef = RocksDB.open(
- Preconditions.checkNotNull(dbOptions),
- Preconditions.checkNotNull(path),
- columnFamilyDescriptors,
- stateColumnFamilyHandles);
- } catch (RocksDBException e) {
- throw new IOException("Error while opening RocksDB
instance.", e);
- }
-
- // requested + default CF
- Preconditions.checkState(1 +
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
- "Not all requested column family handles have been
created");
-
- if (this.metricOptions.isEnabled()) {
- this.nativeMetricMonitor = new
RocksDBNativeMetricMonitor(
- dbRef,
- metricOptions,
- metricGroup
- );
- }
-
- return dbRef;
- }
-
- /**
- * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from a full snapshot.
- */
- private static final class RocksDBFullRestoreOperation<K> {
-
- private final RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend;
-
- /** Current key-groups state handle from which we restore
key-groups. */
- private KeyGroupsStateHandle currentKeyGroupsStateHandle;
- /** Current input stream we obtained from
currentKeyGroupsStateHandle. */
- private FSDataInputStream currentStateHandleInStream;
- /** Current data input view that wraps
currentStateHandleInStream. */
- private DataInputView currentStateHandleInView;
- /** Current list of ColumnFamilyHandles for all column families
we restore from currentKeyGroupsStateHandle. */
- private List<ColumnFamilyHandle>
currentStateHandleKVStateColumnFamilies;
- /** The compression decorator that was used for writing the
state, as determined by the meta data. */
- private StreamCompressionDecorator
keygroupStreamCompressionDecorator;
-
- private boolean isKeySerializerCompatibilityChecked;
-
- /**
- * Creates a restore operation object for the given state
backend instance.
- *
- * @param rocksDBKeyedStateBackend the state backend into which
we restore
- */
- RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend) {
- this.rocksDBKeyedStateBackend =
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
- }
-
- /**
- * Restores all key-groups data that is referenced by the
passed state handles.
- *
- * @param keyedStateHandles List of all key groups state
handles that shall be restored.
- */
- void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
- throws IOException, StateMigrationException,
RocksDBException {
-
- rocksDBKeyedStateBackend.createDB();
-
- for (KeyedStateHandle keyedStateHandle :
keyedStateHandles) {
- if (keyedStateHandle != null) {
-
- if (!(keyedStateHandle instanceof
KeyGroupsStateHandle)) {
- throw new
IllegalStateException("Unexpected state handle type, " +
- "expected: " +
KeyGroupsStateHandle.class +
- ", but found: " +
keyedStateHandle.getClass());
- }
- this.currentKeyGroupsStateHandle =
(KeyGroupsStateHandle) keyedStateHandle;
- restoreKeyGroupsInStateHandle();
- }
- }
- }
-
- /**
- * Restore one key groups state handle.
- */
- private void restoreKeyGroupsInStateHandle()
- throws IOException, StateMigrationException,
RocksDBException {
- try {
- currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
-
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
- currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
- restoreKVStateMetaData();
- restoreKVStateData();
- } finally {
- if
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
{
-
IOUtils.closeQuietly(currentStateHandleInStream);
- }
- }
- }
-
- /**
- * Restore the KV-state / ColumnFamily meta data for all
key-groups referenced by the current state handle.
- */
- private void restoreKVStateMetaData() throws IOException,
StateMigrationException, RocksDBException {
-
- // isSerializerPresenceRequired flag is set to false,
since for the RocksDB state backend,
- // deserialization of state happens lazily during
runtime; we depend on the fact
- // that the new serializer for states could be
compatible, and therefore the restore can continue
- // without old serializers required to be present.
- KeyedBackendSerializationProxy<K> serializationProxy =
- new
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
- serializationProxy.read(currentStateHandleInView);
-
- if (!isKeySerializerCompatibilityChecked) {
- // check for key serializer compatibility; this
also reconfigures the
- // key serializer to be compatible, if it is
required and is possible
- TypeSerializerSchemaCompatibility<K>
keySerializerSchemaCompat =
-
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
- if
(keySerializerSchemaCompat.isCompatibleAfterMigration() ||
keySerializerSchemaCompat.isIncompatible()) {
- throw new StateMigrationException("The
new key serializer must be compatible.");
- }
-
- isKeySerializerCompatibilityChecked = true;
- }
-
- this.keygroupStreamCompressionDecorator =
serializationProxy.isUsingKeyGroupCompression() ?
- SnappyStreamCompressionDecorator.INSTANCE :
UncompressedStreamCompressionDecorator.INSTANCE;
-
- List<StateMetaInfoSnapshot> restoredMetaInfos =
- serializationProxy.getStateMetaInfoSnapshots();
- currentStateHandleKVStateColumnFamilies = new
ArrayList<>(restoredMetaInfos.size());
-
- for (StateMetaInfoSnapshot restoredMetaInfo :
restoredMetaInfos) {
-
- Tuple2<ColumnFamilyHandle,
RegisteredStateMetaInfoBase> registeredColumn =
-
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
- if (registeredColumn == null) {
- byte[] nameBytes =
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
- ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
- nameBytes,
-
rocksDBKeyedStateBackend.columnOptions);
-
- ColumnFamilyHandle columnFamily =
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
- // create a meta info for the state on
restore;
- // this allows us to retain the state
in future snapshots even if it wasn't accessed
- RegisteredStateMetaInfoBase
stateMetaInfo =
-
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
- registeredColumn = new
Tuple2<>(columnFamily, stateMetaInfo);
-
rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(),
registeredColumn);
-
- } else {
- // TODO with eager state registration
in place, check here for serializer migration strategies
- }
-
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
- }
- }
-
- /**
- * Restore the KV-state / ColumnFamily data for all key-groups
referenced by the current state handle.
- */
- private void restoreKVStateData() throws IOException,
RocksDBException {
- //for all key-groups in the current state handle...
- try (RocksDBWriteBatchWrapper writeBatchWrapper = new
RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
- for (Tuple2<Integer, Long> keyGroupOffset :
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
- int keyGroup = keyGroupOffset.f0;
-
- // Check that restored key groups all
belong to the backend
-
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
- "The key group must belong to
the backend");
-
- long offset = keyGroupOffset.f1;
- //not empty key-group?
- if (0L != offset) {
-
currentStateHandleInStream.seek(offset);
- try (InputStream compressedKgIn
=
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
{
-
DataInputViewStreamWrapper compressedKgInputView = new
DataInputViewStreamWrapper(compressedKgIn);
- //TODO this could be
aware of keyGroupPrefixBytes and write only one byte if possible
- int kvStateId =
compressedKgInputView.readShort();
- ColumnFamilyHandle
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
- //insert all k/v pairs
into DB
- boolean
keyGroupHasMoreKeys = true;
- while
(keyGroupHasMoreKeys) {
- byte[] key =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
- byte[] value =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
- if
(hasMetaDataFollowsFlag(key)) {
- //clear
the signal bit in the key to make it ready for insertion again
-
clearMetaDataFollowsFlag(key);
-
writeBatchWrapper.put(handle, key, value);
- //TODO
this could be aware of keyGroupPrefixBytes and write only one byte if possible
-
kvStateId = END_OF_KEY_GROUP_MARK
-
& compressedKgInputView.readShort();
- if
(END_OF_KEY_GROUP_MARK == kvStateId) {
-
keyGroupHasMoreKeys = false;
- } else {
-
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
- }
- } else {
-
writeBatchWrapper.put(handle, key, value);
- }
- }
- }
- }
- }
- }
+ RocksDBRestoreOperation<K> restoreOperation =
getRocksDBRestoreOperation(
+ keyGroupPrefixBytes, cancelStreamRegistry,
kvStateInformation, nativeMetricMonitor, restoreState);
+ restoreOperation.restore(restoreState, db);
+ } catch (Exception ex) {
+ dispose();
+ throw ex;
}
}
- /**
- * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from an incremental snapshot.
- */
- private static class RocksDBIncrementalRestoreOperation<T> {
-
- private final RocksDBKeyedStateBackend<T> stateBackend;
- private final SortedMap<Long, Set<StateHandleID>>
restoredSstFiles;
- private UUID restoredBackendUID;
- private long lastCompletedCheckpointId;
- private boolean isKeySerializerCompatibilityChecked;
-
- private
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
-
- this.stateBackend = stateBackend;
- this.restoredSstFiles = new TreeMap<>();
- }
-
- SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() {
- return restoredSstFiles;
- }
-
- UUID getRestoredBackendUID() {
- return restoredBackendUID;
- }
-
- long getLastCompletedCheckpointId() {
- return lastCompletedCheckpointId;
- }
-
- /**
- * Root method that branches for different implementations of
{@link KeyedStateHandle}.
- */
- void restore(Collection<KeyedStateHandle> restoreStateHandles)
throws Exception {
-
- if (restoreStateHandles.isEmpty()) {
- return;
- }
-
- final KeyedStateHandle theFirstStateHandle =
restoreStateHandles.iterator().next();
-
- boolean isRescaling = (restoreStateHandles.size() > 1 ||
-
!Objects.equals(theFirstStateHandle.getKeyGroupRange(),
stateBackend.keyGroupRange));
-
- if (!isRescaling) {
- restoreWithoutRescaling(theFirstStateHandle);
- } else {
- restoreWithRescaling(restoreStateHandles);
- }
- }
-
- /**
- * Recovery from a single remote incremental state without
rescaling.
- */
- void restoreWithoutRescaling(KeyedStateHandle rawStateHandle)
throws Exception {
-
- IncrementalLocalKeyedStateHandle localKeyedStateHandle;
- List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
- List<ColumnFamilyDescriptor> columnFamilyDescriptors;
-
- // Recovery from remote incremental state.
- Path temporaryRestoreInstancePath = new Path(
- stateBackend.instanceBasePath.getAbsolutePath(),
- UUID.randomUUID().toString());
-
- try {
- if (rawStateHandle instanceof
IncrementalKeyedStateHandle) {
-
- IncrementalKeyedStateHandle
restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
-
- // read state data.
- try (RocksDBStateDownloader
rocksDBStateDownloader =
- new
RocksDBStateDownloader(stateBackend.numberOfTransferingThreads)) {
-
rocksDBStateDownloader.transferAllStateDataToDirectory(
- restoreStateHandle,
-
temporaryRestoreInstancePath,
-
stateBackend.cancelStreamRegistry);
- }
-
- stateMetaInfoSnapshots =
readMetaData(restoreStateHandle.getMetaStateHandle());
- columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
-
- // since we transferred all remote
state to a local directory, we can use the same code as for
- // local recovery.
- localKeyedStateHandle = new
IncrementalLocalKeyedStateHandle(
-
restoreStateHandle.getBackendIdentifier(),
-
restoreStateHandle.getCheckpointId(),
- new
DirectoryStateHandle(temporaryRestoreInstancePath),
-
restoreStateHandle.getKeyGroupRange(),
-
restoreStateHandle.getMetaStateHandle(),
-
restoreStateHandle.getSharedState().keySet());
- } else if (rawStateHandle instanceof
IncrementalLocalKeyedStateHandle) {
-
- // Recovery from local incremental
state.
- localKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) rawStateHandle;
- stateMetaInfoSnapshots =
readMetaData(localKeyedStateHandle.getMetaDataState());
- columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
- } else {
- throw new
IllegalStateException("Unexpected state handle type, " +
- "expected " +
IncrementalKeyedStateHandle.class + " or " +
IncrementalLocalKeyedStateHandle.class +
- ", but found " +
rawStateHandle.getClass());
- }
-
- restoreLocalStateIntoFullInstance(
- localKeyedStateHandle,
- columnFamilyDescriptors,
- stateMetaInfoSnapshots);
- } finally {
- FileSystem restoreFileSystem =
temporaryRestoreInstancePath.getFileSystem();
- if
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
-
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
- }
- }
- }
-
- /**
- * Recovery from multi incremental states with rescaling. For
rescaling, this method creates a temporary
- * RocksDB instance for a key-groups shard. All contents from
the temporary instance are copied into the
- * real restore instance and then the temporary instance is
discarded.
- */
- void restoreWithRescaling(Collection<KeyedStateHandle>
restoreStateHandles) throws Exception {
-
- this.restoredBackendUID = UUID.randomUUID();
-
- initTargetDB(restoreStateHandles,
stateBackend.keyGroupRange);
-
- byte[] startKeyGroupPrefixBytes = new
byte[stateBackend.keyGroupPrefixBytes];
-
RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getStartKeyGroup(),
startKeyGroupPrefixBytes);
-
- byte[] stopKeyGroupPrefixBytes = new
byte[stateBackend.keyGroupPrefixBytes];
-
RocksDBKeySerializationUtils.serializeKeyGroup(stateBackend.getKeyGroupRange().getEndKeyGroup()
+ 1, stopKeyGroupPrefixBytes);
-
- for (KeyedStateHandle rawStateHandle :
restoreStateHandles) {
-
- if (!(rawStateHandle instanceof
IncrementalKeyedStateHandle)) {
- throw new
IllegalStateException("Unexpected state handle type, " +
- "expected " +
IncrementalKeyedStateHandle.class +
- ", but found " +
rawStateHandle.getClass());
- }
-
- Path temporaryRestoreInstancePath = new
Path(stateBackend.instanceBasePath.getAbsolutePath() +
UUID.randomUUID().toString());
- try (RestoredDBInstance tmpRestoreDBInfo =
restoreDBInstanceFromStateHandle(
- (IncrementalKeyedStateHandle)
rawStateHandle,
- temporaryRestoreInstancePath);
- RocksDBWriteBatchWrapper
writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
-
- List<ColumnFamilyDescriptor>
tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
- List<ColumnFamilyHandle>
tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
-
- // iterating only the requested
descriptors automatically skips the default column family handle
- for (int i = 0; i <
tmpColumnFamilyDescriptors.size(); ++i) {
- ColumnFamilyHandle
tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
- ColumnFamilyDescriptor
tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
-
- ColumnFamilyHandle
targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
-
tmpColumnFamilyDescriptor, null,
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
-
- try (RocksIteratorWrapper
iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
-
-
iterator.seek(startKeyGroupPrefixBytes);
-
- while
(iterator.isValid()) {
-
- if
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(),
stopKeyGroupPrefixBytes)) {
-
writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(),
iterator.value());
- } else {
- //
Since the iterator will visit the record according to the sorted order,
- // we
can just break here.
- break;
- }
-
- iterator.next();
- }
- } // releases native iterator
resources
- }
- } finally {
- FileSystem restoreFileSystem =
temporaryRestoreInstancePath.getFileSystem();
- if
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
-
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
- }
- }
- }
- }
-
- private class RestoredDBInstance implements AutoCloseable {
Review comment:
Do you mean adding a comment that mentions this inner class has been moved?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services