This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c8860074ffca82bf0314c8065b62901a8a4cabdd
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Dec 10 15:07:47 2018 +0800

    [FLINK-11094] [state backends] State backends no longer need a separate map for restored StateMetaInfoSnapshots
    
    All restored state meta info snapshots are now handled by eagerly
    creating the corresponding RegisteredStateMetaInfoBase for each of them,
    so the restored information is already part of the registered state
    infos map.
    
    As can be seen in the changes, those maps are no longer queried and can
    therefore be safely removed.
    
    This closes #7264.
---
 .../runtime/state/DefaultOperatorStateBackend.java | 58 +++--------------
 .../runtime/state/heap/HeapKeyedStateBackend.java  | 74 +++++-----------------
 .../streaming/state/RocksDBKeyedStateBackend.java  | 15 -----
 3 files changed, 26 insertions(+), 121 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 4702919..952dffb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -106,22 +105,9 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        private final boolean asynchronousSnapshots;
 
        /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredOperatorStateMetaInfos;
-
-       /**
-        * Map of state names to their corresponding restored broadcast state 
meta info.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredBroadcastStateMetaInfos;
-
-       /**
         * Cache of already accessed states.
         *
-        * <p>In contrast to {@link #registeredOperatorStates} and {@link 
#restoredOperatorStateMetaInfos} which may be repopulated
+        * <p>In contrast to {@link #registeredOperatorStates} which may be 
repopulated
         * with restored state, this map is always empty at the beginning.
         *
         * <p>TODO this map should be moved to a base class once we have proper 
hierarchy for the operator state backends.
@@ -148,8 +134,6 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                this.asynchronousSnapshots = asynchronousSnapshots;
                this.accessedStatesByName = new HashMap<>();
                this.accessedBroadcastStatesByName = new HashMap<>();
-               this.restoredOperatorStateMetaInfos = new HashMap<>();
-               this.restoredBroadcastStateMetaInfos = new HashMap<>();
                this.snapshotStrategy = new 
DefaultOperatorStateBackendSnapshotStrategy();
        }
 
@@ -226,34 +210,22 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                        
broadcastState.getStateMetaInfo().getAssignmentMode(),
                                        OperatorStateHandle.Mode.BROADCAST);
 
-                       final StateMetaInfoSnapshot metaInfoSnapshot = 
restoredBroadcastStateMetaInfos.get(name);
+                       RegisteredBroadcastStateBackendMetaInfo<K, V> 
restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
 
                        // check whether new serializers are incompatible
-                       TypeSerializerSnapshot<K> keySerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<K>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<K> keyCompatibility =
-                               
keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+                               
restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
                        if (keyCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new key 
serializer for broadcast state must not be incompatible.");
                        }
 
-                       TypeSerializerSnapshot<V> valueSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<V>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<V> valueCompatibility 
=
-                               
valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+                               
restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
                        if (valueCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new 
value serializer for broadcast state must not be incompatible.");
                        }
 
-                       // new serializer is compatible; use it to replace the 
old serializer
-                       broadcastState.setStateMetaInfo(
-                                       new 
RegisteredBroadcastStateBackendMetaInfo<>(
-                                                       name,
-                                                       
OperatorStateHandle.Mode.BROADCAST,
-                                                       
broadcastStateKeySerializer,
-                                                       
broadcastStateValueSerializer));
+                       
broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
                }
 
                accessedBroadcastStatesByName.put(name, broadcastState);
@@ -345,8 +317,6 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
                                        }
 
-                                       
restoredOperatorStateMetaInfos.put(restoredSnapshot.getName(), 
restoredSnapshot);
-
                                        PartitionableListState<?> listState = 
registeredOperatorStates.get(restoredSnapshot.getName());
 
                                        if (null == listState) {
@@ -381,8 +351,6 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                                " not be 
loaded. This is a temporary restriction that will be fixed in future 
versions.");
                                        }
 
-                                       
restoredBroadcastStateMetaInfos.put(restoredSnapshot.getName(), 
restoredSnapshot);
-
                                        BackendWritableBroadcastState<? ,?> 
broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName());
 
                                        if (broadcastState == null) {
@@ -590,25 +558,19 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                        
partitionableListState.getStateMetaInfo().getAssignmentMode(),
                                        mode);
 
-                       StateMetaInfoSnapshot restoredSnapshot = 
restoredOperatorStateMetaInfos.get(name);
-                       RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
-                               new 
RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
+                       RegisteredOperatorStateBackendMetaInfo<S> 
restoredPartitionableListStateMetaInfo =
+                               partitionableListState.getStateMetaInfo();
 
-                       // check compatibility to determine if state migration 
is required
+                       // check compatibility to determine if new serializers 
are incompatible
                        TypeSerializer<S> newPartitionStateSerializer = 
partitionStateSerializer.duplicate();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<S> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<S>) 
restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-
                        TypeSerializerSchemaCompatibility<S> stateCompatibility 
=
-                               
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+                               
restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
                        if (stateCompatibility.isIncompatible()) {
                                throw new StateMigrationException("The new 
state serializer for operator state must not be incompatible.");
                        }
 
-                       partitionableListState.setStateMetaInfo(
-                               new 
RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, 
mode));
+                       
partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
                }
 
                accessedStatesByName.put(name, partitionableListState);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ecad76c..3f8761b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -79,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -126,14 +126,6 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> 
registeredPQStates;
 
        /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<StateUID, StateMetaInfoSnapshot> 
restoredStateMetaInfo;
-
-       /**
         * The configuration for local recovery.
         */
        private final LocalRecoveryConfig localRecoveryConfig;
@@ -173,7 +165,6 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                this.snapshotStrategy = new 
HeapSnapshotStrategy(synchronicityTrait);
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
-               this.restoredStateMetaInfo = new HashMap<>();
                this.priorityQueueSetFactory = priorityQueueSetFactory;
        }
 
@@ -194,23 +185,9 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
                        // because this should be reworked in FLINK-9376 and 
then we should have a common algorithm over
                        // StateMetaInfoSnapshot that avoids this code 
duplication.
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot =
-                               
restoredStateMetaInfo.get(StateUID.of(stateName, 
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE));
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
-
-                       StateMetaInfoSnapshot.CommonSerializerKeys 
serializerKey =
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
-
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
 
                        TypeSerializerSchemaCompatibility<T> 
compatibilityResult =
-                               
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
+                               
existingState.getMetaInfo().updateElementSerializer(byteOrderedElementSerializer);
 
                        if (compatibilityResult.isIncompatible()) {
                                throw new FlinkRuntimeException(new 
StateMigrationException("For heap backends, the new priority queue serializer 
must not be incompatible."));
@@ -252,57 +229,42 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private <N, V> StateTable<K, N, V> tryRegisterStateTable(
                        TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<?, V> stateDesc,
-                       StateSnapshotTransformer<V> snapshotTransformer) throws 
StateMigrationException {
+                       @Nullable StateSnapshotTransformer<V> 
snapshotTransformer) throws StateMigrationException {
 
                @SuppressWarnings("unchecked")
                StateTable<K, N, V> stateTable = (StateTable<K, N, V>) 
registeredKVStates.get(stateDesc.getName());
 
                TypeSerializer<V> newStateSerializer = 
stateDesc.getSerializer();
-               RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
-                       stateDesc.getType(),
-                       stateDesc.getName(),
-                       namespaceSerializer,
-                       newStateSerializer,
-                       snapshotTransformer);
 
                if (stateTable != null) {
-                       @SuppressWarnings("unchecked")
-                       StateMetaInfoSnapshot restoredMetaInfoSnapshot =
-                               restoredStateMetaInfo.get(
-                                       StateUID.of(stateDesc.getName(), 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE));
-
-                       Preconditions.checkState(
-                               restoredMetaInfoSnapshot != null,
-                               "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
-                                       " but its corresponding restored 
snapshot cannot be found.");
+                       RegisteredKeyValueStateBackendMetaInfo<N, V> 
restoredKvMetaInfo = stateTable.getMetaInfo();
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<N> namespaceSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<N>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                                       
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
+                       
restoredKvMetaInfo.updateSnapshotTransformer(snapshotTransformer);
 
                        TypeSerializerSchemaCompatibility<N> 
namespaceCompatibility =
-                               
namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
+                               
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
                        if (!namespaceCompatibility.isCompatibleAsIs()) {
                                throw new StateMigrationException("For heap 
backends, the new namespace serializer must be compatible.");
                        }
 
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<V> stateSerializerSnapshot = 
Preconditions.checkNotNull(
-                               (TypeSerializerSnapshot<V>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                                       
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
-
-                       
RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot,
 stateDesc);
+                       restoredKvMetaInfo.checkStateMetaInfo(stateDesc);
 
                        TypeSerializerSchemaCompatibility<V> stateCompatibility 
=
-                               
stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+                               
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
 
                        if (stateCompatibility.isIncompatible()) {
                                throw new StateMigrationException("For heap 
backends, the new state serializer must not be incompatible.");
                        }
 
-                       stateTable.setMetaInfo(newMetaInfo);
+                       stateTable.setMetaInfo(restoredKvMetaInfo);
                } else {
+                       RegisteredKeyValueStateBackendMetaInfo<N, V> 
newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
+                               stateDesc.getType(),
+                               stateDesc.getName(),
+                               namespaceSerializer,
+                               newStateSerializer,
+                               snapshotTransformer);
+
                        stateTable = 
snapshotStrategy.newStateTable(newMetaInfo);
                        registeredKVStates.put(stateDesc.getName(), stateTable);
                }
@@ -536,10 +498,6 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
 
                for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) 
{
-                       restoredStateMetaInfo.put(
-                               StateUID.of(metaInfoSnapshot.getName(), 
metaInfoSnapshot.getBackendStateType()),
-                               metaInfoSnapshot);
-
                        final StateSnapshotRestore registeredState;
 
                        switch (metaInfoSnapshot.getBackendStateType()) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 065213b..a37f8aa 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -111,7 +111,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -212,14 +211,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         */
        private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation;
 
-       /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<String, StateMetaInfoSnapshot> 
restoredKvStateMetaInfos;
-
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
 
@@ -296,7 +287,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.keyGroupPrefixBytes =
                        
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
                this.kvStateInformation = new LinkedHashMap<>();
-               this.restoredKvStateMetaInfos = new HashMap<>();
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
 
@@ -424,7 +414,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        IOUtils.closeQuietly(dbOptions);
                        IOUtils.closeQuietly(writeOptions);
                        kvStateInformation.clear();
-                       restoredKvStateMetaInfos.clear();
 
                        cleanInstanceBasePath();
                }
@@ -510,7 +499,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                // clear all meta data
                kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
 
                try {
                        RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation = null;
@@ -753,8 +741,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                nameBytes,
                                                
rocksDBKeyedStateBackend.columnOptions);
 
-                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
-
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
                                        // create a meta info for the state on 
restore;
@@ -1166,7 +1152,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        stateBackend.columnOptions);
 
                                
columnFamilyDescriptors.add(columnFamilyDescriptor);
-                               
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
                        }
                        return columnFamilyDescriptors;
                }

Reply via email to