This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 9d87fe1  [hotfix][statebackend] Reduce and simplify code for column 
creation in RocksDB backend
9d87fe1 is described below

commit 9d87fe16c4b34e58d604f425f8e4b37de3c13ad0
Author: azagrebin <azagre...@users.noreply.github.com>
AuthorDate: Wed Mar 6 16:45:33 2019 +0100

    [hotfix][statebackend] Reduce and simplify code for column creation in 
RocksDB backend
    
    This closes #7830.
    
    (cherry picked from commit 953a5ffcbdae4115f7d525f310723cf8770779df)
---
 .../streaming/state/RocksDBKeyedStateBackend.java  | 16 +----
 .../state/RocksDBKeyedStateBackendBuilder.java     | 13 ++--
 .../streaming/state/RocksDBOperationUtils.java     | 82 ++++++++++++----------
 .../state/RocksDBPriorityQueueSetFactory.java      |  5 +-
 .../restore/AbstractRocksDBRestoreOperation.java   | 19 ++---
 .../state/restore/RocksDBFullRestoreOperation.java |  9 +--
 .../RocksDBIncrementalRestoreOperation.java        | 36 +++-------
 .../state/restore/RocksDBNoneRestoreOperation.java |  7 +-
 .../state/ttl/RocksDbTtlCompactFiltersManager.java | 31 ++------
 9 files changed, 78 insertions(+), 140 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c6d3863..4a6bd3c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -126,9 +126,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        RocksDBKeyedStateBackend<K> backend) throws Exception;
        }
 
-       /** String that identifies the operator that owns this backend. */
-       private final String operatorIdentifier;
-
        /** Factory function to create column family options from state name. */
        private final Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory;
 
@@ -158,9 +155,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
 
-       /** Thread number used to transfer state files while 
restoring/snapshotting. */
-       private final int numberOfTransferingThreads;
-
        /**
         * We are not using the default column family for Flink state ops, but 
we still need to remember this handle so that
         * we can close it properly when the backend is closed. Note that the 
one returned by {@link RocksDB#open(String)}
@@ -201,7 +195,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
 
        public RocksDBKeyedStateBackend(
-               String operatorIdentifier,
                ClassLoader userCodeClassLoader,
                File instanceBasePath,
                DBOptions dbOptions,
@@ -211,7 +204,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig,
-               int numberOfTransferingThreads,
                TtlTimeProvider ttlTimeProvider,
                RocksDB db,
                LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
@@ -233,10 +225,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                this.ttlCompactFiltersManager = ttlCompactFiltersManager;
 
-               this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
-
-               this.numberOfTransferingThreads = numberOfTransferingThreads;
-
                // ensure that we use the right merge operator, because other 
code relies on this
                this.columnFamilyOptionsFactory = 
Preconditions.checkNotNull(columnFamilyOptionsFactory);
 
@@ -500,8 +488,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                stateSerializer,
                                StateSnapshotTransformFactory.noTransform());
 
-                       newRocksStateInfo = 
RocksDBOperationUtils.createStateInfo(newMetaInfo, ttlCompactFiltersManager,
-                               ttlTimeProvider, db, 
columnFamilyOptionsFactory);
+                       newRocksStateInfo = 
RocksDBOperationUtils.createStateInfo(
+                               newMetaInfo, db, columnFamilyOptionsFactory, 
ttlCompactFiltersManager);
                        
RocksDBOperationUtils.registerKvStateInformation(this.kvStateInformation, 
this.nativeMetricMonitor,
                                stateDesc.getName(), newRocksStateInfo);
                }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 3f245d0..aa6845e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -242,7 +242,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
                RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
-                       new 
RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter);
+                       new 
RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter, ttlTimeProvider);
 
                ResourceGuard rocksDBResourceGuard = new ResourceGuard();
                SnapshotStrategy<K> snapshotStrategy;
@@ -324,7 +324,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        }
                }
                return new RocksDBKeyedStateBackend<>(
-                       this.operatorIdentifier,
                        this.userCodeClassLoader,
                        this.instanceBasePath,
                        this.dbOptions,
@@ -334,7 +333,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                        this.numberOfKeyGroups,
                        this.keyGroupRange,
                        this.executionConfig,
-                       this.numberOfTransferingThreads,
                        this.ttlTimeProvider,
                        db,
                        kvStateInformation,
@@ -374,8 +372,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                nativeMetricOptions,
                                metricGroup,
                                restoreStateHandles,
-                               ttlCompactFiltersManager,
-                               ttlTimeProvider);
+                               ttlCompactFiltersManager);
                }
                KeyedStateHandle firstStateHandle = 
restoreStateHandles.iterator().next();
                if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
@@ -395,8 +392,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                nativeMetricOptions,
                                metricGroup,
                                restoreStateHandles,
-                               ttlCompactFiltersManager,
-                               ttlTimeProvider);
+                               ttlCompactFiltersManager);
                } else {
                        return new RocksDBFullRestoreOperation<>(
                                keyGroupRange,
@@ -413,8 +409,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                nativeMetricOptions,
                                metricGroup,
                                restoreStateHandles,
-                               ttlCompactFiltersManager,
-                               ttlTimeProvider);
+                               ttlCompactFiltersManager);
                }
        }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index b183426..1e3fb8f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.configuration.ConfigConstants;
 import 
org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -32,6 +31,8 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -79,38 +80,11 @@ public class RocksDBOperationUtils {
                return dbRef;
        }
 
-       public static ColumnFamilyDescriptor 
createColumnFamilyDescriptor(String stateName, ColumnFamilyOptions 
columnOptions) {
-               byte[] nameBytes = 
stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
-               
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
-                       "The chosen state name 'default' collides with the name 
of the default column family!");
-
-               return new ColumnFamilyDescriptor(nameBytes, columnOptions);
-       }
-
-       public static ColumnFamilyHandle 
createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
-               try {
-                       return db.createColumnFamily(columnDescriptor);
-               } catch (RocksDBException e) {
-                       IOUtils.closeQuietly(columnDescriptor.getOptions());
-                       throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", e);
-               }
-       }
-
-       public static ColumnFamilyHandle createColumnFamily(
-               String stateName,
-               Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory,
-               RocksDB db) {
-               ColumnFamilyOptions options = 
createColumnFamilyOptions(columnFamilyOptionsFactory, stateName);
-               return 
createColumnFamily(createColumnFamilyDescriptor(stateName, options), db);
-       }
-
        public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
                return new RocksIteratorWrapper(db.newIterator());
        }
 
-       public static RocksIteratorWrapper getRocksIterator(
-               RocksDB db,
-               ColumnFamilyHandle columnFamilyHandle) {
+       public static RocksIteratorWrapper getRocksIterator(RocksDB db, 
ColumnFamilyHandle columnFamilyHandle) {
                return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
        }
 
@@ -119,8 +93,8 @@ public class RocksDBOperationUtils {
                RocksDBNativeMetricMonitor nativeMetricMonitor,
                String columnFamilyName,
                RocksDBKeyedStateBackend.RocksDbKvStateInfo registeredColumn) {
-               kvStateInformation.put(columnFamilyName, registeredColumn);
 
+               kvStateInformation.put(columnFamilyName, registeredColumn);
                if (nativeMetricMonitor != null) {
                        
nativeMetricMonitor.registerColumnFamily(columnFamilyName, 
registeredColumn.columnFamilyHandle);
                }
@@ -128,26 +102,58 @@ public class RocksDBOperationUtils {
 
        /**
         * Creates a state info from a new meta info to use with a k/v state.
+        *
+        * <p>Creates the column family for the state.
+        * Sets TTL compaction filter if {@code ttlCompactFiltersManager} is 
not {@code null}.
         */
        public static RocksDBKeyedStateBackend.RocksDbKvStateInfo 
createStateInfo(
                RegisteredStateMetaInfoBase metaInfoBase,
-               RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-               TtlTimeProvider ttlTimeProvider,
                RocksDB db,
-               Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory) {
-               ColumnFamilyOptions options = 
createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
-               
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(ttlTimeProvider, 
metaInfoBase, options);
-               ColumnFamilyDescriptor columnFamilyDescriptor = 
createColumnFamilyDescriptor(metaInfoBase.getName(), options);
+               Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory,
+               @Nullable RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager) {
+
+               ColumnFamilyDescriptor columnFamilyDescriptor = 
createColumnFamilyDescriptor(
+                       metaInfoBase, columnFamilyOptionsFactory, 
ttlCompactFiltersManager);
                return new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(createColumnFamily(columnFamilyDescriptor,
 db), metaInfoBase);
        }
 
-       public static ColumnFamilyOptions createColumnFamilyOptions(
+       /**
+        * Creates a column descriptor for sate column family.
+        *
+        * <p>Sets TTL compaction filter if {@code ttlCompactFiltersManager} is 
not {@code null}.
+        */
+       public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
+               RegisteredStateMetaInfoBase metaInfoBase,
                Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory,
-               String stateName) {
+               @Nullable RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager) {
+
+               ColumnFamilyOptions options = 
createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
+               if (ttlCompactFiltersManager != null) {
+                       
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, 
options);
+               }
+               byte[] nameBytes = 
metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+               
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
+                       "The chosen state name 'default' collides with the name 
of the default column family!");
+
+               return new ColumnFamilyDescriptor(nameBytes, options);
+       }
+
+       public static ColumnFamilyOptions createColumnFamilyOptions(
+               Function<String, ColumnFamilyOptions> 
columnFamilyOptionsFactory, String stateName) {
+
                // ensure that we use the right merge operator, because other 
code relies on this
                return 
columnFamilyOptionsFactory.apply(stateName).setMergeOperatorName(MERGE_OPERATOR_NAME);
        }
 
+       private static ColumnFamilyHandle 
createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
+               try {
+                       return db.createColumnFamily(columnDescriptor);
+               } catch (RocksDBException e) {
+                       IOUtils.closeQuietly(columnDescriptor.getOptions());
+                       throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", e);
+               }
+       }
+
        public static void addColumnFamilyOptionsToCloseLater(
                List<ColumnFamilyOptions> columnFamilyOptions, 
ColumnFamilyHandle columnFamilyHandle) {
                try {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 1eb0f47..1cbeebe 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -145,12 +145,9 @@ public class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory {
                if (stateInfo == null) {
                        // Currently this class is for timer service and TTL 
feature is not applicable here,
                        // so no need to register compact filter when creating 
column family
-                       final ColumnFamilyHandle columnFamilyHandle =
-                               
RocksDBOperationUtils.createColumnFamily(stateName, columnFamilyOptionsFactory, 
this.db);
                        RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo 
=
                                new 
RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, 
byteOrderedElementSerializer);
-
-                       stateInfo = new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
+                       stateInfo = 
RocksDBOperationUtils.createStateInfo(metaInfo, db, columnFamilyOptionsFactory, 
null);
                        
RocksDBOperationUtils.registerKvStateInformation(kvStateInformation, 
nativeMetricMonitor, stateName, stateInfo);
                } else {
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
index 9a29b62..064483c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 
@@ -42,7 +41,6 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
 
 import javax.annotation.Nonnull;
 
@@ -87,7 +85,6 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
        // - Full restore
        //   - data ingestion after db open: 
#getOrRegisterStateColumnFamilyHandle before creating column family
        protected final RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager;
-       protected final TtlTimeProvider ttlTimeProvider;
 
        protected RocksDB db;
        protected ColumnFamilyHandle defaultColumnFamilyHandle;
@@ -109,8 +106,7 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
                RocksDBNativeMetricOptions nativeMetricOptions,
                MetricGroup metricGroup,
                @Nonnull Collection<KeyedStateHandle> stateHandles,
-               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager,
-               TtlTimeProvider ttlTimeProvider) {
+               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager) {
                this.keyGroupRange = keyGroupRange;
                this.keyGroupPrefixBytes = keyGroupPrefixBytes;
                this.numberOfTransferringThreads = numberOfTransferringThreads;
@@ -127,12 +123,11 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
                this.metricGroup = metricGroup;
                this.restoreStateHandles = stateHandles;
                this.ttlCompactFiltersManager = ttlCompactFiltersManager;
-               this.ttlTimeProvider = ttlTimeProvider;
                this.columnFamilyHandles = new ArrayList<>(1);
                this.columnFamilyDescriptors = Collections.emptyList();
        }
 
-       public void openDB() throws IOException {
+       void openDB() throws IOException {
                db = RocksDBOperationUtils.openDB(
                        dbPath,
                        columnFamilyDescriptors,
@@ -150,9 +145,9 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
                return this.db;
        }
 
-       protected RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
+       RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
                ColumnFamilyHandle columnFamilyHandle,
-               StateMetaInfoSnapshot stateMetaInfoSnapshot) throws 
RocksDBException {
+               StateMetaInfoSnapshot stateMetaInfoSnapshot) {
 
                RocksDbKvStateInfo registeredStateMetaInfoEntry =
                        kvStateInformation.get(stateMetaInfoSnapshot.getName());
@@ -163,8 +158,8 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
                        RegisteredStateMetaInfoBase stateMetaInfo =
                                
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
                        if (columnFamilyHandle == null) {
-                               registeredStateMetaInfoEntry = 
RocksDBOperationUtils.createStateInfo(stateMetaInfo,
-                                       ttlCompactFiltersManager, 
ttlTimeProvider, db, columnFamilyOptionsFactory);
+                               registeredStateMetaInfoEntry = 
RocksDBOperationUtils.createStateInfo(
+                                       stateMetaInfo, db, 
columnFamilyOptionsFactory, ttlCompactFiltersManager);
                        } else {
                                registeredStateMetaInfoEntry = new 
RocksDbKvStateInfo(columnFamilyHandle, stateMetaInfo);
                        }
@@ -181,7 +176,7 @@ public abstract class AbstractRocksDBRestoreOperation<K> 
implements RocksDBResto
                return registeredStateMetaInfoEntry;
        }
 
-       protected KeyedBackendSerializationProxy<K> readMetaData(DataInputView 
dataInputView)
+       KeyedBackendSerializationProxy<K> readMetaData(DataInputView 
dataInputView)
                throws IOException, StateMigrationException {
                // isSerializerPresenceRequired flag is set to false, since for 
the RocksDB state backend,
                // deserialization of state happens lazily during runtime; we 
depend on the fact
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index e9c8fe9..289ebdc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -103,8 +102,7 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
                RocksDBNativeMetricOptions nativeMetricOptions,
                MetricGroup metricGroup,
                @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
-               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager,
-               TtlTimeProvider ttlTimeProvider) {
+               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager) {
                super(
                        keyGroupRange,
                        keyGroupPrefixBytes,
@@ -120,8 +118,7 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
                        nativeMetricOptions,
                        metricGroup,
                        restoreStateHandles,
-                       ttlCompactFiltersManager,
-                       ttlTimeProvider);
+                       ttlCompactFiltersManager);
        }
 
        /**
@@ -169,7 +166,7 @@ public class RocksDBFullRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
        /**
         * Restore the KV-state / ColumnFamily meta data for all key-groups 
referenced by the current state handle.
         */
-       private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
+       private void restoreKVStateMetaData() throws IOException, 
StateMigrationException {
                KeyedBackendSerializationProxy<K> serializationProxy = 
readMetaData(currentStateHandleInView);
 
                this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 3371f4f..9088006 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state.restore;
 
-import org.apache.flink.configuration.ConfigConstants;
 import 
org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
 import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
 import 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
@@ -44,11 +43,11 @@ import 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.IOUtils;
 
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -106,8 +105,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                RocksDBNativeMetricOptions nativeMetricOptions,
                MetricGroup metricGroup,
                @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
-               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager,
-               TtlTimeProvider ttlTimeProvider) {
+               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager) {
                super(keyGroupRange,
                        keyGroupPrefixBytes,
                        numberOfTransferringThreads,
@@ -122,8 +120,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends 
AbstractRocksDBRestor
                        nativeMetricOptions,
                        metricGroup,
                        restoreStateHandles,
-                       ttlCompactFiltersManager,
-                       ttlTimeProvider);
+                       ttlCompactFiltersManager);
                this.operatorIdentifier = operatorIdentifier;
                this.restoredSstFiles = new TreeMap<>();
                this.lastCompletedCheckpointId = -1L;
@@ -252,19 +249,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                }
        }
 
-       private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> 
metaInfoSnapshots)
-               throws BackendBuildingException {
+       private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> 
metaInfoSnapshots) {
                // Register CF handlers
                for (int i = 0; i < metaInfoSnapshots.size(); ++i) {
-                       try {
-                               getOrRegisterStateColumnFamilyHandle(
-                                       columnFamilyHandles.get(i),
-                                       metaInfoSnapshots.get(i));
-                       } catch (RocksDBException e) {
-                               String errMsg = "Failed to register CF handle.";
-                               LOG.error(errMsg, e);
-                               throw new BackendBuildingException(errMsg, e);
-                       }
+                       
getOrRegisterStateColumnFamilyHandle(columnFamilyHandles.get(i), 
metaInfoSnapshots.get(i));
                }
        }
 
@@ -452,16 +440,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
extends AbstractRocksDBRestor
                        new ArrayList<>(stateMetaInfoSnapshots.size());
 
                for (StateMetaInfoSnapshot stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
-                       ColumnFamilyOptions options = 
RocksDBOperationUtils.createColumnFamilyOptions(
-                               columnFamilyOptionsFactory, 
stateMetaInfoSnapshot.getName());
-                       if (registerTtlCompactFilter) {
-                               
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(ttlTimeProvider,
-                                       stateMetaInfoSnapshot, options);
-                       }
-                       ColumnFamilyDescriptor columnFamilyDescriptor = new 
ColumnFamilyDescriptor(
-                               
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
-                               options);
-
+                       RegisteredStateMetaInfoBase metaInfoBase =
+                               
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
+                       ColumnFamilyDescriptor columnFamilyDescriptor = 
RocksDBOperationUtils.createColumnFamilyDescriptor(
+                               metaInfoBase, columnFamilyOptionsFactory, 
registerTtlCompactFilter ? ttlCompactFiltersManager : null);
                        columnFamilyDescriptors.add(columnFamilyDescriptor);
                }
                return columnFamilyDescriptors;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
index 873132a..bfc25d6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
@@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.StateSerializerProvider;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -57,8 +56,7 @@ public class RocksDBNoneRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
                RocksDBNativeMetricOptions nativeMetricOptions,
                MetricGroup metricGroup,
                @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
-               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager,
-               TtlTimeProvider ttlTimeProvider
+               @Nonnull RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager
        ) {
                super(keyGroupRange,
                        keyGroupPrefixBytes,
@@ -74,8 +72,7 @@ public class RocksDBNoneRestoreOperation<K> extends 
AbstractRocksDBRestoreOperat
                        nativeMetricOptions,
                        metricGroup,
                        restoreStateHandles,
-                       ttlCompactFiltersManager,
-                       ttlTimeProvider);
+                       ttlCompactFiltersManager);
        }
 
        @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
index ccdb092..affea5b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
@@ -23,12 +23,10 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.runtime.state.ttl.TtlStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlUtils;
@@ -57,47 +55,30 @@ public class RocksDbTtlCompactFiltersManager {
        /** Enables RocksDb compaction filter for State with TTL. */
        private final boolean enableTtlCompactionFilter;
 
+       private final TtlTimeProvider ttlTimeProvider;
+
        /** Registered compaction filter factories. */
        private final LinkedHashMap<String, FlinkCompactionFilterFactory> 
compactionFilterFactories;
 
-       public RocksDbTtlCompactFiltersManager(boolean 
enableTtlCompactionFilter) {
+       public RocksDbTtlCompactFiltersManager(boolean 
enableTtlCompactionFilter, TtlTimeProvider ttlTimeProvider) {
                this.enableTtlCompactionFilter = enableTtlCompactionFilter;
+               this.ttlTimeProvider = ttlTimeProvider;
                this.compactionFilterFactories = new LinkedHashMap<>();
        }
 
        public void setAndRegisterCompactFilterIfStateTtl(
-               TtlTimeProvider ttlTimeProvider,
-               @Nonnull StateMetaInfoSnapshot stateMetaInfoSnapshot,
-               @Nonnull ColumnFamilyOptions options) {
-
-               boolean keyValueState = 
stateMetaInfoSnapshot.getBackendStateType() == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
-               if (enableTtlCompactionFilter && keyValueState) {
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<?> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<?>) 
stateMetaInfoSnapshot.getTypeSerializerSnapshot(
-                                       
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-                       TypeSerializer<?> serializer = 
stateSerializerSnapshot.restoreSerializer();
-                       if 
(TtlStateFactory.TtlSerializer.isTtlStateSerializer(serializer)) {
-                               
createAndSetCompactFilterFactory(stateMetaInfoSnapshot.getName(), 
ttlTimeProvider, options);
-                       }
-               }
-       }
-
-       public void setAndRegisterCompactFilterIfStateTtl(
-               TtlTimeProvider ttlTimeProvider,
                @Nonnull RegisteredStateMetaInfoBase metaInfoBase,
                @Nonnull ColumnFamilyOptions options) {
 
                if (enableTtlCompactionFilter && metaInfoBase instanceof 
RegisteredKeyValueStateBackendMetaInfo) {
                        RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = 
(RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
                        if 
(TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer()))
 {
-                               
createAndSetCompactFilterFactory(metaInfoBase.getName(), ttlTimeProvider, 
options);
+                               
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
                        }
                }
        }
 
-       private void createAndSetCompactFilterFactory(
-               String stateName, TtlTimeProvider ttlTimeProvider, @Nonnull 
ColumnFamilyOptions options) {
+       private void createAndSetCompactFilterFactory(String stateName, 
@Nonnull ColumnFamilyOptions options) {
 
                FlinkCompactionFilterFactory compactionFilterFactory =
                        new FlinkCompactionFilterFactory(new 
TimeProviderWrapper(ttlTimeProvider), createRocksDbNativeLogger());

Reply via email to