This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aba02eb3fcc4472c3d5f5a0f527960d79c659c31 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue Aug 7 15:57:27 2018 +0200 [FLINK-10042][state] (part 1) Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes --- .../flink/runtime/state/SnapshotStrategy.java | 3 +- .../runtime/state/heap/HeapKeyedStateBackend.java | 5 + .../streaming/state/RocksDBKeyedStateBackend.java | 1071 ++------------------ .../state/snapshot/RocksFullSnapshotStrategy.java | 478 +++++++++ .../snapshot/RocksIncrementalSnapshotStrategy.java | 578 +++++++++++ .../state/snapshot/RocksSnapshotUtil.java | 51 + .../state/snapshot/SnapshotStrategyBase.java | 90 ++ .../streaming/state/RocksDBAsyncSnapshotTest.java | 27 +- .../streaming/state/RocksDBStateBackendTest.java | 1 + 9 files changed, 1317 insertions(+), 987 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java index 9139fa7..3ad68af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java @@ -28,8 +28,7 @@ import java.util.concurrent.RunnableFuture; * * @param <S> type of the returned state object that represents the result of the snapshot operation. 
*/ -@FunctionalInterface -public interface SnapshotStrategy<S extends StateObject> { +public interface SnapshotStrategy<S extends StateObject> extends CheckpointListener { /** * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index bc1e0f5..0e2f16c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -882,6 +882,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // nothing to do. + } } private interface StateFactory { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index c159976..87c7e55 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -35,9 +35,8 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator; -import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator; -import 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper; -import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy; +import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; @@ -47,32 +46,22 @@ import org.apache.flink.core.memory.ByteArrayDataInputView; import org.apache.flink.core.memory.ByteArrayDataOutputView; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; -import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources; -import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; -import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DirectoryStateHandle; -import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyExtractorFunction; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import 
org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.PriorityComparable; import org.apache.flink.runtime.state.PriorityComparator; import org.apache.flink.runtime.state.PriorityQueueSetFactory; @@ -80,14 +69,11 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; -import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategy; import org.apache.flink.runtime.state.StateHandleID; -import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory; -import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; @@ -96,25 +82,19 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import 
org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.StateMigrationException; -import org.apache.flink.util.function.SupplierWithException; -import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +105,6 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.ArrayList; @@ -144,12 +123,16 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag; + /** * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon @@ -167,9 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** The name of the 
merge operator in RocksDB. Do not change except you know exactly what you do. */ public static final String MERGE_OPERATOR_NAME = "stringappendtest"; - /** File suffix of sstable files. */ - private static final String SST_FILE_SUFFIX = ".sst"; - private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of( Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create), @@ -230,7 +210,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ - private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation; + private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation; /** * Map of state names to their corresponding restored state meta info. @@ -246,20 +226,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** True if incremental checkpointing is enabled. */ private final boolean enableIncrementalCheckpointing; - /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */ - private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; - - /** The identifier of the last completed checkpoint. */ - private long lastCompletedCheckpointId = -1L; - - /** Unique ID of this backend. */ - private UUID backendUID; - /** The configuration of local recovery. */ private final LocalRecoveryConfig localRecoveryConfig; /** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */ - private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy; + private SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy; /** Factory for priority queue state. 
*/ private final PriorityQueueSetFactory priorityQueueFactory; @@ -314,12 +285,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups()); this.kvStateInformation = new LinkedHashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); - this.materializedSstFiles = new TreeMap<>(); - this.backendUID = UUID.randomUUID(); - - this.snapshotStrategy = enableIncrementalCheckpointing ? - new IncrementalSnapshotStrategy() : - new FullSnapshotStrategy(); this.writeOptions = new WriteOptions().setDisableWAL(true); @@ -333,8 +298,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { default: throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType); } - - LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } private static void checkAndCreateDirectory(File directory) throws IOException { @@ -508,41 +471,83 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { restoredKvStateMetaInfos.clear(); try { + RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null; if (restoreState == null || restoreState.isEmpty()) { createDB(); } else { KeyedStateHandle firstStateHandle = restoreState.iterator().next(); if (firstStateHandle instanceof IncrementalKeyedStateHandle || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) { - RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); - restoreOperation.restore(restoreState); + incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + incrementalRestoreOperation.restore(restoreState); } else { - RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); - restoreOperation.doRestore(restoreState); + RocksDBFullRestoreOperation<K> 
fullRestoreOperation = new RocksDBFullRestoreOperation<>(this); + fullRestoreOperation.doRestore(restoreState); } } + + initializeSnapshotStrategy(incrementalRestoreOperation); } catch (Exception ex) { dispose(); throw ex; } } - @Override - public void notifyCheckpointComplete(long completedCheckpointId) { - - if (!enableIncrementalCheckpointing) { - return; - } - - synchronized (materializedSstFiles) { - - if (completedCheckpointId < lastCompletedCheckpointId) { - return; + @VisibleForTesting + void initializeSnapshotStrategy( + @Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) { + + final RocksFullSnapshotStrategy<K> fullSnapshotStrategy = + new RocksFullSnapshotStrategy<>( + db, + rocksDBResourceGuard, + keySerializer, + kvStateInformation, + keyGroupRange, + keyGroupPrefixBytes, + localRecoveryConfig, + cancelStreamRegistry, + keyGroupCompressionDecorator); + + if (enableIncrementalCheckpointing) { + final UUID backendUID; + final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; + final long lastCompletedCheckpointId; + + if (incrementalRestoreOperation == null) { + backendUID = UUID.randomUUID(); + materializedSstFiles = new TreeMap<>(); + lastCompletedCheckpointId = -1L; + } else { + backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID()); + materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles()); + lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId(); + Preconditions.checkState(lastCompletedCheckpointId >= 0L); } + // TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies. 
+ this.snapshotStrategy = new RocksIncrementalSnapshotStrategy<>( + db, + rocksDBResourceGuard, + keySerializer, + kvStateInformation, + keyGroupRange, + keyGroupPrefixBytes, + localRecoveryConfig, + cancelStreamRegistry, + instanceBasePath, + backendUID, + materializedSstFiles, + lastCompletedCheckpointId, + fullSnapshotStrategy); + } else { + this.snapshotStrategy = fullSnapshotStrategy; + } + } - materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - - lastCompletedCheckpointId = completedCheckpointId; + @Override + public void notifyCheckpointComplete(long completedCheckpointId) throws Exception { + if (snapshotStrategy != null) { + snapshotStrategy.notifyCheckpointComplete(completedCheckpointId); } } @@ -656,10 +661,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws RocksDBException */ private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { @@ -724,9 +725,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws RocksDBException */ private void restoreKVStateData() throws IOException, RocksDBException { //for all key-groups in the current state handle... 
@@ -752,14 +750,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { while (keyGroupHasMoreKeys) { byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + if (hasMetaDataFollowsFlag(key)) { //clear the signal bit in the key to make it ready for insertion again - RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + clearMetaDataFollowsFlag(key); writeBatchWrapper.put(handle, key, value); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + kvStateId = END_OF_KEY_GROUP_MARK & compressedKgInputView.readShort(); - if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + if (END_OF_KEY_GROUP_MARK == kvStateId) { keyGroupHasMoreKeys = false; } else { handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); @@ -781,9 +779,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static class RocksDBIncrementalRestoreOperation<T> { private final RocksDBKeyedStateBackend<T> stateBackend; + private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles; + private UUID restoredBackendUID; + private long lastCompletedCheckpointId; private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { + this.stateBackend = stateBackend; + this.restoredSstFiles = new TreeMap<>(); + } + + SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() { + return restoredSstFiles; + } + + UUID getRestoredBackendUID() { + return restoredBackendUID; + } + + long getLastCompletedCheckpointId() { + return lastCompletedCheckpointId; } /** @@ -872,6 +887,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ void 
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + this.restoredBackendUID = UUID.randomUUID(); + initTargetDB(restoreStateHandles, stateBackend.keyGroupRange); byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; @@ -949,6 +966,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + private + RestoredDBInstance( @Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @@ -1113,10 +1132,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception { // pick up again the old backend id, so the we can reference existing state - stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + this.restoredBackendUID = restoreStateHandle.getBackendIdentifier(); LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", - stateBackend.operatorIdentifier, stateBackend.backendUID); + stateBackend.operatorIdentifier, this.restoredBackendUID); // create hard links in the instance directory if (!stateBackend.instanceRocksDBPath.mkdirs()) { @@ -1150,13 +1169,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } // use the restore sst files as the base for succeeding checkpoints - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put( + restoredSstFiles.put( restoreStateHandle.getCheckpointId(), restoreStateHandle.getSharedStateHandleIDs()); - } - stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); + lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); } /** @@ -1447,881 +1464,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return count; } - private class 
FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> { - - @Override - public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( - long checkpointId, - long timestamp, - CheckpointStreamFactory primaryStreamFactory, - CheckpointOptions checkpointOptions) throws Exception { - - long startTime = System.currentTimeMillis(); - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", - timestamp); - } - - return DoneFuture.of(SnapshotResult.empty()); - } - - final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = - - localRecoveryConfig.isLocalRecoveryEnabled() && - (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ? - - () -> CheckpointStreamWithResultProvider.createDuplicatingStream( - checkpointId, - CheckpointedStateScope.EXCLUSIVE, - primaryStreamFactory, - localRecoveryConfig.getLocalStateDirectoryProvider()) : - - () -> CheckpointStreamWithResultProvider.createSimpleStream( - CheckpointedStateScope.EXCLUSIVE, - primaryStreamFactory); - - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation<K> snapshotOperation = - new RocksDBFullSnapshotOperation<>( - RocksDBKeyedStateBackend.this, - supplier, - snapshotCloseableRegistry); - - snapshotOperation.takeDBSnapShot(); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = - new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { - - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } - - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); 
- } - - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } - - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } - - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } - - @Nonnull - @Override - public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { - long startTime = System.currentTimeMillis(); - - if (isStopped()) { - throw new IOException("RocksDB closed."); - } - - snapshotOperation.writeDBSnapshot(); - - LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - - return snapshotOperation.getSnapshotResultStateHandle(); - } - }; - - LOG.debug("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", - primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - return AsyncStoppableTaskWithCallback.from(ioCallable); - } - } - - private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> { - - private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate; - - public IncrementalSnapshotStrategy() { - this.savepointDelegate = new FullSnapshotStrategy(); - } - - @Override - public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( - long checkpointId, - long checkpointTimestamp, - CheckpointStreamFactory checkpointStreamFactory, - CheckpointOptions checkpointOptions) throws Exception { - - // for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained. 
- if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) { - return savepointDelegate.performSnapshot( - checkpointId, - checkpointTimestamp, - checkpointStreamFactory, - checkpointOptions); - } - - if (db == null) { - throw new IOException("RocksDB closed."); - } - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp); - } - return DoneFuture.of(SnapshotResult.empty()); - } - - SnapshotDirectory snapshotDirectory; - - if (localRecoveryConfig.isLocalRecoveryEnabled()) { - // create a "permanent" snapshot directory for local recovery. - LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); - File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); - - if (directory.exists()) { - FileUtils.deleteDirectory(directory); - } - - if (!directory.mkdirs()) { - throw new IOException("Local state base directory for checkpoint " + checkpointId + - " already exists: " + directory); - } - - // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints. - File rdbSnapshotDir = new File(directory, "rocks_db"); - Path path = new Path(rdbSnapshotDir.toURI()); - // create a "permanent" snapshot directory because local recovery is active. - snapshotDirectory = SnapshotDirectory.permanent(path); - } else { - // create a "temporary" snapshot directory because local recovery is inactive. 
- Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); - snapshotDirectory = SnapshotDirectory.temporary(path); - } - - final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - RocksDBKeyedStateBackend.this, - checkpointStreamFactory, - snapshotDirectory, - checkpointId); - - try { - snapshotOperation.takeSnapshot(); - } catch (Exception e) { - snapshotOperation.stop(); - snapshotOperation.releaseResources(true); - throw e; - } - - return new FutureTask<SnapshotResult<KeyedStateHandle>>( - snapshotOperation::runSnapshot - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } - - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); - } - }; - } - } - - /** - * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend. - */ - @VisibleForTesting - static class RocksDBFullSnapshotOperation<K> - extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> { - - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend<K> stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private Snapshot snapshot; - private ReadOptions readOptions; - - /** - * The state meta data. - */ - private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - - /** - * The copied column handle. 
- */ - private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta; - - private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators; - - private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider; - private DataOutputView outputView; - - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, - CloseableRegistry registry) throws IOException { - - this.stateBackend = stateBackend; - this.checkpointStreamSupplier = checkpointStreamSupplier; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } - - /** - * 1) Create a snapshot object from RocksDB. - * - */ - public void takeDBSnapShot() { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - - this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size()); - - this.copiedMeta = new ArrayList<>(stateBackend.kvStateInformation.size()); - - for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : - stateBackend.kvStateInformation.values()) { - // snapshot meta info - this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot()); - this.copiedMeta.add(tuple2); - } - this.snapshot = stateBackend.db.getSnapshot(); - } - - /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. 
- * - * @throws Exception - */ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(checkpointStreamWithResultProvider == null, - "Output stream for snapshot is already set."); - - checkpointStreamWithResultProvider = checkpointStreamSupplier.get(); - snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider); - outputView = new DataOutputViewStreamWrapper( - checkpointStreamWithResultProvider.getCheckpointOutputStream()); - } - - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException { - - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } - - Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); - } - - /** - * 4) Returns a snapshot result for the completed snapshot. - * - * @return snapshot result for the completed snapshot. - */ - @Nonnull - public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) { - - SnapshotResult<StreamStateHandle> res = - checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(); - checkpointStreamWithResultProvider = null; - return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, keyGroupRangeOffsets); - } - - return SnapshotResult.empty(); - } - - /** - * 5) Release the snapshot object for RocksDB and clean up. 
- */ - public void releaseSnapshotResources() { - - checkpointStreamWithResultProvider = null; - - if (null != kvStateIterators) { - for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } - - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } - - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; - } - - this.dbLease.close(); - } - - private void writeKVStateMetaData() throws IOException { - - this.kvStateIterators = new ArrayList<>(copiedMeta.size()); - - int kvStateId = 0; - - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); - - for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : copiedMeta) { - RocksIteratorWrapper rocksIteratorWrapper = - getRocksIterator(stateBackend.db, tuple2.f0, tuple2.f1, readOptions); - kvStateIterators.add(new Tuple2<>(rocksIteratorWrapper, kvStateId)); - ++kvStateId; - } - - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - // TODO: this code assumes that writing a serializer is threadsafe, we should support to - // get a serialized form already at state registration time in the future - stateBackend.getKeySerializer(), - stateMetaInfoSnapshots, - !Objects.equals( - UncompressedStreamCompressionDecorator.INSTANCE, - stateBackend.keyGroupCompressionDecorator)); - - serializationProxy.write(outputView); - } - - private void writeKVStateData() throws IOException, InterruptedException { - byte[] previousKey = null; - byte[] previousValue = null; - DataOutputView kgOutView = null; - OutputStream kgOutStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = - checkpointStreamWithResultProvider.getCheckpointOutputStream(); - - try { - 
// Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator - try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { - - // handover complete, null out to prevent double close - kvStateIterators = null; - - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset( - mergeIterator.keyGroup(), - checkpointOutputStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator. - decorateWithCompression(checkpointOutputStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } - - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
- while (mergeIterator.isValid()) { - - assert (!hasMetaDataFollowsFlag(previousKey)); - - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { - - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); - - setMetaDataFollowsFlagInKey(previousKey); - } - - writeKeyValuePair(previousKey, previousValue, kgOutView); - - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset( - mergeIterator.keyGroup(), - checkpointOutputStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutStream = stateBackend.keyGroupCompressionDecorator. 
- decorateWithCompression(checkpointOutputStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - kgOutView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - } - - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } - } - - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue, kgOutView); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - kgOutStream = null; - } - - } finally { - // this will just close the outer stream - IOUtils.closeQuietly(kgOutStream); - } - } - - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); - } - - static void setMetaDataFollowsFlagInKey(byte[] key) { - key[0] |= FIRST_BIT_IN_BYTE_MASK; - } - - static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } - - static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } - - private static void checkInterrupted() throws InterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("RocksDB snapshot interrupted."); - } - } - - @Override - protected void acquireResources() throws Exception { - 
stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - openCheckpointStream(); - } - - @Override - protected void releaseResources() { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } - - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - releaseSnapshotResources(); - } - - @Override - protected void stopOperation() { - closeLocalRegistry(); - } - - private void closeLocalRegistry() { - if (stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } - - @Nonnull - @Override - public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { - long startTime = System.currentTimeMillis(); - - if (isStopped()) { - throw new IOException("RocksDB closed."); - } - - writeDBSnapshot(); - - LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - - return getSnapshotResultStateHandle(); - } - } - - /** - * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend. - */ - private static final class RocksDBIncrementalSnapshotOperation<K> { - - /** The backend which we snapshot. */ - private final RocksDBKeyedStateBackend<K> stateBackend; - - /** Stream factory that creates the outpus streams to DFS. */ - private final CheckpointStreamFactory checkpointStreamFactory; - - /** Id for the current checkpoint. */ - private final long checkpointId; - - /** All sst files that were part of the last previously completed checkpoint. */ - private Set<StateHandleID> baseSstFiles; - - /** The state meta data. 
*/ - private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(); - - /** Local directory for the RocksDB native backup. */ - private SnapshotDirectory localBackupDirectory; - - // Registry for all opened i/o streams - private final CloseableRegistry closeableRegistry = new CloseableRegistry(); - - // new sst files since the last completed checkpoint - private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); - - // handles to the misc files in the current snapshot - private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); - - // This lease protects from concurrent disposal of the native rocksdb instance. - private final ResourceGuard.Lease dbLease; - - private SnapshotResult<StreamStateHandle> metaStateHandle = null; - - private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - SnapshotDirectory localBackupDirectory, - long checkpointId) throws IOException { - - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.checkpointId = checkpointId; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - this.localBackupDirectory = localBackupDirectory; - } - - private StreamStateHandle materializeStateData(Path filePath) throws Exception { - FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - final byte[] buffer = new byte[8 * 1024]; - - FileSystem backupFileSystem = localBackupDirectory.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerCloseable(inputStream); - - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - closeableRegistry.registerCloseable(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); - - if (numBytes == -1) { - break; - } - - 
outputStream.write(buffer, 0, numBytes); - } - - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - - } finally { - - if (closeableRegistry.unregisterCloseable(inputStream)) { - inputStream.close(); - } - - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } - } - - @Nonnull - private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception { - - LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig; - - CheckpointStreamWithResultProvider streamWithResultProvider = - - localRecoveryConfig.isLocalRecoveryEnabled() ? - - CheckpointStreamWithResultProvider.createDuplicatingStream( - checkpointId, - CheckpointedStateScope.EXCLUSIVE, - checkpointStreamFactory, - localRecoveryConfig.getLocalStateDirectoryProvider()) : - - CheckpointStreamWithResultProvider.createSimpleStream( - CheckpointedStateScope.EXCLUSIVE, - checkpointStreamFactory); - - try { - closeableRegistry.registerCloseable(streamWithResultProvider); - - //no need for compression scheme support because sst-files are already compressed - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.keySerializer, - stateMetaInfoSnapshots, - false); - - DataOutputView out = - new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream()); - - serializationProxy.write(out); - - if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) { - SnapshotResult<StreamStateHandle> result = - streamWithResultProvider.closeAndFinalizeCheckpointStreamResult(); - streamWithResultProvider = null; - return result; - } else { - throw new IOException("Stream already closed and cannot return a handle."); - } - } finally { - if (streamWithResultProvider != null) { - if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) { - 
IOUtils.closeQuietly(streamWithResultProvider); - } - } - } - } - - void takeSnapshot() throws Exception { - - final long lastCompletedCheckpoint; - - // use the last completed checkpoint as the comparison base. - synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); - } - - LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); - - // save meta data - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { - stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); - } - - LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory); - - if (localBackupDirectory.exists()) { - throw new IllegalStateException("Unexpected existence of the backup directory."); - } - - // create hard links of living files in the snapshot path - try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) { - checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath()); - } - } - - @Nonnull - SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception { - - stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); - - // write meta data - metaStateHandle = materializeMetaData(); - - // sanity checks - they should never fail - Preconditions.checkNotNull(metaStateHandle, - "Metadata was not properly created."); - Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), - "Metadata for job manager was not properly created."); - - // write state data - Preconditions.checkState(localBackupDirectory.exists()); - - FileStatus[] fileStatuses = localBackupDirectory.listStatus(); - if (fileStatuses != null) { 
- for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); - - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, materializeStateData(filePath)); - } - } else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(stateHandleID, fileHandle); - } - } - } - - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); - } - - IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle( - stateBackend.backendUID, - stateBackend.keyGroupRange, - checkpointId, - sstFiles, - miscFiles, - metaStateHandle.getJobManagerOwnedSnapshot()); - - StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot(); - DirectoryStateHandle directoryStateHandle = null; - - try { - - directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); - } catch (IOException ex) { - - Exception collector = ex; - - try { - taskLocalSnapshotMetaDataStateHandle.discardState(); - } catch (Exception discardEx) { - collector = ExceptionUtils.firstOrSuppressed(discardEx, collector); - } - - LOG.warn("Problem with local state snapshot.", collector); - } - - if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) { - - IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = - new IncrementalLocalKeyedStateHandle( - stateBackend.backendUID, - checkpointId, - directoryStateHandle, - 
stateBackend.keyGroupRange, - taskLocalSnapshotMetaDataStateHandle, - sstFiles.keySet()); - return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle); - } else { - return SnapshotResult.of(jmIncrementalKeyedStateHandle); - } - } - - void stop() { - - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Could not properly close io streams.", e); - } - } - } - - void releaseResources(boolean canceled) { - - dbLease.close(); - - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Exception on closing registry.", e); - } - } - - try { - if (localBackupDirectory.exists()) { - LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory); - boolean cleanupOk = localBackupDirectory.cleanup(); - - if (!cleanupOk) { - LOG.debug("Could not properly cleanup local RocksDB backup directory."); - } - } - } catch (IOException e) { - LOG.warn("Could not properly cleanup local RocksDB backup directory.", e); - } - - if (canceled) { - Collection<StateObject> statesToDiscard = - new ArrayList<>(1 + miscFiles.size() + sstFiles.size()); - - statesToDiscard.add(metaStateHandle); - statesToDiscard.addAll(miscFiles.values()); - statesToDiscard.addAll(sstFiles.values()); - - try { - StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); - } catch (Exception e) { - LOG.warn("Could not properly discard states.", e); - } - - if (localBackupDirectory.isSnapshotCompleted()) { - try { - DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); - if (directoryStateHandle != null) { - directoryStateHandle.discardState(); - } - } catch (Exception e) { - LOG.warn("Could not properly discard local state.", e); - } - } - } - } - } - public static RocksIteratorWrapper 
getRocksIterator(RocksDB db) { return new RocksIteratorWrapper(db.newIterator()); } @@ -2332,23 +1474,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle)); } - @SuppressWarnings("unchecked") - private static RocksIteratorWrapper getRocksIterator( - RocksDB db, - ColumnFamilyHandle columnFamilyHandle, - RegisteredStateMetaInfoBase metaInfo, - ReadOptions readOptions) { - StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null; - if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) { - stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>) - ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer(); - } - RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); - return stateSnapshotTransformer == null ? - new RocksIteratorWrapper(rocksIterator) : - new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer); - } - /** * Encapsulates the logic and resources in connection with creating priority queue state structures. */ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java new file mode 100644 index 0000000..0cc9729 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.snapshot; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator; +import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; +import 
org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StreamCompressionDecorator; +import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; +import org.apache.flink.util.function.SupplierWithException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey; + +/** + * Snapshot strategy to create full snapshots of + * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. Iterates and writes all states from a + * RocksDB snapshot of the column families. + * + * @param <K> type of the backend keys. 
+ */ +public class RocksFullSnapshotStrategy<K> extends SnapshotStrategyBase<K> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class); + + /** This decorator is used to apply compression per key-group for the written snapshot data. */ + @Nonnull + private final StreamCompressionDecorator keyGroupCompressionDecorator; + + public RocksFullSnapshotStrategy( + @Nonnull RocksDB db, + @Nonnull ResourceGuard rocksDBResourceGuard, + @Nonnull TypeSerializer<K> keySerializer, + @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int keyGroupPrefixBytes, + @Nonnull LocalRecoveryConfig localRecoveryConfig, + @Nonnull CloseableRegistry cancelStreamRegistry, + @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) { + super( + db, + rocksDBResourceGuard, + keySerializer, + kvStateInformation, + keyGroupRange, + keyGroupPrefixBytes, + localRecoveryConfig, + cancelStreamRegistry); + + this.keyGroupCompressionDecorator = keyGroupCompressionDecorator; + } + + @Override + public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory primaryStreamFactory, + CheckpointOptions checkpointOptions) throws Exception { + + long startTime = System.currentTimeMillis(); + + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", + timestamp); + } + + return DoneFuture.of(SnapshotResult.empty()); + } + + final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = + + localRecoveryConfig.isLocalRecoveryEnabled() && + (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ? 
+ + () -> CheckpointStreamWithResultProvider.createDuplicatingStream( + checkpointId, + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory, + localRecoveryConfig.getLocalStateDirectoryProvider()) : + + () -> CheckpointStreamWithResultProvider.createSimpleStream( + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory); + + final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); + + final RocksDBFullSnapshotCallable snapshotOperation = + new RocksDBFullSnapshotCallable(supplier, snapshotCloseableRegistry); + + return new SnapshotTask(snapshotOperation); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // nothing to do. + } + + /** + * Wrapping task to run a {@link RocksDBFullSnapshotCallable} and delegate cancellation. + */ + private class SnapshotTask extends FutureTask<SnapshotResult<KeyedStateHandle>> { + + /** Reference to the callable for cancellation. */ + @Nonnull + private final AutoCloseable callableClose; + + SnapshotTask(@Nonnull RocksDBFullSnapshotCallable callable) { + super(callable); + this.callableClose = callable; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + IOUtils.closeQuietly(callableClose); + return super.cancel(mayInterruptIfRunning); + } + } + + /** + * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend. + */ + @VisibleForTesting + private class RocksDBFullSnapshotCallable implements Callable<SnapshotResult<KeyedStateHandle>>, AutoCloseable { + + @Nonnull + private final KeyGroupRangeOffsets keyGroupRangeOffsets; + + @Nonnull + private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier; + + @Nonnull + private final CloseableRegistry snapshotCloseableRegistry; + + @Nonnull + private final ResourceGuard.Lease dbLease; + + @Nonnull + private final Snapshot snapshot; + + @Nonnull + private final ReadOptions readOptions; + + /** + * The state meta data. 
+ */ + @Nonnull + private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + + /** + * The copied column handle. + */ + @Nonnull + private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy; + + private final AtomicBoolean ownedForCleanup; + + RocksDBFullSnapshotCallable( + @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, + @Nonnull CloseableRegistry registry) throws IOException { + + this.ownedForCleanup = new AtomicBoolean(false); + this.checkpointStreamSupplier = checkpointStreamSupplier; + this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange); + this.snapshotCloseableRegistry = registry; + + this.stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size()); + this.metaDataCopy = new ArrayList<>(kvStateInformation.size()); + for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : kvStateInformation.values()) { + // snapshot meta info + this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot()); + this.metaDataCopy.add(tuple2); + } + + this.dbLease = rocksDBResourceGuard.acquireResource(); + + this.readOptions = new ReadOptions(); + this.snapshot = db.getSnapshot(); + this.readOptions.setSnapshot(snapshot); + } + + @Override + public SnapshotResult<KeyedStateHandle> call() throws Exception { + + if (!ownedForCleanup.compareAndSet(false, true)) { + throw new CancellationException("Snapshot task was already cancelled, stopping execution."); + } + + final long startTime = System.currentTimeMillis(); + final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators = new ArrayList<>(metaDataCopy.size()); + + try { + + cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); + + final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = checkpointStreamSupplier.get(); + snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider); + + final DataOutputView outputView = + new 
DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream()); + + writeKVStateMetaData(kvStateIterators, outputView); + writeKVStateData(kvStateIterators, checkpointStreamWithResultProvider); + + final SnapshotResult<KeyedStateHandle> snapshotResult = + createStateHandlesFromStreamProvider(checkpointStreamWithResultProvider); + + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", + checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + + return snapshotResult; + + } finally { + + for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) { + IOUtils.closeQuietly(kvStateIterator.f0); + } + + cleanupSynchronousStepResources(); + } + } + + private void cleanupSynchronousStepResources() { + IOUtils.closeQuietly(readOptions); + + db.releaseSnapshot(snapshot); + IOUtils.closeQuietly(snapshot); + + IOUtils.closeQuietly(dbLease); + + if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { + try { + snapshotCloseableRegistry.close(); + } catch (Exception ex) { + LOG.warn("Error closing local registry", ex); + } + } + } + + private SnapshotResult<KeyedStateHandle> createStateHandlesFromStreamProvider( + CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException { + if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) { + return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult( + checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(), + keyGroupRangeOffsets); + } else { + throw new IOException("Snapshot was already closed before completion."); + } + } + + private void writeKVStateMetaData( + final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, + final DataOutputView outputView) throws IOException { + + int kvStateId = 0; + + for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : metaDataCopy) { + 
+ RocksIteratorWrapper rocksIteratorWrapper = + getRocksIterator(db, tuple2.f0, tuple2.f1, readOptions); + + kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId)); + ++kvStateId; + } + + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>( + // TODO: this code assumes that writing a serializer is threadsafe, we should support to + // get a serialized form already at state registration time in the future + keySerializer, + stateMetaInfoSnapshots, + !Objects.equals( + UncompressedStreamCompressionDecorator.INSTANCE, + keyGroupCompressionDecorator)); + + serializationProxy.write(outputView); + } + + private void writeKVStateData( + final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, + final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException, InterruptedException { + + byte[] previousKey = null; + byte[] previousValue = null; + DataOutputView kgOutView = null; + OutputStream kgOutStream = null; + CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = + checkpointStreamWithResultProvider.getCheckpointOutputStream(); + + try { + // Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator + try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator( + kvStateIterators, keyGroupPrefixBytes)) { + + //preamble: setup with first key-group as our lookahead + if (mergeIterator.isValid()) { + //begin first key-group by recording the offset + keyGroupRangeOffsets.setKeyGroupOffset( + mergeIterator.keyGroup(), + checkpointOutputStream.getPos()); + //write the k/v-state id as metadata + kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + previousKey 
= mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } + + //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. + while (mergeIterator.isValid()) { + + assert (!hasMetaDataFollowsFlag(previousKey)); + + //set signal in first key byte that meta data will follow in the stream after this k/v pair + if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); + + setMetaDataFollowsFlagInKey(previousKey); + } + + writeKeyValuePair(previousKey, previousValue, kgOutView); + + //write meta data if we have to + if (mergeIterator.isNewKeyGroup()) { + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + //begin new key-group + keyGroupRangeOffsets.setKeyGroupOffset( + mergeIterator.keyGroup(), + checkpointOutputStream.getPos()); + //write the kev-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + kgOutView.writeShort(mergeIterator.kvStateId()); + } else if (mergeIterator.isNewKeyValueState()) { + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + } + + //request next k/v pair + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } + } + + //epilogue: write last key-group + if (previousKey != null) { + assert (!hasMetaDataFollowsFlag(previousKey)); + setMetaDataFollowsFlagInKey(previousKey); + writeKeyValuePair(previousKey, previousValue, kgOutView); + //TODO this could be aware 
of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + kgOutStream = null; + } + + } finally { + // this will just close the outer stream + IOUtils.closeQuietly(kgOutStream); + } + } + + private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { + BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); + BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); + } + + private void checkInterrupted() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("RocksDB snapshot interrupted."); + } + } + + @Override + public void close() throws Exception { + + if (ownedForCleanup.compareAndSet(false, true)) { + cleanupSynchronousStepResources(); + } + + if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { + snapshotCloseableRegistry.close(); + } + } + } + + @SuppressWarnings("unchecked") + private static RocksIteratorWrapper getRocksIterator( + RocksDB db, + ColumnFamilyHandle columnFamilyHandle, + RegisteredStateMetaInfoBase metaInfo, + ReadOptions readOptions) { + StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null; + if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) { + stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>) + ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer(); + } + RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions); + return stateSnapshotTransformer == null ? 
+ new RocksIteratorWrapper(rocksIterator) : + new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java new file mode 100644 index 0000000..3487fe6 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.snapshot; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.DirectoryStateHandle; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.SnapshotDirectory; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.SnapshotStrategy; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.Checkpoint; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX; + +/** + * Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend} that is based + * on RocksDB's native checkpoints and creates incremental snapshots. + * + * @param <K> type of the backend keys. + */ +public class RocksIncrementalSnapshotStrategy<K> extends SnapshotStrategyBase<K> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class); + + /** Base path of the RocksDB instance. */ + @Nonnull + private final File instanceBasePath; + + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */ + @Nonnull + private final UUID backendUID; + + /** Stores the materialized sstable files from all snapshots that build the incremental history. */ + @Nonnull + private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; + + /** The identifier of the last completed checkpoint. 
*/ + private long lastCompletedCheckpointId; + + /** We delegate snapshots that are for savepoints to this. */ + @Nonnull + private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate; + + public RocksIncrementalSnapshotStrategy( + @Nonnull RocksDB db, + @Nonnull ResourceGuard rocksDBResourceGuard, + @Nonnull TypeSerializer<K> keySerializer, + @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int keyGroupPrefixBytes, + @Nonnull LocalRecoveryConfig localRecoveryConfig, + @Nonnull CloseableRegistry cancelStreamRegistry, + @Nonnull File instanceBasePath, + @Nonnull UUID backendUID, + @Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles, + long lastCompletedCheckpointId, + @Nonnull SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate) { + + super( + db, + rocksDBResourceGuard, + keySerializer, + kvStateInformation, + keyGroupRange, + keyGroupPrefixBytes, + localRecoveryConfig, + cancelStreamRegistry); + + this.instanceBasePath = instanceBasePath; + this.backendUID = backendUID; + this.materializedSstFiles = materializedSstFiles; + this.lastCompletedCheckpointId = lastCompletedCheckpointId; + this.savepointDelegate = savepointDelegate; + } + + @Override + public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( + long checkpointId, + long checkpointTimestamp, + CheckpointStreamFactory checkpointStreamFactory, + CheckpointOptions checkpointOptions) throws Exception { + + // for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained. 
+ if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) { + return savepointDelegate.performSnapshot( + checkpointId, + checkpointTimestamp, + checkpointStreamFactory, + checkpointOptions); + } + + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp); + } + return DoneFuture.of(SnapshotResult.empty()); + } + + SnapshotDirectory snapshotDirectory; + + if (localRecoveryConfig.isLocalRecoveryEnabled()) { + // create a "permanent" snapshot directory for local recovery. + LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); + File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); + + if (directory.exists()) { + FileUtils.deleteDirectory(directory); + } + + if (!directory.mkdirs()) { + throw new IOException("Local state base directory for checkpoint " + checkpointId + + " already exists: " + directory); + } + + // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints. + File rdbSnapshotDir = new File(directory, "rocks_db"); + Path path = new Path(rdbSnapshotDir.toURI()); + // create a "permanent" snapshot directory because local recovery is active. + snapshotDirectory = SnapshotDirectory.permanent(path); + } else { + // create a "temporary" snapshot directory because local recovery is inactive. 
+ Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + snapshotDirectory = SnapshotDirectory.temporary(path); + } + + final RocksDBIncrementalSnapshotOperation snapshotOperation = + new RocksDBIncrementalSnapshotOperation( + checkpointStreamFactory, + snapshotDirectory, + checkpointId); + + try { + snapshotOperation.takeSnapshot(); + } catch (Exception e) { + snapshotOperation.stop(); + snapshotOperation.releaseResources(true); + throw e; + } + + return new FutureTask<SnapshotResult<KeyedStateHandle>>( + snapshotOperation::runSnapshot + ) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + snapshotOperation.stop(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + protected void done() { + snapshotOperation.releaseResources(isCancelled()); + } + }; + } + + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { + synchronized (materializedSstFiles) { + + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } + + materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); + + lastCompletedCheckpointId = completedCheckpointId; + } + } + + /** + * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend. + */ + private final class RocksDBIncrementalSnapshotOperation { + + /** + * Stream factory that creates the outpus streams to DFS. + */ + private final CheckpointStreamFactory checkpointStreamFactory; + + /** + * Id for the current checkpoint. + */ + private final long checkpointId; + + /** + * All sst files that were part of the last previously completed checkpoint. + */ + private Set<StateHandleID> baseSstFiles; + + /** + * The state meta data. + */ + private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + + /** + * Local directory for the RocksDB native backup. 
+ */ + private SnapshotDirectory localBackupDirectory; + + // Registry for all opened i/o streams + private final CloseableRegistry closeableRegistry; + + // new sst files since the last completed checkpoint + private final Map<StateHandleID, StreamStateHandle> sstFiles; + + // handles to the misc files in the current snapshot + private final Map<StateHandleID, StreamStateHandle> miscFiles; + + // This lease protects from concurrent disposal of the native rocksdb instance. + private final ResourceGuard.Lease dbLease; + + private SnapshotResult<StreamStateHandle> metaStateHandle; + + private RocksDBIncrementalSnapshotOperation( + CheckpointStreamFactory checkpointStreamFactory, + SnapshotDirectory localBackupDirectory, + long checkpointId) throws IOException { + + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.localBackupDirectory = localBackupDirectory; + this.stateMetaInfoSnapshots = new ArrayList<>(); + this.closeableRegistry = new CloseableRegistry(); + this.sstFiles = new HashMap<>(); + this.miscFiles = new HashMap<>(); + this.metaStateHandle = null; + this.dbLease = rocksDBResourceGuard.acquireResource(); + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + FSDataInputStream inputStream = null; + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + try { + final byte[] buffer = new byte[8 * 1024]; + + FileSystem backupFileSystem = localBackupDirectory.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + closeableRegistry.registerCloseable(inputStream); + + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); + closeableRegistry.registerCloseable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + StreamStateHandle result = null; + if 
(closeableRegistry.unregisterCloseable(outputStream)) { + result = outputStream.closeAndGetHandle(); + outputStream = null; + } + return result; + + } finally { + + if (closeableRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } + + if (closeableRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); + } + } + } + + @Nonnull + private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception { + + CheckpointStreamWithResultProvider streamWithResultProvider = + + localRecoveryConfig.isLocalRecoveryEnabled() ? + + CheckpointStreamWithResultProvider.createDuplicatingStream( + checkpointId, + CheckpointedStateScope.EXCLUSIVE, + checkpointStreamFactory, + localRecoveryConfig.getLocalStateDirectoryProvider()) : + + CheckpointStreamWithResultProvider.createSimpleStream( + CheckpointedStateScope.EXCLUSIVE, + checkpointStreamFactory); + + try { + closeableRegistry.registerCloseable(streamWithResultProvider); + + //no need for compression scheme support because sst-files are already compressed + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>( + keySerializer, + stateMetaInfoSnapshots, + false); + + DataOutputView out = + new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream()); + + serializationProxy.write(out); + + if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) { + SnapshotResult<StreamStateHandle> result = + streamWithResultProvider.closeAndFinalizeCheckpointStreamResult(); + streamWithResultProvider = null; + return result; + } else { + throw new IOException("Stream already closed and cannot return a handle."); + } + } finally { + if (streamWithResultProvider != null) { + if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) { + IOUtils.closeQuietly(streamWithResultProvider); + } + } + } + } + + void takeSnapshot() throws Exception { + + final long lastCompletedCheckpoint; + + // use the last completed 
checkpoint as the comparison base. + synchronized (materializedSstFiles) { + lastCompletedCheckpoint = lastCompletedCheckpointId; + baseSstFiles = materializedSstFiles.get(lastCompletedCheckpoint); + } + + LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + + "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + + // save meta data + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry + : kvStateInformation.entrySet()) { + stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); + } + + LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory); + + if (localBackupDirectory.exists()) { + throw new IllegalStateException("Unexpected existence of the backup directory."); + } + + // create hard links of living files in the snapshot path + try (Checkpoint checkpoint = Checkpoint.create(db)) { + checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath()); + } + } + + @Nonnull + SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception { + + cancelStreamRegistry.registerCloseable(closeableRegistry); + + // write meta data + metaStateHandle = materializeMetaData(); + + // sanity checks - they should never fail + Preconditions.checkNotNull(metaStateHandle, + "Metadata was not properly created."); + Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), + "Metadata for job manager was not properly created."); + + // write state data + Preconditions.checkState(localBackupDirectory.exists()); + + FileStatus[] fileStatuses = localBackupDirectory.listStatus(); + if (fileStatuses != null) { + for (FileStatus fileStatus : fileStatuses) { + final Path filePath = fileStatus.getPath(); + final String fileName = filePath.getName(); + final StateHandleID stateHandleID = new StateHandleID(fileName); + + if (fileName.endsWith(SST_FILE_SUFFIX)) { + 
final boolean existsAlready = + baseSstFiles != null && baseSstFiles.contains(stateHandleID); + + if (existsAlready) { + // we introduce a placeholder state handle, that is replaced with the + // original from the shared state registry (created from a previous checkpoint) + sstFiles.put( + stateHandleID, + new PlaceholderStreamStateHandle()); + } else { + sstFiles.put(stateHandleID, materializeStateData(filePath)); + } + } else { + StreamStateHandle fileHandle = materializeStateData(filePath); + miscFiles.put(stateHandleID, fileHandle); + } + } + } + + synchronized (materializedSstFiles) { + materializedSstFiles.put(checkpointId, sstFiles.keySet()); + } + + IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle( + backendUID, + keyGroupRange, + checkpointId, + sstFiles, + miscFiles, + metaStateHandle.getJobManagerOwnedSnapshot()); + + StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot(); + DirectoryStateHandle directoryStateHandle = null; + + try { + + directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); + } catch (IOException ex) { + + Exception collector = ex; + + try { + taskLocalSnapshotMetaDataStateHandle.discardState(); + } catch (Exception discardEx) { + collector = ExceptionUtils.firstOrSuppressed(discardEx, collector); + } + + LOG.warn("Problem with local state snapshot.", collector); + } + + if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) { + + IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = + new IncrementalLocalKeyedStateHandle( + backendUID, + checkpointId, + directoryStateHandle, + keyGroupRange, + taskLocalSnapshotMetaDataStateHandle, + sstFiles.keySet()); + return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle); + } else { + return SnapshotResult.of(jmIncrementalKeyedStateHandle); + } + } + + void stop() { + + if 
(cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { + try { + closeableRegistry.close(); + } catch (IOException e) { + LOG.warn("Could not properly close io streams.", e); + } + } + } + + void releaseResources(boolean canceled) { + + dbLease.close(); + + if (cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { + try { + closeableRegistry.close(); + } catch (IOException e) { + LOG.warn("Exception on closing registry.", e); + } + } + + try { + if (localBackupDirectory.exists()) { + LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory); + boolean cleanupOk = localBackupDirectory.cleanup(); + + if (!cleanupOk) { + LOG.debug("Could not properly cleanup local RocksDB backup directory."); + } + } + } catch (IOException e) { + LOG.warn("Could not properly cleanup local RocksDB backup directory.", e); + } + + if (canceled) { + Collection<StateObject> statesToDiscard = + new ArrayList<>(1 + miscFiles.size() + sstFiles.size()); + + statesToDiscard.add(metaStateHandle); + statesToDiscard.addAll(miscFiles.values()); + statesToDiscard.addAll(sstFiles.values()); + + try { + StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); + } catch (Exception e) { + LOG.warn("Could not properly discard states.", e); + } + + if (localBackupDirectory.isSnapshotCompleted()) { + try { + DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); + if (directoryStateHandle != null) { + directoryStateHandle.discardState(); + } + } catch (Exception e) { + LOG.warn("Could not properly discard local state.", e); + } + } + } + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java new file mode 100644 index 0000000..bf2bbdb --- /dev/null +++ 
/**
 * Constants and static helpers that are shared by the code that writes and reads snapshots of
 * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
 *
 * <p>The flag helpers operate on the most significant bit of the first byte of a serialized key and
 * modify the passed array in place.
 */
public class RocksSnapshotUtil {

	/** File suffix of sstable files. */
	public static final String SST_FILE_SUFFIX = ".sst";

	/** Mask that selects the most significant bit of a byte. */
	public static final int FIRST_BIT_IN_BYTE_MASK = 0x80;

	/** Value that is written to mark the end of a key-group. */
	public static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

	/** This class only offers static members and is never instantiated. */
	private RocksSnapshotUtil() {
		throw new AssertionError();
	}

	/**
	 * Sets the flag bit in the first byte of the given key.
	 *
	 * @param key the key bytes, modified in place; must contain at least one byte.
	 */
	public static void setMetaDataFollowsFlagInKey(byte[] key) {
		final int withFlag = key[0] | FIRST_BIT_IN_BYTE_MASK;
		key[0] = (byte) withFlag;
	}

	/**
	 * Clears the flag bit in the first byte of the given key and leaves all other bits untouched.
	 *
	 * @param key the key bytes, modified in place; must contain at least one byte.
	 */
	public static void clearMetaDataFollowsFlag(byte[] key) {
		final int withoutFlag = key[0] & ~FIRST_BIT_IN_BYTE_MASK;
		key[0] = (byte) withoutFlag;
	}

	/**
	 * Tests the flag bit in the first byte of the given key.
	 *
	 * @param key the key bytes; must contain at least one byte.
	 * @return {@code true} if the flag bit is set.
	 */
	public static boolean hasMetaDataFollowsFlag(byte[] key) {
		final int flagBit = key[0] & FIRST_BIT_IN_BYTE_MASK;
		return flagBit != 0;
	}
}
+ */ + +package org.apache.flink.contrib.streaming.state.snapshot; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.SnapshotStrategy; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.util.LinkedHashMap; + +/** + * Base class for {@link SnapshotStrategy} implementations on RocksDB. + * + * @param <K> type of the backend keys. + */ +public abstract class SnapshotStrategyBase<K> implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> { + + @Nonnull + protected final RocksDB db; + + @Nonnull + protected final ResourceGuard rocksDBResourceGuard; + + @Nonnull + protected final TypeSerializer<K> keySerializer; + + @Nonnull + protected final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation; + + @Nonnull + protected final KeyGroupRange keyGroupRange; + + @Nonnegative + protected final int keyGroupPrefixBytes; + + @Nonnull + protected final LocalRecoveryConfig localRecoveryConfig; + + @Nonnull + protected final CloseableRegistry cancelStreamRegistry; + + public SnapshotStrategyBase( + @Nonnull RocksDB db, + @Nonnull ResourceGuard rocksDBResourceGuard, + @Nonnull TypeSerializer<K> keySerializer, + @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int keyGroupPrefixBytes, + @Nonnull LocalRecoveryConfig 
localRecoveryConfig, + @Nonnull CloseableRegistry cancelStreamRegistry) { + + this.db = db; + this.rocksDBResourceGuard = rocksDBResourceGuard; + this.keySerializer = keySerializer; + this.kvStateInformation = kvStateInformation; + this.keyGroupRange = keyGroupRange; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.localRecoveryConfig = localRecoveryConfig; + this.cancelStreamRegistry = cancelStreamRegistry; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index e344638..c872553 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -91,6 +91,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.FIRST_BIT_IN_BYTE_MASK; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag; +import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.spy; @@ -425,21 +430,19 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { @Test public void testConsistentSnapshotSerializationFlagsAndMasks() { - 
Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK); - Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + Assert.assertEquals(0xFFFF, END_OF_KEY_GROUP_MARK); + Assert.assertEquals(0x80, FIRST_BIT_IN_BYTE_MASK); byte[] expectedKey = new byte[] {42, 42}; byte[] modKey = expectedKey.clone(); - Assert.assertFalse( - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + Assert.assertFalse(hasMetaDataFollowsFlag(modKey)); - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey); - Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + setMetaDataFollowsFlagInKey(modKey); + Assert.assertTrue(hasMetaDataFollowsFlag(modKey)); - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey); - Assert.assertFalse( - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + clearMetaDataFollowsFlag(modKey); + Assert.assertFalse(hasMetaDataFollowsFlag(modKey)); Assert.assertTrue(Arrays.equals(expectedKey, modKey)); } @@ -504,12 +507,12 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { @Nullable @Override - public StreamStateHandle closeAndGetHandle() throws IOException { + public StreamStateHandle closeAndGetHandle() { throw new UnsupportedOperationException(); } @Override - public long getPos() throws IOException { + public long getPos() { throw new UnsupportedOperationException(); } @@ -529,7 +532,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { } @Override - public void close() throws IOException { + public void close() { throw new UnsupportedOperationException(); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 0ea0d3f..4916251 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -191,6 +191,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa allCreatedCloseables = new ArrayList<>(); keyedStateBackend.db = spy(keyedStateBackend.db); + keyedStateBackend.initializeSnapshotStrategy(null); doAnswer(new Answer<Object>() {