Repository: flink Updated Branches: refs/heads/master 3f4de57b1 -> 6642768ad
[FLINK-7460] [state backends] Close all ColumnFamilyHandles when restoring from rescaled incremental checkpoints Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca87bec4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca87bec4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca87bec4 Branch: refs/heads/master Commit: ca87bec4f79c32c9f6cf7a4aa96866f6fac957e0 Parents: 3f4de57 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Aug 14 14:01:03 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Aug 24 17:17:39 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 453 ++++++++++--------- .../state/RocksDBStateBackendTest.java | 313 +++++++------ .../runtime/state/StateBackendTestBase.java | 244 +++++----- 3 files changed, 546 insertions(+), 464 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 756cfdd..b7f386d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -105,7 +105,9 @@ import java.io.ObjectInputStream; import java.io.OutputStream; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -138,6 +140,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */ public static final String MERGE_OPERATOR_NAME = "stringappendtest"; + /** Bytes for the name of the column decriptor for the default column family. */ + public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET); + private final String operatorIdentifier; /** The column family options from the options factory. */ @@ -196,7 +201,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles; /** The identifier of the last completed checkpoint. */ - private long lastCompletedCheckpointId = -1; + private long lastCompletedCheckpointId = -1L; /** Unique ID of this backend. */ private UUID backendUID; @@ -204,17 +209,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final String SST_FILE_SUFFIX = ".sst"; public RocksDBKeyedStateBackend( - String operatorIdentifier, - ClassLoader userCodeClassLoader, - File instanceBasePath, - DBOptions dbOptions, - ColumnFamilyOptions columnFamilyOptions, - TaskKvStateRegistry kvStateRegistry, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig, - boolean enableIncrementalCheckpointing + String operatorIdentifier, + ClassLoader userCodeClassLoader, + File instanceBasePath, + DBOptions dbOptions, + ColumnFamilyOptions columnFamilyOptions, + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig, + boolean enableIncrementalCheckpointing ) throws IOException { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); @@ -253,10 +258,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); - - LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.", - this.operatorIdentifier, - this.backendUID); + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } /** @@ -277,7 +279,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // DB is closed. So we start with the ones created by Flink... for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : kvStateInformation.values()) { - IOUtils.closeQuietly(columnMetaData.f0); } @@ -328,10 +329,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ @Override public RunnableFuture<KeyedStateHandle> snapshot( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory, - CheckpointOptions checkpointOptions) throws Exception { + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception { if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && enableIncrementalCheckpointing) { @@ -342,9 +343,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private RunnableFuture<KeyedStateHandle> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { + final long checkpointId, + final long checkpointTimestamp, + final CheckpointStreamFactory checkpointStreamFactory) throws Exception { final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = new RocksDBIncrementalSnapshotOperation<>( @@ -361,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (!hasRegisteredState()) { if (LOG.isDebugEnabled()) { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + - checkpointTimestamp + " . Returning null."); + checkpointTimestamp + " . Returning null."); } return DoneFuture.nullValue(); } @@ -391,9 +392,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private RunnableFuture<KeyedStateHandle> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory) throws Exception { long startTime = System.currentTimeMillis(); @@ -406,7 +407,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (!hasRegisteredState()) { if (LOG.isDebugEnabled()) { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + - " . Returning null."); + " . Returning null."); } return DoneFuture.nullValue(); } @@ -419,52 +420,52 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // implementation of the async IO operation, based on FutureTask AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable = - new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() { + new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() { - @Override - public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { - snapshotOperation.openCheckpointStream(); - return snapshotOperation.getOutStream(); - } + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { + snapshotOperation.openCheckpointStream(); + return snapshotOperation.getOutStream(); + } - @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long startTime = System.currentTimeMillis(); - synchronized (asyncSnapshotLock) { - try { - // hold the db lock while operation on the db to guard us against async db disposal - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public KeyGroupsStateHandle performOperation() throws Exception { + long startTime = System.currentTimeMillis(); + synchronized (asyncSnapshotLock) { + try { + // hold the db lock while operation on the db to guard us against async db disposal + if (db == null) { + throw new IOException("RocksDB closed."); + } - snapshotOperation.writeDBSnapshot(); + snapshotOperation.writeDBSnapshot(); - } finally { - snapshotOperation.closeCheckpointStream(); - } + } finally { + snapshotOperation.closeCheckpointStream(); } + } - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - return snapshotOperation.getSnapshotResultStateHandle(); - } + return snapshotOperation.getSnapshotResultStateHandle(); + } - private void releaseSnapshotOperationResources(boolean canceled) { - // hold the db lock while operation on the db to guard us against async db disposal - synchronized (asyncSnapshotLock) { - snapshotOperation.releaseSnapshotResources(canceled); - } + private void releaseSnapshotOperationResources(boolean canceled) { + // hold the db lock while operation on the db to guard us against async db disposal + synchronized (asyncSnapshotLock) { + snapshotOperation.releaseSnapshotResources(canceled); } + } - @Override - public void done(boolean canceled) { - releaseSnapshotOperationResources(canceled); - } - }; + @Override + public void done(boolean canceled) { + releaseSnapshotOperationResources(canceled); + } + }; LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " + - Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); return AsyncStoppableTaskWithCallback.from(ioCallable); } @@ -493,8 +494,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private KeyGroupsStateHandle snapshotResultStateHandle; RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory) { + RocksDBKeyedStateBackend<K> stateBackend, + CheckpointStreamFactory checkpointStreamFactory) { this.stateBackend = stateBackend; this.checkpointStreamFactory = checkpointStreamFactory; @@ -523,7 +524,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public void openCheckpointStream() throws Exception { Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); outStream = checkpointStreamFactory. - createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); + createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); stateBackend.cancelStreamRegistry.registerClosable(outStream); outputView = new DataOutputViewStreamWrapper(outStream); } @@ -615,11 +616,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private void writeKVStateMetaData() throws IOException { List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + new ArrayList<>(stateBackend.kvStateInformation.size()); int kvStateId = 0; for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + stateBackend.kvStateInformation.entrySet()) { metaInfoSnapshots.add(column.getValue().f1.snapshot()); @@ -628,7 +629,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { readOptions.setSnapshot(snapshot); kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); ++kvStateId; } @@ -797,10 +798,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private StreamStateHandle metaStateHandle = null; private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - long checkpointId, - long checkpointTimestamp) { + RocksDBKeyedStateBackend<K> stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { this.stateBackend = stateBackend; this.checkpointStreamFactory = checkpointStreamFactory; @@ -886,20 +887,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { void takeSnapshot() throws Exception { assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); - final long lastCompletedCheckpoint; - // use the last completed checkpoint as the comparison base. synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); + baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); } - LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); - // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { + : stateBackend.kvStateInformation.entrySet()) { stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); } @@ -1054,47 +1049,39 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private void createDB() throws IOException { - db = openDB(instanceRocksDBPath.getAbsolutePath(), - new ArrayList<ColumnFamilyDescriptor>(), - null); + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.defaultColumnFamily = columnFamilyHandles.get(0); } private RocksDB openDB( - String path, - List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, - List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - // we add the required descriptor for the default CF in last position. - columnFamilyDescriptors.add( - new ColumnFamilyDescriptor( - "default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions)); + columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size()); + // we add the required descriptor for the default CF in last position. + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); RocksDB db; try { db = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - Preconditions.checkNotNull(path), - columnFamilyDescriptors, - columnFamilyHandles); + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + stateColumnFamilyHandles); } catch (RocksDBException e) { throw new IOException("Error while opening RocksDB instance.", e); } - final int defaultColumnFamilyIndex = columnFamilyHandles.size() - 1; - - // extract the default column family. - defaultColumnFamily = columnFamilyHandles.get(defaultColumnFamilyIndex); - - if (stateColumnFamilyHandles != null) { - // return all CFs except the default CF which is kept separately because it is not used in Flink operations. - stateColumnFamilyHandles.addAll( - columnFamilyHandles.subList(0, defaultColumnFamilyIndex)); - } + // requested + default CF + Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), + "Not all requested column family handles have been created"); return db; } @@ -1135,7 +1122,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @throws RocksDBException */ public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) - throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException { + throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException { rocksDBKeyedStateBackend.createDB(); @@ -1144,8 +1131,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + KeyGroupsStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); } this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; restoreKeyGroupsInStateHandle(); @@ -1161,7 +1148,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @throws ClassNotFoundException */ private void restoreKeyGroupsInStateHandle() - throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException { + throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException { try { currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream); @@ -1186,17 +1173,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); serializationProxy.read(currentStateHandleInView); // check for key serializer compatibility; this also reconfigures the // key serializer to be compatible, if it is required and is possible if (CompatibilityUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - UnloadableDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - rocksDBKeyedStateBackend.keySerializer) + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + rocksDBKeyedStateBackend.keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration @@ -1208,7 +1195,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = - serializationProxy.getStateMetaInfoSnapshots(); + serializationProxy.getStateMetaInfoSnapshots(); currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); @@ -1218,22 +1205,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); if (registeredColumn == null) { + byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), + nameBytes, rocksDBKeyedStateBackend.columnOptions); RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - restoredMetaInfo.getStateType(), - restoredMetaInfo.getName(), - restoredMetaInfo.getNamespaceSerializer(), - restoredMetaInfo.getStateSerializer()); + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo); + registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); } else { @@ -1303,7 +1292,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( - StreamStateHandle metaStateHandle) throws Exception { + StreamStateHandle metaStateHandle) throws Exception { FSDataInputStream inputStream = null; @@ -1319,10 +1308,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // check for key serializer compatibility; this also reconfigures the // key serializer to be compatible, if it is required and is possible if (CompatibilityUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - UnloadableDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - stateBackend.keySerializer) + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + stateBackend.keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration @@ -1340,8 +1329,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private void readStateData( - Path restoreFilePath, - StreamStateHandle remoteFileHandle) throws IOException { + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); @@ -1378,8 +1367,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private void restoreInstance( - IncrementalKeyedStateHandle restoreStateHandle, - boolean hasExtraKeys) throws Exception { + IncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { // read state data Path restoreInstancePath = new Path( @@ -1399,7 +1388,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = readMetaData(restoreStateHandle.getMetaStateHandle()); - List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateMetaInfoSnapshots.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { @@ -1413,69 +1403,78 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (hasExtraKeys) { - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); try (RocksDB restoreDb = stateBackend.openDB( - restoreInstancePath.getPath(), - columnFamilyDescriptors, - columnFamilyHandles)) { - - for (int i = 0; i < columnFamilyHandles.size(); ++i) { - ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); - ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); - RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = - stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); - - if (null == registeredStateMetaInfoEntry) { - - RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - stateMetaInfoSnapshot.getStateType(), + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { + + try { + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); + + if (null == registeredStateMetaInfoEntry) { + + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); + + registeredStateMetaInfoEntry = + new Tuple2<>( + stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); + + stateBackend.kvStateInformation.put( stateMetaInfoSnapshot.getName(), - stateMetaInfoSnapshot.getNamespaceSerializer(), - stateMetaInfoSnapshot.getStateSerializer()); - - registeredStateMetaInfoEntry = - new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>( - stateBackend.db.createColumnFamily(columnFamilyDescriptor), - stateMetaInfo); + registeredStateMetaInfoEntry); + } - stateBackend.kvStateInformation.put( - stateMetaInfoSnapshot.getName(), - registeredStateMetaInfoEntry); - } + ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; - ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; + try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { - try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } - int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); - byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; - for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { - startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); - } + iterator.seek(startKeyGroupPrefixBytes); - iterator.seek(startKeyGroupPrefixBytes); + while (iterator.isValid()) { - while (iterator.isValid()) { + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } - int keyGroup = 0; - for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { - keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; - } + if (stateBackend.keyGroupRange.contains(keyGroup)) { + stateBackend.db.put(targetColumnFamilyHandle, + iterator.key(), iterator.value()); + } - if (stateBackend.keyGroupRange.contains(keyGroup)) { - stateBackend.db.put(targetColumnFamilyHandle, - iterator.key(), iterator.value()); + iterator.next(); } - - iterator.next(); - } + } // releases native iterator resources + } + } finally { + //release native tmp db column family resources + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); } } - } + } // releases native tmp db resources } else { // pick up again the old backend id, so the we can reference existing state stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); @@ -1491,11 +1490,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { createFileHardLinksInRestorePath(sstFiles, restoreInstancePath); createFileHardLinksInRestorePath(miscFiles, restoreInstancePath); - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); + stateBackend.db = stateBackend.openDB( stateBackend.instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles); + // extract and store the default column family which is located at the last index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1); + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); @@ -1509,8 +1513,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateBackend.kvStateInformation.put( stateMetaInfoSnapshot.getName(), - new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>( - columnFamilyHandle, stateMetaInfo)); + new Tuple2<>(columnFamilyHandle, stateMetaInfo)); } // use the restore sst files as the base for succeeding checkpoints @@ -1590,10 +1593,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ @SuppressWarnings("rawtypes, unchecked") protected <N, S> ColumnFamilyHandle getColumnFamily( - StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { + StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = - kvStateInformation.get(descriptor.getName()); + kvStateInformation.get(descriptor.getName()); RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( descriptor.getType(), @@ -1625,16 +1628,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // check compatibility results to determine if state migration is required CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( - restoredMetaInfo.getNamespaceSerializer(), - MigrationNamespaceSerializerProxy.class, - restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + restoredMetaInfo.getNamespaceSerializer(), + MigrationNamespaceSerializerProxy.class, + restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), + newMetaInfo.getNamespaceSerializer()); CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( - restoredMetaInfo.getStateSerializer(), - UnloadableDummyTypeSerializer.class, - restoredMetaInfo.getStateSerializerConfigSnapshot(), - newMetaInfo.getStateSerializer()); + restoredMetaInfo.getStateSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getStateSerializerConfigSnapshot(), + newMetaInfo.getStateSerializer()); if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) { stateInfo.f1 = newMetaInfo; @@ -1645,25 +1648,31 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor( - descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions); + byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); + + final ColumnFamilyHandle columnFamily; try { - ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor); - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple = - new Tuple2<>(columnFamily, newMetaInfo); - Map rawAccess = kvStateInformation; - rawAccess.put(descriptor.getName(), tuple); - return columnFamily; + columnFamily = db.createColumnFamily(columnDescriptor); } catch (RocksDBException e) { throw new IOException("Error creating ColumnFamilyHandle.", e); } + + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple = + new Tuple2<>(columnFamily, newMetaInfo); + Map rawAccess = kvStateInformation; + rawAccess.put(descriptor.getName(), tuple); + return columnFamily; } @Override protected <N, T> InternalValueState<N, T> createValueState( - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<T> stateDesc) throws Exception { + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<T> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); @@ -1672,8 +1681,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override protected <N, T> InternalListState<N, T> createListState( - TypeSerializer<N> namespaceSerializer, - ListStateDescriptor<T> stateDesc) throws Exception { + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<T> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); @@ -1682,8 +1691,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override protected <N, T> InternalReducingState<N, T> createReducingState( - TypeSerializer<N> namespaceSerializer, - ReducingStateDescriptor<T> stateDesc) throws Exception { + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<T> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); @@ -1692,8 +1701,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState( - TypeSerializer<N> namespaceSerializer, - AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { + TypeSerializer<N> namespaceSerializer, + AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); @@ -1701,8 +1710,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState( - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); @@ -1711,7 +1720,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, - MapStateDescriptor<UK, UV> stateDesc) throws Exception { + MapStateDescriptor<UK, UV> stateDesc) throws Exception { ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this); @@ -1784,7 +1793,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override public int compare(MergeIterator o1, MergeIterator o2) { int arrayCmpRes = compareKeyGroupsForByteArrays( - o1.currentKey, o2.currentKey, currentBytes); + o1.currentKey, o2.currentKey, currentBytes); return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; } }); @@ -1799,7 +1808,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (kvStateIterators.size() > 0) { PriorityQueue<MergeIterator> iteratorPriorityQueue = - new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) { final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; @@ -1968,8 +1977,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { KeyedStateHandle keyedStateHandle = restoreState.iterator().next(); if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + MigrationKeyGroupStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); + "expected: " + MigrationKeyGroupStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); } MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; @@ -1989,8 +1998,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { byte mappingByte = inputView.readByte(); ObjectInputStream ooIn = - new InstantiationUtil.ClassLoaderObjectInputStream( - new DataInputViewStream(inputView), userCodeClassLoader); + new InstantiationUtil.ClassLoaderObjectInputStream( + new DataInputViewStream(inputView), userCodeClassLoader); StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject(); @@ -2015,7 +2024,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { while (true) { byte mappingByte = inputView.readByte(); ColumnFamilyHandle handle = getColumnFamily( - columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE); + columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE); byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView); http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 991e0d4..08d661c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -42,7 +42,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.filefilter.IOFileFilter; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -122,6 +124,22 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa return backend; } + // small safety net for instance cleanups, so that no native objects are left + @After + public void cleanupRocksDB() { + if (keyedStateBackend != null) { + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); + } + + if (allCreatedCloseables != null) { + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + allCreatedCloseables = null; + } + } + public void setupRocksKeyedStateBackend() throws Exception { blocker = new OneShotLatch(); @@ -238,149 +256,186 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa @Test public void testCorrectMergeOperatorSet() throws IOException { - ColumnFamilyOptions columnFamilyOptions = mock(ColumnFamilyOptions.class); - - try (RocksDBKeyedStateBackend<Integer> test = new RocksDBKeyedStateBackend<>( - "test", - Thread.currentThread().getContextClassLoader(), - tempFolder.newFolder(), - mock(DBOptions.class), - columnFamilyOptions, - mock(TaskKvStateRegistry.class), - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new ExecutionConfig(), - enableIncrementalCheckpointing)) { + + final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); + RocksDBKeyedStateBackend<Integer> test = null; + try { + test = new RocksDBKeyedStateBackend<>( + "test", + Thread.currentThread().getContextClassLoader(), + tempFolder.newFolder(), + mock(DBOptions.class), + columnFamilyOptions, + mock(TaskKvStateRegistry.class), + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new ExecutionConfig(), + enableIncrementalCheckpointing); verify(columnFamilyOptions, Mockito.times(1)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); + } finally { + if (test != null) { + IOUtils.closeQuietly(test); + test.dispose(); + } + columnFamilyOptions.close(); } } @Test public void testReleasingSnapshotAfterBackendClosed() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, - CheckpointOptions.forFullCheckpoint()); - RocksDB spyDB = keyedStateBackend.db; + try { + RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + CheckpointOptions.forFullCheckpoint()); - if (!enableIncrementalCheckpointing) { - verify(spyDB, times(1)).getSnapshot(); - verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); - } + RocksDB spyDB = keyedStateBackend.db; - this.keyedStateBackend.dispose(); - verify(spyDB, times(1)).close(); - assertEquals(null, keyedStateBackend.db); + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + } - //Ensure every RocksObjects not closed yet - for (RocksObject rocksCloseable : allCreatedCloseables) { - verify(rocksCloseable, times(0)).close(); - } + this.keyedStateBackend.dispose(); + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); - snapshot.cancel(true); + //Ensure every RocksObjects not closed yet + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(0)).close(); + } - //Ensure every RocksObjects was closed exactly once - for (RocksObject rocksCloseable : allCreatedCloseables) { - verify(rocksCloseable, times(1)).close(); - } + snapshot.cancel(true); + //Ensure every RocksObjects was closed exactly once + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + } finally { + keyedStateBackend.dispose(); + keyedStateBackend = null; + } } @Test public void testDismissingSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); - snapshot.cancel(true); - verifyRocksObjectsReleased(); + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + snapshot.cancel(true); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } } @Test public void testDismissingSnapshotNotRunnable() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); - snapshot.cancel(true); - Thread asyncSnapshotThread = new Thread(snapshot); - asyncSnapshotThread.start(); try { - snapshot.get(); - fail(); - } catch (Exception ignored) { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + snapshot.cancel(true); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + try { + snapshot.get(); + fail(); + } catch (Exception ignored) { + } + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; } - asyncSnapshotThread.join(); - verifyRocksObjectsReleased(); } @Test public void testCompletingSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); - Thread asyncSnapshotThread = new Thread(snapshot); - asyncSnapshotThread.start(); - waiter.await(); // wait for snapshot to run - waiter.reset(); - runStateUpdates(); - blocker.trigger(); // allow checkpointing to start writing - waiter.await(); // wait for snapshot stream writing to run - KeyedStateHandle keyedStateHandle = snapshot.get(); - assertNotNull(keyedStateHandle); - assertTrue(keyedStateHandle.getStateSize() > 0); - assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); - assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); - asyncSnapshotThread.join(); - verifyRocksObjectsReleased(); + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + waiter.await(); // wait for snapshot to run + waiter.reset(); + runStateUpdates(); + blocker.trigger(); // allow checkpointing to start writing + waiter.await(); // wait for snapshot stream writing to run + KeyedStateHandle keyedStateHandle = snapshot.get(); + assertNotNull(keyedStateHandle); + assertTrue(keyedStateHandle.getStateSize() > 0); + assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); + assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } } @Test public void testCancelRunningSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); - Thread asyncSnapshotThread = new Thread(snapshot); - asyncSnapshotThread.start(); - waiter.await(); // wait for snapshot to run - waiter.reset(); - runStateUpdates(); - snapshot.cancel(true); - blocker.trigger(); // allow checkpointing to start writing - assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); - waiter.await(); // wait for snapshot stream writing to run try { - snapshot.get(); - fail(); - } catch (Exception ignored) { - } + RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + waiter.await(); // wait for snapshot to run + waiter.reset(); + runStateUpdates(); + snapshot.cancel(true); + blocker.trigger(); // allow checkpointing to start writing + assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); + waiter.await(); // wait for snapshot stream writing to run + try { + snapshot.get(); + fail(); + } catch (Exception ignored) { + } - asyncSnapshotThread.join(); - verifyRocksObjectsReleased(); + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } } @Test public void testDisposeDeletesAllDirectories() throws Exception { AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = + Collection<File> allFilesInDbDir = + FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); + try { + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); - kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); - ValueState<String> state = + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - backend.setCurrentKey(1); - state.update("Hello"); - - Collection<File> allFilesInDbDir = - FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); - - // more than just the root directory - assertTrue(allFilesInDbDir.size() > 1); - - backend.dispose(); + backend.setCurrentKey(1); + state.update("Hello"); + // more than just the root directory + assertTrue(allFilesInDbDir.size() > 1); + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } allFilesInDbDir = - FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); + FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); // just the root directory left assertEquals(1, allFilesInDbDir.size()); @@ -390,62 +445,64 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa public void testSharedIncrementalStateDeRegistration() throws Exception { if (enableIncrementalCheckpointing) { AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); - ValueStateDescriptor<String> kvId = - new ValueStateDescriptor<>("id", String.class, null); + try { + ValueStateDescriptor<String> kvId = + new ValueStateDescriptor<>("id", String.class, null); - kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); - ValueState<String> state = - backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + ValueState<String> state = + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>(); - SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); - for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { + Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>(); + SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); + for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { - reset(sharedStateRegistry); + reset(sharedStateRegistry); - backend.setCurrentKey(checkpointId); - state.update("Hello-" + checkpointId); + backend.setCurrentKey(checkpointId); + state.update("Hello-" + checkpointId); - RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot( - checkpointId, - checkpointId, - createStreamFactory(), - CheckpointOptions.forFullCheckpoint()); + RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot( + checkpointId, + checkpointId, + createStreamFactory(), + CheckpointOptions.forFullCheckpoint()); - snapshot.run(); + snapshot.run(); - IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get(); - Map<StateHandleID, StreamStateHandle> sharedState = - new HashMap<>(stateHandle.getSharedState()); + IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get(); + Map<StateHandleID, StreamStateHandle> sharedState = + new HashMap<>(stateHandle.getSharedState()); - stateHandle.registerSharedStates(sharedStateRegistry); + stateHandle.registerSharedStates(sharedStateRegistry); - for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) { - verify(sharedStateRegistry).registerReference( - stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()), - e.getValue()); - } + for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) { + verify(sharedStateRegistry).registerReference( + stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()), + e.getValue()); + } - previousStateHandles.add(stateHandle); - backend.notifyCheckpointComplete(checkpointId); + previousStateHandles.add(stateHandle); + backend.notifyCheckpointComplete(checkpointId); - //----------------------------------------------------------------- + //----------------------------------------------------------------- - if (previousStateHandles.size() > 1) { - checkRemove(previousStateHandles.remove(), sharedStateRegistry); + if (previousStateHandles.size() > 1) { + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } } - } - while (!previousStateHandles.isEmpty()) { + while (!previousStateHandles.isEmpty()) { - reset(sharedStateRegistry); + reset(sharedStateRegistry); - checkRemove(previousStateHandles.remove(), sharedStateRegistry); + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); } - - backend.close(); - backend.dispose(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 6debff7..f6f73f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -18,10 +18,6 @@ package org.apache.flink.runtime.state; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -64,14 +60,17 @@ import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; +import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; import org.apache.flink.types.IntValue; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.TestLogger; -import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; - +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -177,17 +176,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten Environment env) throws Exception { AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend( - env, - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - env.getTaskKvStateRegistry()); + env, + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + env.getTaskKvStateRegistry()); - if (null != state) { - backend.restore(state); - } + backend.restore(state); return backend; } @@ -244,6 +241,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + backend.dispose(); } @Test @@ -303,6 +301,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + backend.dispose(); } @Test @@ -356,6 +355,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + backend.dispose(); } @Test @@ -411,6 +411,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + backend.dispose(); } @@ -488,81 +489,91 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten CheckpointStreamFactory streamFactory = createStreamFactory(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); Environment env = new DummyEnvironment("test", 1, 0); - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + AbstractKeyedStateBackend<Integer> backend = null; - TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + try { + backend = createKeyedBackend(IntSerializer.INSTANCE, env); - // make sure that we are in fact using the KryoSerializer - assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); - ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); - ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); - // ============== create snapshot - no Kryo registration or specific / default serializers ============== + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - // make some more modifications - backend.setCurrentKey(1); - state.update(new TestPojo("u1", 1)); + // ============== create snapshot - no Kryo registration or specific / default serializers ============== - backend.setCurrentKey(2); - state.update(new TestPojo("u2", 2)); + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1)); - KeyedStateHandle snapshot = runSnapshot(backend.snapshot( + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2)); + + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); - snapshot.registerSharedStates(sharedStateRegistry); - backend.dispose(); + snapshot.registerSharedStates(sharedStateRegistry); + backend.dispose(); - // ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ========== + // ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ========== - // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class); + // cast because our test serializer is not typed to TestPojo + env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class); - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); - // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise - // initializeSerializerUnlessSet would not pick up our new config - kvId = new ValueStateDescriptor<>("id", pojoType); - state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise + // initializeSerializerUnlessSet would not pick up our new config + kvId = new ValueStateDescriptor<>("id", pojoType); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - backend.setCurrentKey(1); + backend.setCurrentKey(1); - // update to test state backends that eagerly serialize, such as RocksDB - state.update(new TestPojo("u1", 11)); + // update to test state backends that eagerly serialize, such as RocksDB + state.update(new TestPojo("u1", 11)); - KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); - snapshot2.registerSharedStates(sharedStateRegistry); + snapshot2.registerSharedStates(sharedStateRegistry); + snapshot.discardState(); - snapshot.discardState(); + backend.dispose(); - backend.dispose(); + // ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) ========= - // ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) ========= + // cast because our test serializer is not typed to TestPojo + env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class); - // cast because our test serializer is not typed to TestPojo - env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class); + // on the second restore, since the custom serializer will be used for + // deserialization, we expect the deliberate failure to be thrown + expectedException.expect(ExpectedKryoTestException.class); - // on the second restore, since the custom serializer will be used for - // deserialization, we expect the deliberate failure to be thrown - expectedException.expect(ExpectedKryoTestException.class); + // state backends that eagerly deserializes (such as the memory state backend) will fail here + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env); - // state backends that eagerly deserializes (such as the memory state backend) will fail here - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + // state backends that lazily deserializes (such as RocksDB) will fail here + state.value(); - backend.setCurrentKey(1); - // state backends that lazily deserializes (such as RocksDB) will fail here - state.value(); + snapshot2.discardState(); + backend.dispose(); + } finally { + // ensure to release native resources even when we exit through exception + IOUtils.closeQuietly(backend); + backend.dispose(); + } } /** @@ -581,78 +592,89 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); Environment env = new DummyEnvironment("test", 1, 0); - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + AbstractKeyedStateBackend<Integer> backend = null; - TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + try { + backend = createKeyedBackend(IntSerializer.INSTANCE, env); - // make sure that we are in fact using the KryoSerializer - assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); - ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); - ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); - // ============== create snapshot - no Kryo registration or specific / default serializers ============== + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - // make some more modifications - backend.setCurrentKey(1); - state.update(new TestPojo("u1", 1)); + // ============== create snapshot - no Kryo registration or specific / default serializers ============== - backend.setCurrentKey(2); - state.update(new TestPojo("u2", 2)); + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1)); - KeyedStateHandle snapshot = runSnapshot(backend.snapshot( + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2)); + + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); - snapshot.registerSharedStates(sharedStateRegistry); - backend.dispose(); + snapshot.registerSharedStates(sharedStateRegistry); + backend.dispose(); - // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ========== + // ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ========== - env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); + env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); - // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise - // initializeSerializerUnlessSet would not pick up our new config - kvId = new ValueStateDescriptor<>("id", pojoType); - state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + // re-initialize to ensure that we create the KryoSerializer from scratch, otherwise + // initializeSerializerUnlessSet would not pick up our new config + kvId = new ValueStateDescriptor<>("id", pojoType); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - backend.setCurrentKey(1); + backend.setCurrentKey(1); - // update to test state backends that eagerly serialize, such as RocksDB - state.update(new TestPojo("u1", 11)); + // update to test state backends that eagerly serialize, such as RocksDB + state.update(new TestPojo("u1", 11)); - KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); - snapshot2.registerSharedStates(sharedStateRegistry); + snapshot2.registerSharedStates(sharedStateRegistry); - snapshot.discardState(); + snapshot.discardState(); - backend.dispose(); + backend.dispose(); - // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) ========= + // ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) ========= - env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); + env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class); - // on the second restore, since the custom serializer will be used for - // deserialization, we expect the deliberate failure to be thrown - expectedException.expect(ExpectedKryoTestException.class); + // on the second restore, since the custom serializer will be used for + // deserialization, we expect the deliberate failure to be thrown + expectedException.expect(ExpectedKryoTestException.class); - // state backends that eagerly deserializes (such as the memory state backend) will fail here - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env); + // state backends that eagerly deserializes (such as the memory state backend) will fail here + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env); - state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); - backend.setCurrentKey(1); - // state backends that lazily deserializes (such as RocksDB) will fail here - state.value(); + backend.setCurrentKey(1); + // state backends that lazily deserializes (such as RocksDB) will fail here + state.value(); + + backend.dispose(); + } finally { + // ensure that native resources are also released in case of exception + if (backend != null) { + backend.dispose(); + } + } } @Test @@ -1726,7 +1748,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten final int MAX_PARALLELISM = 10; CheckpointStreamFactory streamFactory = createStreamFactory(); - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend( + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend( IntSerializer.INSTANCE, MAX_PARALLELISM, new KeyGroupRange(0, MAX_PARALLELISM - 1), @@ -1770,7 +1792,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend.dispose(); // backend for the first half of the key group range - AbstractKeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend( + final AbstractKeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend( IntSerializer.INSTANCE, MAX_PARALLELISM, new KeyGroupRange(0, 4), @@ -1778,7 +1800,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten new DummyEnvironment("test", 1, 0)); // backend for the second half of the key group range - AbstractKeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend( + final AbstractKeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend( IntSerializer.INSTANCE, MAX_PARALLELISM, new KeyGroupRange(5, 9), @@ -2017,7 +2039,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testCopyDefaultValue() throws Exception { - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1)); @@ -2044,7 +2066,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten */ @Test public void testRequireNonNullNamespace() throws Exception { - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1)); @@ -2076,7 +2098,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") protected void testConcurrentMapIfQueryable() throws Exception { final int numberOfKeyGroups = 1; - AbstractKeyedStateBackend<Integer> backend = createKeyedBackend( + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend( IntSerializer.INSTANCE, numberOfKeyGroups, new KeyGroupRange(0, 0), @@ -2384,9 +2406,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory.setBlockerLatch(blocker); streamFactory.setAfterNumberInvocations(10); - AbstractKeyedStateBackend<Integer> backend = null; + final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + try { - backend = createKeyedBackend(IntSerializer.INSTANCE); if (!backend.supportsAsynchronousSnapshots()) { return; @@ -2413,14 +2435,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten waiter.await(); // close the backend to see if the close is propagated to the stream - backend.close(); + IOUtils.closeQuietly(backend); //unblock the stream so that it can run into the IOException blocker.trigger(); - //dispose the backend - backend.dispose(); - runner.join(); try { @@ -2431,10 +2450,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } } finally { - if (null != backend) { - IOUtils.closeQuietly(backend); - backend.dispose(); - } + backend.dispose(); } }