[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348543#comment-16348543 ]
ASF GitHub Bot commented on FLINK-8360: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r165349012 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . 
Returning null."); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); - - snapshotOperation.takeSnapshot(); - - return new FutureTask<SnapshotResult<KeyedStateHandle>>( - () -> snapshotOperation.materializeSnapshot() - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); + try { + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation<K> snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . 
Returning null."); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return DoneFuture.nullValue(); + if (!enableIncrementalCheckpointing) { + return; } - snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = - new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { + synchronized (materializedSstFiles) { - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } + materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } + lastCompletedCheckpointId = completedCheckpointId; + } + } - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } + private void createDB() throws IOException { + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.defaultColumnFamily = columnFamilyHandles.get(0); + } - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } + private 
RocksDB openDB( + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - @Override - public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { - long startTime = System.currentTimeMillis(); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - if (isStopped()) { - throw new IOException("RocksDB closed."); - } + columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - snapshotOperation.writeDBSnapshot(); + // we add the required descriptor for the default CF in last position. + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + RocksDB dbRef; - KeyGroupsStateHandle stateHandle = snapshotOperation.getSnapshotResultStateHandle(); - return new SnapshotResult<>(stateHandle, null); - } - }; + try { + dbRef = RocksDB.open( + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + stateColumnFamilyHandles); + } catch (RocksDBException e) { + throw new IOException("Error while opening RocksDB instance.", e); + } - LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + - Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + // requested + default CF + Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), + "Not all requested column family handles have been created"); - return AsyncStoppableTaskWithCallback.from(ioCallable); + return dbRef; } /** - * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. 
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot. */ - static final class RocksDBFullSnapshotOperation<K> { + private static final class RocksDBFullRestoreOperation<K> { - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend<K> stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final CheckpointStreamFactory checkpointStreamFactory; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private long checkpointId; - private long checkpointTimeStamp; - - private Snapshot snapshot; - private ReadOptions readOptions; - private List<Tuple2<RocksIterator, Integer>> kvStateIterators; - - private CheckpointStreamFactory.CheckpointStateOutputStream outStream; - private DataOutputView outputView; - - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) throws IOException { + private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } + /** Current key-groups state handle from which we restore key-groups. */ + private KeyGroupsStateHandle currentKeyGroupsStateHandle; + /** Current input stream we obtained from currentKeyGroupsStateHandle. */ + private FSDataInputStream currentStateHandleInStream; + /** Current data input view that wraps currentStateHandleInStream. */ + private DataInputView currentStateHandleInView; + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. 
*/ + private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; + /** The compression decorator that was used for writing the state, as determined by the meta data. */ + private StreamCompressionDecorator keygroupStreamCompressionDecorator; /** - * 1) Create a snapshot object from RocksDB. + * Creates a restore operation object for the given state backend instance. * - * @param checkpointId id of the checkpoint for which we take the snapshot - * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); - this.checkpointId = checkpointId; - this.checkpointTimeStamp = checkpointTimeStamp; - this.snapshot = stateBackend.db.getSnapshot(); + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { + this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * Restores all key-groups data that is referenced by the passed state handles. * - * @throws Exception + * @param keyedStateHandles List of all key groups state handles that shall be restored. 
*/ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); - snapshotCloseableRegistry.registerCloseable(outStream); - outputView = new DataOutputViewStreamWrapper(outStream); - } + public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) + throws IOException, StateMigrationException, RocksDBException { - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException { + rocksDBKeyedStateBackend.createDB(); - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { - Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + restoreKeyGroupsInStateHandle(); + } + } } /** - * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. - * - * @return state handle to the completed snapshot + * Restore one key groups state handle. 
*/ - public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { - - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + private void restoreKeyGroupsInStateHandle() + throws IOException, StateMigrationException, RocksDBException { + try { + currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); + currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); + restoreKVStateMetaData(); + restoreKVStateData(); + } finally { + if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { + IOUtils.closeQuietly(currentStateHandleInStream); } } - return null; } /** - * 5) Release the snapshot object for RocksDB and clean up. + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. 
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException */ - public void releaseSnapshotResources() { + private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - outStream = null; + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - if (null != kvStateIterators) { - for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } + serializationProxy.read(currentStateHandleInView); - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); } - this.dbLease.close(); - } - - private void writeKVStateMetaData() throws IOException { + this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? 
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = + serializationProxy.getStateMetaInfoSnapshots(); + currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - int kvStateId = 0; - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); + if (registeredColumn == null) { + byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + nameBytes, + rocksDBKeyedStateBackend.columnOptions); - ++kvStateId; - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.getKeySerializer(), - metaInfoSnapshots, - 
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); + rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - serializationProxy.write(outputView); - } + ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - private void writeKVStateData() throws IOException, InterruptedException { + registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - byte[] previousKey = null; - byte[] previousValue = null; - OutputStream kgOutStream = null; - DataOutputView kgOutView = null; + } else { + // TODO with eager state registration in place, check here for serializer migration strategies + } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); + } + } - try { - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... 
+ for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; - // handover complete, null out to prevent double close - kvStateIterators = null; + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + boolean keyGroupHasMoreKeys = true; + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + 
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } } + } + } + } + } - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (mergeIterator.isValid()) { + /** + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot. + */ + private static class RocksDBIncrementalRestoreOperation<T> { --- End diff -- Would it actually make sense to give these classes their own files? The `RocksDBKeyedStateBackend` file is 2100 lines long which makes it quite hard to handle. > Implement task-local state recovery > ----------------------------------- > > Key: FLINK-8360 > URL: https://issues.apache.org/jira/browse/FLINK-8360 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > Fix For: 1.5.0 > > > This issue tracks the development of recovery from task-local state. The main > idea is to have a secondary, local copy of the checkpointed state, while > there is still a primary copy in DFS that we report to the checkpoint > coordinator. > Recovery can attempt to restore from the secondary local copy, if available, > to save network bandwidth. This requires that the assignment from tasks to > slots is as sticky as possible. > For starters, we will implement this feature for all managed keyed states and > can easily enhance it to all other state types (e.g. operator state) later. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)