This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b9ec27593e934394a759eebe7e767b44bca7ac9
Author: Stefan Richter <[email protected]>
AuthorDate: Tue Feb 26 10:39:23 2019 +0100

    [hotfix] Minor code cleanups in RocksDBKeyedStateBackendBuilder and 
RocksDBIncrementalRestoreOperation
---
 .../state/RocksDBKeyedStateBackendBuilder.java     | 10 +++---
 .../RocksDBIncrementalRestoreOperation.java        | 36 +++++++++-------------
 2 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index c9a03a2..c5cf501 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -81,7 +81,7 @@ import java.util.function.Function;
 public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBackendBuilder<K> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackendBuilder.class);
-       public static final String DB_INSTANCE_DIR_STRING = "db";
+       static final String DB_INSTANCE_DIR_STRING = "db";
 
        /** String that identifies the operator that owns this backend. */
        private final String operatorIdentifier;
@@ -199,22 +199,22 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                this.injectedDefaultColumnFamilyHandle = 
injectedDefaultColumnFamilyHandle;
        }
 
-       public RocksDBKeyedStateBackendBuilder<K> 
setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
+       RocksDBKeyedStateBackendBuilder<K> 
setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
                return this;
        }
 
-       public RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter 
(boolean enableTtlCompactionFilter) {
+       RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter(boolean 
enableTtlCompactionFilter) {
                this.enableTtlCompactionFilter = enableTtlCompactionFilter;
                return this;
        }
 
-       public RocksDBKeyedStateBackendBuilder<K> 
setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
+       RocksDBKeyedStateBackendBuilder<K> 
setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
                this.nativeMetricOptions = nativeMetricOptions;
                return this;
        }
 
-       public RocksDBKeyedStateBackendBuilder<K> 
setNumberOfTransferingThreads(int numberOfTransferingThreads) {
+       RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int 
numberOfTransferingThreads) {
                this.numberOfTransferingThreads = numberOfTransferingThreads;
                return this;
        }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 24627b2..87365a2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -85,9 +85,9 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
        private final String operatorIdentifier;
-       protected final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
-       protected long lastCompletedCheckpointId;
-       protected UUID backendUID;
+       private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+       private long lastCompletedCheckpointId;
+       private UUID backendUID;
 
        public RocksDBIncrementalRestoreOperation(
                String operatorIdentifier,
@@ -156,7 +156,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
        /**
         * Recovery from a single remote incremental state without rescaling.
         */
-       void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws 
Exception {
+       private void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) 
throws Exception {
                // 1. Prepare for restore without rescaling
                Path temporaryRestoreInstancePath = new Path(
                        instanceBasePath.getAbsolutePath(),
@@ -219,13 +219,11 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                }
        }
 
-       public RocksDBIncrementalRestorePrepareResult prepareFiles(
+       private RocksDBIncrementalRestorePrepareResult prepareFiles(
                KeyedStateHandle rawStateHandle,
                Path restoreInstancePath) throws BackendBuildingException {
-               IncrementalLocalKeyedStateHandle localKeyedStateHandle;
-               List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+               final IncrementalLocalKeyedStateHandle localKeyedStateHandle;
                try {
-                       KeyedBackendSerializationProxy<K> serializationProxy;
                        if (rawStateHandle instanceof 
IncrementalKeyedStateHandle) {
                                IncrementalKeyedStateHandle restoreStateHandle 
= (IncrementalKeyedStateHandle) rawStateHandle;
 
@@ -238,10 +236,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                                                cancelStreamRegistry);
                                }
 
-                               serializationProxy = 
readMetaData(restoreStateHandle.getMetaStateHandle());
-                               stateMetaInfoSnapshots = 
serializationProxy.getStateMetaInfoSnapshots();
-                               columnFamilyDescriptors = 
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
-
                                // since we transferred all remote state to a 
local directory, we can use the same code as for
                                // local recovery.
                                localKeyedStateHandle = new 
IncrementalLocalKeyedStateHandle(
@@ -254,14 +248,17 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                        } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
                                // Recovery from local incremental state.
                                localKeyedStateHandle = 
(IncrementalLocalKeyedStateHandle) rawStateHandle;
-                               serializationProxy = 
readMetaData(localKeyedStateHandle.getMetaDataState());
-                               stateMetaInfoSnapshots = 
serializationProxy.getStateMetaInfoSnapshots();
-                               columnFamilyDescriptors = 
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
                        } else {
                                throw new IllegalStateException("Unexpected 
state handle type, " +
                                        "expected " + 
IncrementalKeyedStateHandle.class + " or " + 
IncrementalLocalKeyedStateHandle.class +
                                        ", but found " + 
rawStateHandle.getClass());
                        }
+
+                       KeyedBackendSerializationProxy<K> serializationProxy = 
readMetaData(localKeyedStateHandle.getMetaDataState());
+                       List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = 
serializationProxy.getStateMetaInfoSnapshots();
+                       columnFamilyDescriptors = 
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
+                       columnFamilyHandles = new 
ArrayList<>(columnFamilyDescriptors.size() + 1);
+                       return new 
RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, 
localKeyedStateHandle);
                } catch (Exception e) {
                        // clean up
                        cleanUpPathQuietly(restoreInstancePath);
@@ -270,8 +267,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        LOG.error(errMsg, e);
                        throw new BackendBuildingException(errMsg, e);
                }
-               columnFamilyHandles = new 
ArrayList<>(columnFamilyDescriptors.size() + 1);
-               return new 
RocksDBIncrementalRestorePrepareResult(stateMetaInfoSnapshots, 
localKeyedStateHandle);
        }
 
        /**
@@ -279,7 +274,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
         * RocksDB instance for a key-groups shard. All contents from the 
temporary instance are copied into the
         * real restore instance and then the temporary instance is discarded.
         */
-       void restoreWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles) throws Exception {
+       private void restoreWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles) throws Exception {
                // Init DB instance
                initDBWithRescaling(restoreStateHandles);
                // Do restore
@@ -476,7 +471,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
         * This recreates the new working directory of the recovered RocksDB 
instance and links/copies the contents from
         * a local state.
         */
-       public void restoreInstanceDirectoryFromPath(Path source, String 
instanceRocksDBPath) throws IOException {
+       private void restoreInstanceDirectoryFromPath(Path source, String 
instanceRocksDBPath) throws IOException {
 
                FileSystem fileSystem = source.getFileSystem();
 
@@ -512,8 +507,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        inputStream = metaStateHandle.openInputStream();
                        cancelStreamRegistry.registerCloseable(inputStream);
                        DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
-                       KeyedBackendSerializationProxy<K> serializationProxy = 
readMetaData(in);
-                       return serializationProxy;
+                       return readMetaData(in);
                } finally {
                        if 
(cancelStreamRegistry.unregisterCloseable(inputStream)) {
                                inputStream.close();

Reply via email to