This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b9ec27593e934394a759eebe7e767b44bca7ac9 Author: Stefan Richter <[email protected]> AuthorDate: Tue Feb 26 10:39:23 2019 +0100 [hotfix] Minor code cleanups in RocksDBKeyedStateBackendBuilder and RocksDBIncrementalRestoreOperation --- .../state/RocksDBKeyedStateBackendBuilder.java | 10 +++--- .../RocksDBIncrementalRestoreOperation.java | 36 +++++++++------------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index c9a03a2..c5cf501 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -81,7 +81,7 @@ import java.util.function.Function; public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class); - public static final String DB_INSTANCE_DIR_STRING = "db"; + static final String DB_INSTANCE_DIR_STRING = "db"; /** String that identifies the operator that owns this backend. */ private final String operatorIdentifier; @@ -199,22 +199,22 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle; } - public RocksDBKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) { + RocksDBKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) { this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; return this; } - public RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter (boolean enableTtlCompactionFilter) { + RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter(boolean enableTtlCompactionFilter) { this.enableTtlCompactionFilter = enableTtlCompactionFilter; return this; } - public RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) { + RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) { this.nativeMetricOptions = nativeMetricOptions; return this; } - public RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int numberOfTransferingThreads) { + RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int numberOfTransferingThreads) { this.numberOfTransferingThreads = numberOfTransferingThreads; return this; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 24627b2..87365a2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -85,9 +85,9 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class); private final String operatorIdentifier; - protected final SortedMap<Long, Set<StateHandleID>> restoredSstFiles; - protected long lastCompletedCheckpointId; - protected UUID backendUID; + private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles; + private long lastCompletedCheckpointId; + private UUID backendUID; public RocksDBIncrementalRestoreOperation( String operatorIdentifier, @@ -156,7 +156,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor /** * Recovery from a single remote incremental state without rescaling. */ - void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception { + private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception { // 1. Prepare for restore without rescaling Path temporaryRestoreInstancePath = new Path( instanceBasePath.getAbsolutePath(), @@ -219,13 +219,11 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor } } - public RocksDBIncrementalRestorePrepareResult prepareFiles( + private RocksDBIncrementalRestorePrepareResult prepareFiles( KeyedStateHandle rawStateHandle, Path restoreInstancePath) throws BackendBuildingException { - IncrementalLocalKeyedStateHandle localKeyedStateHandle; - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + final IncrementalLocalKeyedStateHandle localKeyedStateHandle; try { - KeyedBackendSerializationProxy<K> serializationProxy; if (rawStateHandle instanceof IncrementalKeyedStateHandle) { IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; @@ -238,10 +236,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor cancelStreamRegistry); } - serializationProxy = readMetaData(restoreStateHandle.getMetaStateHandle()); - stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); - columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true); - // since we transferred all remote state to a local directory, we can use the same code as for // local recovery. localKeyedStateHandle = new IncrementalLocalKeyedStateHandle( @@ -254,14 +248,17 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { // Recovery from local incremental state. localKeyedStateHandle = (IncrementalLocalKeyedStateHandle) rawStateHandle; - serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState()); - stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); - columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true); } else { throw new IllegalStateException("Unexpected state handle type, " + "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass()); } + + KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(localKeyedStateHandle.getMetaDataState()); + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy.getStateMetaInfoSnapshots(); + columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true); + columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1); + return new RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, localKeyedStateHandle); } catch (Exception e) { // clean up cleanUpPathQuietly(restoreInstancePath); @@ -270,8 +267,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor LOG.error(errMsg, e); throw new BackendBuildingException(errMsg, e); } - columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1); - return new RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, localKeyedStateHandle); } /** @@ -279,7 +274,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the * real restore instance and then the temporary instance is discarded. */ - void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { // Init DB instance initDBWithRescaling(restoreStateHandles); // Do restore @@ -476,7 +471,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from * a local state. */ - public void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath) throws IOException { + private void restoreInstanceDirectoryFromPath(Path source, String instanceRocksDBPath) throws IOException { FileSystem fileSystem = source.getFileSystem(); @@ -512,8 +507,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor inputStream = metaStateHandle.openInputStream(); cancelStreamRegistry.registerCloseable(inputStream); DataInputView in = new DataInputViewStreamWrapper(inputStream); - KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(in); - return serializationProxy; + return readMetaData(in); } finally { if (cancelStreamRegistry.unregisterCloseable(inputStream)) { inputStream.close();
