http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java new file mode 100644 index 0000000..5507339 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -0,0 +1,2033 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources; +import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; +import 
org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamCompressionDecorator; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; +import org.apache.flink.runtime.state.internal.InternalFoldingState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalReducingState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; +import org.apache.flink.util.StateMigrationException; + +import org.rocksdb.Checkpoint; +import 
org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to + * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon + * checkpointing. This state backend can store very large state that exceeds memory and spills + * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe. + * + * <p>This class follows the rules for closing/releasing native RocksDB resources as described in + + <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families"> + * this document</a>. 
+ */ +public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); + + /** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */ + public static final String MERGE_OPERATOR_NAME = "stringappendtest"; + + /** File suffix of sstable files. */ + private static final String SST_FILE_SUFFIX = ".sst"; + + /** Bytes for the name of the column descriptor for the default column family. */ + public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** String that identifies the operator that owns this backend. */ + private final String operatorIdentifier; + + /** The column family options from the options factory. */ + private final ColumnFamilyOptions columnOptions; + + /** The DB options from the options factory. */ + private final DBOptions dbOptions; + + /** Path where this configured instance stores its data directory. */ + private final File instanceBasePath; + + /** Path where this configured instance stores its RocksDB database. */ + private final File instanceRocksDBPath; + + /** + * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that disposes the + * RocksDb object. + */ + private final ResourceGuard rocksDBResourceGuard; + + /** + * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState} + * to store state. The different k/v states that we have don't each have their own RocksDB + * instance. They all write to this instance but to their own column family. + */ + protected RocksDB db; + + /** + * We are not using the default column family for Flink state ops, but we still need to remember this handle so that + * we can close it properly when the backend is closed. This is required by RocksDB's native memory management. 
+ */ + private ColumnFamilyHandle defaultColumnFamily; + + /** + * Information about the k/v states as we create them. This is used to retrieve the + * column family that is used for a state and also for sanity checks when restoring. + */ + private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation; + + /** + * Map of state names to their corresponding restored state meta info. + * + * <p>TODO this map can be removed when eager-state registration is in place. + * TODO we currently need this cached to check state migration strategies when new serializers are registered. + */ + private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; + + /** Number of bytes required to prefix the key groups. */ + private final int keyGroupPrefixBytes; + + /** True if incremental checkpointing is enabled. */ + private final boolean enableIncrementalCheckpointing; + + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */ + private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; + + /** The identifier of the last completed checkpoint. */ + private long lastCompletedCheckpointId = -1L; + + /** Unique ID of this backend. 
*/ + private UUID backendUID; + + public RocksDBKeyedStateBackend( + String operatorIdentifier, + ClassLoader userCodeClassLoader, + File instanceBasePath, + DBOptions dbOptions, + ColumnFamilyOptions columnFamilyOptions, + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig, + boolean enableIncrementalCheckpointing + ) throws IOException { + + super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + + this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); + + this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + this.rocksDBResourceGuard = new ResourceGuard(); + + // ensure that we use the right merge operator, because other code relies on this + this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions) + .setMergeOperatorName(MERGE_OPERATOR_NAME); + + this.dbOptions = Preconditions.checkNotNull(dbOptions); + + this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); + this.instanceRocksDBPath = new File(instanceBasePath, "db"); + + if (instanceBasePath.exists()) { + // Clear the base directory when the backend is created + // in case something crashed and the backend never reached dispose() + cleanInstanceBasePath(); + } + + if (!instanceBasePath.mkdirs()) { + throw new IOException( + String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath())); + } + + this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 
2 : 1; + this.kvStateInformation = new HashMap<>(); + this.restoredKvStateMetaInfos = new HashMap<>(); + this.materializedSstFiles = new TreeMap<>(); + this.backendUID = UUID.randomUUID(); + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); + } + + @Override + public <N> Stream<K> getKeys(String state, N namespace) { + Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state); + if (columnInfo == null) { + return Stream.empty(); + } + + RocksIterator iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); + Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false); + return targetStream.onClose(iterator::close); + } + + /** + * Should only be called by one thread, and only after all accesses to the DB happened. + */ + @Override + public void dispose() { + super.dispose(); + + // This call will block until all clients that still acquire access to the RocksDB instance have released it, + // so that we cannot release the native resources while clients are still working with it in parallel. + rocksDBResourceGuard.close(); + + // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as + // working on the disposed object results in SEGFAULTS. + if (db != null) { + + // RocksDB's native memory management requires that *all* CFs (including default) are closed before the + // DB is closed. So we start with the ones created by Flink... + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : + kvStateInformation.values()) { + IOUtils.closeQuietly(columnMetaData.f0); + } + + // ... close the default CF ... + IOUtils.closeQuietly(defaultColumnFamily); + + // ... and finally close the DB instance ... 
+ IOUtils.closeQuietly(db); + + // invalidate the reference before releasing the lock so that other accesses will not cause crashes + db = null; + } + + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); + + IOUtils.closeQuietly(dbOptions); + IOUtils.closeQuietly(columnOptions); + + cleanInstanceBasePath(); + } + + private void cleanInstanceBasePath() { + LOG.info("Deleting existing instance base directory {}.", instanceBasePath); + + try { + FileUtils.deleteDirectory(instanceBasePath); + } catch (IOException ex) { + LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex); + } + } + + public int getKeyGroupPrefixBytes() { + return keyGroupPrefixBytes; + } + + /** + * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and + * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always + * be called by the same thread. + * + * @param checkpointId The Id of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param streamFactory The factory that we can use for writing our state to streams. + * @param checkpointOptions Options for how to perform this checkpoint. + * @return Future to the state handle of the snapshot data. 
+ * @throws Exception + */ + @Override + public RunnableFuture<KeyedStateHandle> snapshot( + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception { + + if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT && + enableIncrementalCheckpointing) { + return snapshotIncrementally(checkpointId, timestamp, streamFactory); + } else { + return snapshotFully(checkpointId, timestamp, streamFactory); + } + } + + private RunnableFuture<KeyedStateHandle> snapshotIncrementally( + final long checkpointId, + final long checkpointTimestamp, + final CheckpointStreamFactory checkpointStreamFactory) throws Exception { + + if (db == null) { + throw new IOException("RocksDB closed."); + } + + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", + checkpointTimestamp); + } + return DoneFuture.nullValue(); + } + + final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = + new RocksDBIncrementalSnapshotOperation<>( + this, + checkpointStreamFactory, + checkpointId, + checkpointTimestamp); + + try { + snapshotOperation.takeSnapshot(); + } catch (Exception e) { + snapshotOperation.stop(); + snapshotOperation.releaseResources(true); + throw e; + } + + return new FutureTask<KeyedStateHandle>( + new Callable<KeyedStateHandle>() { + @Override + public KeyedStateHandle call() throws Exception { + return snapshotOperation.materializeSnapshot(); + } + } + ) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + snapshotOperation.stop(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + protected void done() { + snapshotOperation.releaseResources(isCancelled()); + } + }; + } + + private RunnableFuture<KeyedStateHandle> snapshotFully( + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory) throws 
Exception { + + long startTime = System.currentTimeMillis(); + final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); + + final RocksDBFullSnapshotOperation<K> snapshotOperation; + + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp); + } + + return DoneFuture.nullValue(); + } + + snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); + snapshotOperation.takeDBSnapShot(checkpointId, timestamp); + + // implementation of the async IO operation, based on FutureTask + AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = + new AbstractAsyncCallableWithResources<KeyedStateHandle>() { + + @Override + protected void acquireResources() throws Exception { + cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); + snapshotOperation.openCheckpointStream(); + } + + @Override + protected void releaseResources() throws Exception { + closeLocalRegistry(); + releaseSnapshotOperationResources(); + } + + private void releaseSnapshotOperationResources() { + // hold the db lock while operation on the db to guard us against async db disposal + snapshotOperation.releaseSnapshotResources(); + } + + @Override + protected void stopOperation() throws Exception { + closeLocalRegistry(); + } + + private void closeLocalRegistry() { + if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { + try { + snapshotCloseableRegistry.close(); + } catch (Exception ex) { + LOG.warn("Error closing local registry", ex); + } + } + } + + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long startTime = System.currentTimeMillis(); + + if (isStopped()) { + throw new IOException("RocksDB closed."); + } + + snapshotOperation.writeDBSnapshot(); + + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", + 
streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + + return snapshotOperation.getSnapshotResultStateHandle(); + } + }; + + LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + + return AsyncStoppableTaskWithCallback.from(ioCallable); + } + + /** + * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. + */ + static final class RocksDBFullSnapshotOperation<K> { + + static final int FIRST_BIT_IN_BYTE_MASK = 0x80; + static final int END_OF_KEY_GROUP_MARK = 0xFFFF; + + private final RocksDBKeyedStateBackend<K> stateBackend; + private final KeyGroupRangeOffsets keyGroupRangeOffsets; + private final CheckpointStreamFactory checkpointStreamFactory; + private final CloseableRegistry snapshotCloseableRegistry; + private final ResourceGuard.Lease dbLease; + + private long checkpointId; + private long checkpointTimeStamp; + + private Snapshot snapshot; + private ReadOptions readOptions; + private List<Tuple2<RocksIterator, Integer>> kvStateIterators; + + private CheckpointStreamFactory.CheckpointStateOutputStream outStream; + private DataOutputView outputView; + + RocksDBFullSnapshotOperation( + RocksDBKeyedStateBackend<K> stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + CloseableRegistry registry) throws IOException { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); + this.snapshotCloseableRegistry = registry; + this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); + } + + /** + * 1) Create a snapshot object from RocksDB. 
+ * + * @param checkpointId id of the checkpoint for which we take the snapshot + * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + */ + public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { + Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); + this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); + this.checkpointId = checkpointId; + this.checkpointTimeStamp = checkpointTimeStamp; + this.snapshot = stateBackend.db.getSnapshot(); + } + + /** + * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * + * @throws Exception + */ + public void openCheckpointStream() throws Exception { + Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); + outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); + snapshotCloseableRegistry.registerCloseable(outStream); + outputView = new DataOutputViewStreamWrapper(outStream); + } + + /** + * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). + * + * @throws IOException + */ + public void writeDBSnapshot() throws IOException, InterruptedException { + + if (null == snapshot) { + throw new IOException("No snapshot available. Might be released due to cancellation."); + } + + Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); + writeKVStateMetaData(); + writeKVStateData(); + } + + /** + * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. 
+ * + * @return state handle to the completed snapshot + */ + public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { + + if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { + + StreamStateHandle stateHandle = outStream.closeAndGetHandle(); + outStream = null; + + if (stateHandle != null) { + return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + } + } + return null; + } + + /** + * 5) Release the snapshot object for RocksDB and clean up. + */ + public void releaseSnapshotResources() { + + outStream = null; + + if (null != kvStateIterators) { + for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { + IOUtils.closeQuietly(kvStateIterator.f0); + } + kvStateIterators = null; + } + + if (null != snapshot) { + if (null != stateBackend.db) { + stateBackend.db.releaseSnapshot(snapshot); + } + IOUtils.closeQuietly(snapshot); + snapshot = null; + } + + if (null != readOptions) { + IOUtils.closeQuietly(readOptions); + readOptions = null; + } + + this.dbLease.close(); + } + + private void writeKVStateMetaData() throws IOException { + + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = + new ArrayList<>(stateBackend.kvStateInformation.size()); + + int kvStateId = 0; + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : + stateBackend.kvStateInformation.entrySet()) { + + metaInfoSnapshots.add(column.getValue().f1.snapshot()); + + //retrieve iterator for this k/v states + readOptions = new ReadOptions(); + readOptions.setSnapshot(snapshot); + + kvStateIterators.add( + new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + + ++kvStateId; + } + + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>( + stateBackend.getKeySerializer(), + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator)); + + serializationProxy.write(outputView); + } + + private void writeKVStateData() throws IOException, InterruptedException { + + byte[] previousKey = null; + byte[] previousValue = null; + OutputStream kgOutStream = null; + DataOutputView kgOutView = null; + + try { + // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator + try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( + kvStateIterators, stateBackend.keyGroupPrefixBytes)) { + + // handover complete, null out to prevent double close + kvStateIterators = null; + + //preamble: setup with first key-group as our lookahead + if (mergeIterator.isValid()) { + //begin first key-group by recording the offset + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the k/v-state id as metadata + kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } + + //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
+ while (mergeIterator.isValid()) { + + assert (!hasMetaDataFollowsFlag(previousKey)); + + //set signal in first key byte that meta data will follow in the stream after this k/v pair + if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); + + setMetaDataFollowsFlagInKey(previousKey); + } + + writeKeyValuePair(previousKey, previousValue, kgOutView); + + //write meta data if we have to + if (mergeIterator.isNewKeyGroup()) { + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + //begin new key-group + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the kev-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + kgOutView.writeShort(mergeIterator.kvStateId()); + } else if (mergeIterator.isNewKeyValueState()) { + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + } + + //request next k/v pair + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } + } + + //epilogue: write last key-group + if (previousKey != null) { + assert (!hasMetaDataFollowsFlag(previousKey)); + setMetaDataFollowsFlagInKey(previousKey); + writeKeyValuePair(previousKey, previousValue, kgOutView); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + kgOutStream = null; + } + + } finally { 
+ // this will just close the outer stream + IOUtils.closeQuietly(kgOutStream); + } + } + + private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { + BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); + BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); + } + + static void setMetaDataFollowsFlagInKey(byte[] key) { + key[0] |= FIRST_BIT_IN_BYTE_MASK; + } + + static void clearMetaDataFollowsFlag(byte[] key) { + key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + } + + static boolean hasMetaDataFollowsFlag(byte[] key) { + return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + } + + private static void checkInterrupted() throws InterruptedException { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("RocksDB snapshot interrupted."); + } + } + } + + private static final class RocksDBIncrementalSnapshotOperation<K> { + + /** The backend which we snapshot. */ + private final RocksDBKeyedStateBackend<K> stateBackend; + + /** Stream factory that creates the outpus streams to DFS. */ + private final CheckpointStreamFactory checkpointStreamFactory; + + /** Id for the current checkpoint. */ + private final long checkpointId; + + /** Timestamp for the current checkpoint. */ + private final long checkpointTimestamp; + + /** All sst files that were part of the last previously completed checkpoint. */ + private Set<StateHandleID> baseSstFiles; + + /** The state meta data. 
	 */
		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();

		/** File system of the local backup directory that holds the native RocksDB checkpoint. */
		private FileSystem backupFileSystem;

		/** Local directory into which RocksDB writes its native checkpoint (hard links) for this snapshot. */
		private Path backupPath;

		// Registry for all opened i/o streams, so they can be closed when the snapshot is cancelled
		private final CloseableRegistry closeableRegistry = new CloseableRegistry();

		// new sst files since the last completed checkpoint
		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();

		// handles to the misc files in the current snapshot
		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

		// This lease protects from concurrent disposal of the native rocksdb instance.
		private final ResourceGuard.Lease dbLease;

		// handle to the uploaded meta data; written by materializeSnapshot()
		private StreamStateHandle metaStateHandle = null;

		private RocksDBIncrementalSnapshotOperation(
			RocksDBKeyedStateBackend<K> stateBackend,
			CheckpointStreamFactory checkpointStreamFactory,
			long checkpointId,
			long checkpointTimestamp) throws IOException {

			this.stateBackend = stateBackend;
			this.checkpointStreamFactory = checkpointStreamFactory;
			this.checkpointId = checkpointId;
			this.checkpointTimestamp = checkpointTimestamp;
			// holding the lease keeps the native RocksDB instance alive for the whole
			// (potentially asynchronous) duration of this snapshot operation
			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
		}

		/**
		 * Uploads the local file at {@code filePath} to a checkpoint stream in SHARED scope.
		 *
		 * @return the resulting state handle, or {@code null} if the output stream was
		 *         already unregistered (i.e. the snapshot was cancelled concurrently).
		 */
		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
			FSDataInputStream inputStream = null;
			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;

			try {
				final byte[] buffer = new byte[8 * 1024];

				// NOTE(review): this local shadows the backupFileSystem field that
				// takeSnapshot() initializes — consider renaming or reusing the field
				FileSystem backupFileSystem = backupPath.getFileSystem();
				inputStream = backupFileSystem.open(filePath);
				closeableRegistry.registerCloseable(inputStream);

				outputStream = checkpointStreamFactory
					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
				closeableRegistry.registerCloseable(outputStream);

				while (true) {
					int numBytes = inputStream.read(buffer);

					if (numBytes == -1) {
						break;
					}

					outputStream.write(buffer, 0, numBytes);
				}

				StreamStateHandle result = null;
				// close-and-get only if still registered; unregistration signals cancellation
				if (closeableRegistry.unregisterCloseable(outputStream)) {
					result = outputStream.closeAndGetHandle();
					outputStream = null;
				}
				return result;

			} finally {
				if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
					inputStream.close();
				}

				if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
					outputStream.close();
				}
			}
		}

		/**
		 * Writes the backend meta data (key serializer and state meta info snapshots)
		 * to a checkpoint stream in EXCLUSIVE scope.
		 *
		 * @return the resulting state handle, or {@code null} on concurrent cancellation.
		 */
		private StreamStateHandle materializeMetaData() throws Exception {
			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;

			try {
				outputStream = checkpointStreamFactory
					.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
				closeableRegistry.registerCloseable(outputStream);

				//no need for compression scheme support because sst-files are already compressed
				KeyedBackendSerializationProxy<K> serializationProxy =
					new KeyedBackendSerializationProxy<>(
						stateBackend.keySerializer,
						stateMetaInfoSnapshots,
						false);

				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);

				serializationProxy.write(out);

				StreamStateHandle result = null;
				if (closeableRegistry.unregisterCloseable(outputStream)) {
					result = outputStream.closeAndGetHandle();
					outputStream = null;
				}
				return result;
			} finally {
				if (outputStream != null) {
					if (closeableRegistry.unregisterCloseable(outputStream)) {
						outputStream.close();
					}
				}
			}
		}

		/**
		 * Synchronous part of the snapshot: captures the state meta data and lets
		 * RocksDB create a native checkpoint (hard-linked files) in a local backup
		 * directory. Must run before {@link #materializeSnapshot()}.
		 */
		void takeSnapshot() throws Exception {

			final long lastCompletedCheckpoint;

			// use the last completed checkpoint as the comparison base.
			synchronized (stateBackend.materializedSstFiles) {
				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
			}

			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);

			// save meta data
			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
				: stateBackend.kvStateInformation.entrySet()) {
				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
			}

			// save state data
			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);

			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath);

			backupFileSystem = backupPath.getFileSystem();
			if (backupFileSystem.exists(backupPath)) {
				throw new IllegalStateException("Unexpected existence of the backup directory.");
			}

			// create hard links of living files in the checkpoint path
			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
			checkpoint.createCheckpoint(backupPath.getPath());
		}

		/**
		 * Asynchronous part of the snapshot: uploads the meta data and all files of the
		 * local RocksDB checkpoint. Sst files already contained in the last completed
		 * checkpoint are replaced by placeholder handles so the shared state registry
		 * can re-reference the previously uploaded data.
		 */
		KeyedStateHandle materializeSnapshot() throws Exception {

			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);

			// write meta data
			metaStateHandle = materializeMetaData();

			// write state data
			Preconditions.checkState(backupFileSystem.exists(backupPath));

			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
			if (fileStatuses != null) {
				for (FileStatus fileStatus : fileStatuses) {
					final Path filePath = fileStatus.getPath();
					final String fileName = filePath.getName();
					final StateHandleID stateHandleID = new StateHandleID(fileName);

					if (fileName.endsWith(SST_FILE_SUFFIX)) {
						final boolean existsAlready =
							baseSstFiles != null && baseSstFiles.contains(stateHandleID);

						if (existsAlready) {
							// we introduce a placeholder state handle, that is replaced with the
							// original from the shared state registry (created from a previous checkpoint)
							sstFiles.put(
								stateHandleID,
								new PlaceholderStreamStateHandle());
						} else {
							sstFiles.put(stateHandleID, materializeStateData(filePath));
						}
					} else {
						// non-sst files (MANIFEST, CURRENT, OPTIONS, ...) are always uploaded
						StreamStateHandle fileHandle = materializeStateData(filePath);
						miscFiles.put(stateHandleID, fileHandle);
					}
				}
			}

			synchronized (stateBackend.materializedSstFiles) {
				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
			}

			return new IncrementalKeyedStateHandle(
				stateBackend.backendUID,
				stateBackend.keyGroupRange,
				checkpointId,
				sstFiles,
				miscFiles,
				metaStateHandle);
		}

		/** Closes all registered streams, unblocking/cancelling the async part. */
		void stop() {

			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
				try {
					closeableRegistry.close();
				} catch (IOException e) {
					LOG.warn("Could not properly close io streams.", e);
				}
			}
		}

		/**
		 * Releases the RocksDB disposal lease and deletes the local backup directory.
		 * If {@code canceled}, additionally discards all state that was already uploaded.
		 */
		void releaseResources(boolean canceled) {

			dbLease.close();

			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
				try {
					closeableRegistry.close();
				} catch (IOException e) {
					LOG.warn("Exception on closing registry.", e);
				}
			}

			if (backupPath != null) {
				try {
					if (backupFileSystem.exists(backupPath)) {

						LOG.trace("Deleting local RocksDB backup path {}.", backupPath);
						backupFileSystem.delete(backupPath, true);
					}
				} catch (Exception e) {
					LOG.warn("Could not properly delete the checkpoint directory.", e);
				}
			}

			if (canceled) {
				Collection<StateObject> statesToDiscard =
					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());

				statesToDiscard.add(metaStateHandle);
				statesToDiscard.addAll(miscFiles.values());
				statesToDiscard.addAll(sstFiles.values());

				try {
					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
				} catch (Exception e) {
					LOG.warn("Could not properly discard states.", e);
				}
			}
		}
	}

	@Override
	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
		LOG.info("Initializing RocksDB keyed state backend from snapshot.");

		if (LOG.isDebugEnabled()) {
			LOG.debug("Restoring snapshot from state handles: {}.",
				restoreState);
		}

		// clear all meta data
		kvStateInformation.clear();
		restoredKvStateMetaInfos.clear();

		try {
			if (restoreState == null || restoreState.isEmpty()) {
				// no previous state: start from a fresh, empty RocksDB instance
				createDB();
			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
				restoreOperation.restore(restoreState);
			} else {
				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
				restoreOperation.doRestore(restoreState);
			}
		} catch (Exception ex) {
			// release native resources acquired by a partially completed restore
			dispose();
			throw ex;
		}
	}

	@Override
	public void notifyCheckpointComplete(long completedCheckpointId) {
		synchronized (materializedSstFiles) {
			// ignore out-of-order notifications for older checkpoints
			if (completedCheckpointId < lastCompletedCheckpointId) {
				return;
			}

			// only the latest completed checkpoint serves as base for future increments
			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);

			lastCompletedCheckpointId = completedCheckpointId;
		}
	}

	/** Creates a fresh RocksDB instance that contains only the default column family. */
	private void createDB() throws IOException {
		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
		this.defaultColumnFamily = columnFamilyHandles.get(0);
	}

	/**
	 * Opens a RocksDB instance at the given path with the requested column families.
	 * The default column family descriptor is appended in last position, so the
	 * returned handle list contains one more entry than the requested descriptors.
	 *
	 * @param path local file system path of the RocksDB instance
	 * @param stateColumnFamilyDescriptors descriptors of the state column families to open
	 * @param stateColumnFamilyHandles out-parameter that receives the created handles
	 */
	private RocksDB openDB(
		String path,
		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {

		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());

		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);

		// we add the required descriptor for the default CF in last position.
		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));

		RocksDB dbRef;

		try {
			dbRef = RocksDB.open(
				Preconditions.checkNotNull(dbOptions),
				Preconditions.checkNotNull(path),
				columnFamilyDescriptors,
				stateColumnFamilyHandles);
		} catch (RocksDBException e) {
			throw new IOException("Error while opening RocksDB instance.", e);
		}

		// requested + default CF
		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
			"Not all requested column family handles have been created");

		return dbRef;
	}

	/**
	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
	 */
	static final class RocksDBFullRestoreOperation<K> {

		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;

		/** Current key-groups state handle from which we restore key-groups. */
		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
		private FSDataInputStream currentStateHandleInStream;
		/** Current data input view that wraps currentStateHandleInStream. */
		private DataInputView currentStateHandleInView;
		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
		/** The compression decorator that was used for writing the state, as determined by the meta data. */
		private StreamCompressionDecorator keygroupStreamCompressionDecorator;

		/**
		 * Creates a restore operation object for the given state backend instance.
		 *
		 * @param rocksDBKeyedStateBackend the state backend into which we restore
		 */
		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
		}

		/**
		 * Restores all key-groups data that is referenced by the passed state handles.
		 *
		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
		 */
		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
			throws IOException, StateMigrationException, RocksDBException {

			rocksDBKeyedStateBackend.createDB();

			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
				if (keyedStateHandle != null) {

					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
						throw new IllegalStateException("Unexpected state handle type, " +
							"expected: " + KeyGroupsStateHandle.class +
							", but found: " + keyedStateHandle.getClass());
					}
					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
					restoreKeyGroupsInStateHandle();
				}
			}
		}

		/**
		 * Restore one key groups state handle.
		 */
		private void restoreKeyGroupsInStateHandle()
			throws IOException, StateMigrationException, RocksDBException {
			try {
				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
				restoreKVStateMetaData();
				restoreKVStateData();
			} finally {
				if (currentStateHandleInStream != null
					&& rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
					IOUtils.closeQuietly(currentStateHandleInStream);
				}
			}
		}

		/**
		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by
		 * the current state handle, creating column families for states not yet registered.
		 *
		 * @throws IOException
		 * @throws StateMigrationException if the new key serializer cannot read the old keys
		 * @throws RocksDBException
		 */
		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

			KeyedBackendSerializationProxy<K> serializationProxy =
				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);

			serializationProxy.read(currentStateHandleInView);

			// check for key serializer compatibility; this also reconfigures the
			// key serializer to be compatible, if it is required and is possible
			if (CompatibilityUtil.resolveCompatibilityResult(
				serializationProxy.getKeySerializer(),
				UnloadableDummyTypeSerializer.class,
				serializationProxy.getKeySerializerConfigSnapshot(),
				rocksDBKeyedStateBackend.keySerializer)
				.isRequiresMigration()) {

				// TODO replace with state migration; note that key hash codes need to remain the same after migration
				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
					"Aborting now since state migration is currently not available");
			}

			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
				serializationProxy.getStateMetaInfoSnapshots();
			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());

			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {

				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());

				if (registeredColumn == null) {
					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);

					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
						nameBytes,
						rocksDBKeyedStateBackend.columnOptions);

					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
						new RegisteredKeyedBackendStateMetaInfo<>(
							restoredMetaInfo.getStateType(),
							restoredMetaInfo.getName(),
							restoredMetaInfo.getNamespaceSerializer(),
							restoredMetaInfo.getStateSerializer());

					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);

					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);

					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);

				} else {
					// TODO with eager state registration in place, check here for serializer migration strategies
				}
				// the column family list index must match the kv-state id written in the snapshot
				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
			}
		}

		/**
		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
+ * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... + for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; + + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); + + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + boolean keyGroupHasMoreKeys = true; + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } + } + } + } + } + } + + private static class RocksDBIncrementalRestoreOperation<T> { + + private final RocksDBKeyedStateBackend<T> stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { + this.stateBackend = stateBackend; + } + + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); + + KeyedBackendSerializationProxy<T> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + stateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); + } + + return serializationProxy.getStateMetaInfoSnapshots(); + } finally { + if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerCloseable(outputStream); + + byte[] buffer = new byte[8 * 1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } + + if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); + } + } + } + + private void restoreInstance( + IncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); + + readAllStateData(sstFiles, restoreInstancePath); + readAllStateData(miscFiles, restoreInstancePath); + + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + 
readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateMetaInfoSnapshots.size()); + + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { + + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); + } + + if (hasExtraKeys) { + + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); + + try (RocksDB restoreDb = stateBackend.openDB( + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { + + try { + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); + + if (null == registeredStateMetaInfoEntry) { + + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); + + registeredStateMetaInfoEntry = + new Tuple2<>( + stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); + + stateBackend.kvStateInformation.put( 
+ stateMetaInfoSnapshot.getName(), + registeredStateMetaInfoEntry); + } + + ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; + + try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { + + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } + + if (stateBackend.keyGroupRange.contains(keyGroup)) { + stateBackend.db.put(targetColumnFamilyHandle, + iterator.key(), iterator.value()); + } + + iterator.next(); + } + } // releases native iterator resources + } + } finally { + //release native tmp db column family resources + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + } + } // releases native tmp db resources + } else { + // pick up again the old backend id, so the we can reference existing state + stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + + LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", + stateBackend.operatorIdentifier, stateBackend.backendUID); + + // create hard links in the instance directory + if (!stateBackend.instanceRocksDBPath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); + } + + createFileHardLinksInRestorePath(sstFiles, restoreInstancePath); + createFileHardLinksInRestorePath(miscFiles, restoreInstancePath); + + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); + + stateBackend.db = 
stateBackend.openDB( + stateBackend.instanceRocksDBPath.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); + + // extract and store the default column family which is located at the last index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1); + + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); + + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); + + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + new Tuple2<>(columnFamilyHandle, stateMetaInfo)); + } + + // use the restore sst files as the base for succeeding checkpoints + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); + } + + stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); + } + } finally { + FileSystem restoreFileSystem = restoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(restoreInstancePath)) { + restoreFileSystem.delete(restoreInstancePath, true); + } + } + } + + private void readAllStateData( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); + } + } + + private void createFileHardLinksInRestorePath( + Map<StateHandleID, 
StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { + + for (StateHandleID stateHandleID : stateHandleMap.keySet()) { + String newSstFileName = stateHandleID.toString(); + File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName); + File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName); + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } + } + + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + + boolean hasExtraKeys = (restoreStateHandles.size() > 1 || + !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange)); + + if (hasExtraKeys) { + stateBackend.createDB(); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + IncrementalKeyedStateHandle keyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; + + restoreInstance(keyedStateHandle, hasExtraKeys); + } + } + } + + // ------------------------------------------------------------------------ + // State factories + // ------------------------------------------------------------------------ + + /** + * Creates a column family handle for use with a k/v state. When restoring from a snapshot + * we don't restore the individual k/v states, just the global RocksDB database and the + * list of column families. When a k/v state is first requested we check here whether we + * already have a column family for that and return it or create a new one if it doesn't exist. + * + * <p>This also checks whether the {@link StateDescriptor} for a state matches the one + * that we checkpointed, i.e. is already in the map of column families. 
+ */ + @SuppressWarnings("rawtypes, unchecked") + protected <N, S> ColumnFamilyHandle getColumnFamily( + StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = + kvStateInformation.get(descriptor.getName()); + + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + descriptor.getType(), + descriptor.getName(), + namespaceSerializer, + descriptor.getSerializer()); + + if (stateInfo != null) { + // TODO with eager registration in place, these checks should be moved to restore() + + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName()); + + Preconditions.checkState( + Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), + "Incompatible state names. " + + "Was [" + restoredMetaInfo.getName() + "], " + + "registered with [" + newMetaInfo.getName() + "]."); + + if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), + "Incompatible state types. 
" + + "Was [" + restoredMetaInfo.getStateType() + "], " + + "registered with [" + newMetaInfo.getStateType() + "]."); + } + + // check compatibility results to determine if state migration is required + CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getNamespaceSerializer(), + null, + restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), + newMetaInfo.getNamespaceSerializer()); + + CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getStateSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getStateSerializerConfigSnapshot(), + newMetaInfo.getStateSerializer()); + + if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) { + // TODO state migration currently isn't possible. + throw new StateMigrationException("State migration isn't supported, yet."); + } else { + stateInfo.f1 = newMetaInfo; + return stateInfo.f0; + } + } + + byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); + + final ColumnFamilyHandle columnFamily; + + try { + columnFamily = db.createColumnFamily(columnDescriptor); + } catch (RocksDBException e) { + throw new IOException("Error creating ColumnFamilyHandle.", e); + } + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple = + new Tuple2<>(columnFamily, newMetaInfo); + Map rawAccess = kvStateInformation; + rawAccess.put(descriptor.getName(), tuple); + return columnFamily; + } + + @Override + protected <N, T> InternalValueState<N, T> createValueState( + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<T> stateDesc) throws 
Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T> InternalListState<N, T> createListState( + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T> InternalReducingState<N, T> createReducingState( + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState( + TypeSerializer<N> namespaceSerializer, + AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState( + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState( + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBMapState<>(columnFamily, 
namespaceSerializer, stateDesc, this); + } + + /** + * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator. + * Used by #MergeIterator. + */ + static final class MergeIterator implements AutoCloseable { + + /** + * @param iterator The #RocksIterator to wrap . + * @param kvStateId Id of the K/V state to which this iterator belongs. + */ + MergeIterator(RocksIterator iterator, int kvStateId) { + this.iterator = Preconditions.checkNotNull(iterator); + this.currentKey = iterator.key(); + this.kvStateId = kvStateId; + } + + private final RocksIterator iterator; + private byte[] currentKey; + private final int kvStateId; + + public byte[] getCurrentKey() { + return currentKey; + } + + public void setCurrentKey(byte[] currentKey) { + this.currentKey = currentKey; + } + + public RocksIterator getIterator() { + return iterator; + } + + public int getKvStateId() { + return kvStateId; + } + + @Override + public void close() { + IOUtils.closeQuietly(iterator); + } + } + + /** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). + */ + static final class RocksDBMergeIterator implements AutoCloseable { + + private final PriorityQueue<MergeIterator> heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + + private MergeIterator currentSubIterator; + + private static final List<Comparator<MergeIterator>> COMPARATORS; + + static { + int maxBytes = 4; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i; + COMPARATORS.add(new Comparator<MergeIterator>() { + @Override + public int compare(MergeIterator o1, MergeIterator o2) { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.currentKey, o2.currentKey, currentBytes); + return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + } + }); + } + } + + RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); + + if (kvStateIterators.size() > 0) { + PriorityQueue<MergeIterator> iteratorPriorityQueue = + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + IOUtils.closeQuietly(rocksIterator); + } + } + + kvStateIterators.clear(); + + this.heap = iteratorPriorityQueue; + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; + } + + /** + * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after + * calls to {@link #next()}. 
+ */ + public void next() { + newKeyGroup = false; + newKVState = false; + + final RocksIterator rocksIterator = currentSubIterator.getIterator(); + rocksIterator.next(); + + byte[] oldKey = currentSubIterator.getCurrentKey(); + if (rocksIterator.isValid()) { + currentSubIterator.currentKey = rocksIterator.key(); + + if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + heap.offer(currentSubIterator); + currentSubIterator = heap.poll(); + newKVState = currentSubIterator.getIterator() != rocksIterator; + detectNewKeyGroup(oldKey); + } + } else { + IOUtils.closeQuietly(rocksIterator); + + if (heap.isEmpty()) { + currentSubIterator = null; + valid = false; + } else { + currentSubIterator = heap.poll(); + newKVState = true; + detectNewKeyGroup(oldKey); + } + } + } + + private boolean isDifferentKeyGroup(byte[] a, byte[] b) { + return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount); + } + + private void detectNewKeyGroup(byte[] oldKey) { + if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) { + newKeyGroup = true; + } + } + + /** + * @return key-group for the current key + */ + public int keyGroup() { + int result = 0; + //big endian decode + for (int i = 0; i < keyGroupPrefixByteCount; ++i) { + result <<= 8; + result |= (currentSubIterator.currentKey[i] & 0xFF); + } + return result; + } + + public byte[] key() { + return currentSubIterator.getCurrentKey(); + } + + public byte[] value() { + return currentSubIterator.getIterator().value(); + } + + /** + * @return Id of K/V state to which the current key belongs. + */ + public int kvStateId() { + return currentSubIterator.getKvStateId(); + } + + /** + * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than it's predecessor. + * @return true iff the current key belong to a different k/v-state than it's predecessor. 
+ */ + public boolean isNewKeyValueState() { + return newKVState; + } + + /** + * Indicates if current key starts a new key-group, i.e. belong to a different key-group than it's predecessor. + * @return true iff the current key belong to a different key-group than it's predecessor. + */ + public boolean isNewKeyGroup() { + return newKeyGroup; + } + + /** + * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as + * {@link #next()} should only be called if valid returned true. Should be checked after each call to + * {@link #next()} before accessing iterator state. + * @return True iff this iterator is valid. + */ + public boolean isValid() { + return valid; + } + + private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) { + for (int i = 0; i < len; ++i) { + int diff = (a[i] & 0xFF) - (b[i] & 0xFF); + if (diff != 0) { + return diff; + } + } + return 0; + } + + @Override + public void close() { + IOUtils.closeQuietly(currentSubIterator); + currentSubIterator = null; + + IOUtils.closeAllQuietly(heap); + heap.clear(); + } + } + + /** + * Only visible for testing, DO NOT USE. 
+ */ + @VisibleForTesting + public File getInstanceBasePath() { + return instanceBasePath; + } + + @Override + public boolean supportsAsynchronousSnapshots() { + return true; + } + + @VisibleForTesting + @SuppressWarnings("unchecked") + @Override + public int numStateEntries() { + int count = 0; + + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) { + try (RocksIterator rocksIterator = db.newIterator(column.f0)) { + rocksIterator.seekToFirst(); + + while (rocksIterator.isValid()) { + count++; + rocksIterator.next(); + } + } + } + + return count; + } + + private static class RocksIteratorWrapper<K> implements Iterator<K> { + private final RocksIterator iterator; + private final String state; + private final TypeSerializer<K> keySerializer; + private final int keyGroupPrefixBytes; + + public RocksIteratorWrapper( + RocksIterator iterator, + String state, + TypeSerializer<K> keySerializer, + int keyGroupPrefixBytes) { + this.iterator = Preconditions.checkNotNull(iterator); + this.state = Preconditions.checkNotNull(state); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + } + + @Override + public boolean hasNext() { + return iterator.isValid(); + } + + @Override + public K next() { + if (!hasNext()) { + throw new NoSuchElementException("Failed to access state [" + state + "]"); + } + try { + byte[] key = iterator.key(); + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + iterator.next(); + return value; + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java new file mode 100644 index 0000000..f0481ec --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * {@link ListState} implementation that stores state in RocksDB.
 *
 * <p>{@link RocksDBStateBackend} must ensure that we set the
 * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
 * we use the {@code merge()} call.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of the values in the list state.
 */
public class RocksDBListState<K, N, V>
	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
	implements InternalListState<N, V> {

	/** Serializer for the values. */
	private final TypeSerializer<V> valueSerializer;

	/**
	 * We disable writes to the write-ahead-log here. We can't have these in the base class
	 * because JNI segfaults for some reason if they are.
	 *
	 * <p>NOTE(review): this native {@link WriteOptions} object is never closed anywhere in this
	 * class — verify it is disposed elsewhere, otherwise it leaks native memory.
	 */
	private final WriteOptions writeOptions;

	/**
	 * Separator that RocksDB's string-append merge operator inserts between merged values.
	 */
	private static final byte DELIMITER = ',';

	/**
	 * Creates a new {@code RocksDBListState}.
	 *
	 * @param columnFamily The RocksDB column family that this state lives in.
	 * @param namespaceSerializer The serializer for the namespace.
	 * @param stateDesc The state identifier for the state. This contains name
	 * and can create a default state value.
	 * @param backend The backend that owns the RocksDB instance and the current key/key-group.
	 */
	public RocksDBListState(ColumnFamilyHandle columnFamily,
			TypeSerializer<N> namespaceSerializer,
			ListStateDescriptor<V> stateDesc,
			RocksDBKeyedStateBackend<K> backend) {

		super(columnFamily, namespaceSerializer, stateDesc, backend);
		this.valueSerializer = stateDesc.getElementSerializer();

		writeOptions = new WriteOptions();
		writeOptions.setDisableWAL(true);
	}

	/**
	 * Returns the list for the current key/namespace, or {@code null} if nothing is stored.
	 * The stored bytes are the DELIMITER-separated serialized elements produced by the
	 * merge operator, which this method splits and deserializes one by one.
	 */
	@Override
	public Iterable<V> get() {
		try {
			writeCurrentKeyWithGroupAndNamespace();
			byte[] key = keySerializationStream.toByteArray();
			byte[] valueBytes = backend.db.get(columnFamily, key);

			if (valueBytes == null) {
				return null;
			}

			ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);

			List<V> result = new ArrayList<>();
			while (in.available() > 0) {
				result.add(valueSerializer.deserialize(in));
				if (in.available() > 0) {
					// skip the delimiter byte between two serialized elements
					in.readByte();
				}
			}
			return result;
		} catch (IOException | RocksDBException e) {
			throw new RuntimeException("Error while retrieving data from RocksDB", e);
		}
	}

	/**
	 * Appends one value to the list via a RocksDB {@code merge()}.
	 *
	 * <p>The shared {@code keySerializationStream} buffer is used for both the key and the
	 * value bytes; the order (toByteArray for the key, reset, then serialize the value) is
	 * significant and must not be rearranged.
	 */
	@Override
	public void add(V value) throws IOException {
		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");

		try {
			writeCurrentKeyWithGroupAndNamespace();
			byte[] key = keySerializationStream.toByteArray();
			keySerializationStream.reset();
			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
			valueSerializer.serialize(value, out);
			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
		} catch (Exception e) {
			throw new RuntimeException("Error while adding data to RocksDB", e);
		}
	}

	/**
	 * Moves the lists stored under each source namespace into the target namespace by
	 * reading, deleting, and merging the raw bytes — no deserialization is needed because
	 * the merge operator concatenates the already-delimited byte sequences.
	 */
	@Override
	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
		if (sources == null || sources.isEmpty()) {
			return;
		}

		// cache key and namespace
		final K key = backend.getCurrentKey();
		final int keyGroup = backend.getCurrentKeyGroupIndex();

		try {
			// create the target full-binary-key
			writeKeyWithGroupAndNamespace(
				keyGroup, key, target,
				keySerializationStream, keySerializationDataOutputView);
			final byte[] targetKey = keySerializationStream.toByteArray();

			// merge the sources to the target
			for (N source : sources) {
				if (source != null) {
					writeKeyWithGroupAndNamespace(
						keyGroup, key, source,
						keySerializationStream, keySerializationDataOutputView);

					byte[] sourceKey = keySerializationStream.toByteArray();
					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
					// delete before merge so the source entry is gone even if it was empty
					backend.db.delete(columnFamily, sourceKey);

					if (valueBytes != null) {
						backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
					}
				}
			}
		}
		catch (Exception e) {
			throw new Exception("Error while merging state in RocksDB", e);
		}
	}

	/**
	 * Replaces the whole list: clears the current entry, then writes the pre-merged
	 * (delimiter-joined) bytes of all values with a single {@code put()}.
	 */
	@Override
	public void update(List<V> values) throws Exception {
		Preconditions.checkNotNull(values, "List of values to add cannot be null.");

		clear();

		if (!values.isEmpty()) {
			try {
				writeCurrentKeyWithGroupAndNamespace();
				byte[] key = keySerializationStream.toByteArray();

				byte[] premerge = getPreMergedValue(values);
				if (premerge != null) {
					backend.db.put(columnFamily, writeOptions, key, premerge);
				} else {
					// NOTE(review): getPreMergedValue() returns toByteArray(), which is never
					// null, so this branch appears unreachable — confirm before relying on it.
					throw new IOException("Failed pre-merge values in update()");
				}
			} catch (IOException | RocksDBException e) {
				throw new RuntimeException("Error while updating data to RocksDB", e);
			}
		}
	}

	/**
	 * Appends all given values in one {@code merge()} call, using pre-merged
	 * (delimiter-joined) bytes instead of one merge per element.
	 */
	@Override
	public void addAll(List<V> values) throws Exception {
		Preconditions.checkNotNull(values, "List of values to add cannot be null.");

		if (!values.isEmpty()) {
			try {
				writeCurrentKeyWithGroupAndNamespace();
				byte[] key = keySerializationStream.toByteArray();

				byte[] premerge = getPreMergedValue(values);
				if (premerge != null) {
					backend.db.merge(columnFamily, writeOptions, key, premerge);
				} else {
					// NOTE(review): getPreMergedValue() never returns null — see update().
					throw new IOException("Failed pre-merge values in addAll()");
				}
			} catch (IOException | RocksDBException e) {
				throw new RuntimeException("Error while updating data to RocksDB", e);
			}
		}
	}

	/**
	 * Serializes all values into the shared {@code keySerializationStream}, separated by
	 * {@link #DELIMITER}, mirroring the byte layout the merge operator produces.
	 * The stream is reset first, so any previously buffered key bytes must already have
	 * been captured by the caller.
	 *
	 * @throws IOException if serializing an element fails
	 */
	private byte[] getPreMergedValue(List<V> values) throws IOException {
		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

		keySerializationStream.reset();
		boolean first = true;
		for (V value : values) {
			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
			if (first) {
				first = false;
			} else {
				keySerializationStream.write(DELIMITER);
			}
			valueSerializer.serialize(value, out);
		}

		return keySerializationStream.toByteArray();
	}
}