This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c8860074ffca82bf0314c8065b62901a8a4cabdd Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Mon Dec 10 15:07:47 2018 +0800 [FLINK-11094] [state backends] State backends no longer need separate map for restored StateMetaInfoSnapshots Now that a corresponding RegisteredStateMetaInfoBase is always eagerly created for every restored state meta info snapshot, that information is already part of the registered state infos map. As the changes show, the separate restored-snapshot maps are no longer queried and can therefore be safely removed. This closes #7264. --- .../runtime/state/DefaultOperatorStateBackend.java | 58 +++-------------- .../runtime/state/heap/HeapKeyedStateBackend.java | 74 +++++----------------- .../streaming/state/RocksDBKeyedStateBackend.java | 15 ----- 3 files changed, 26 insertions(+), 121 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 4702919..952dffb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; @@ -106,22 +105,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { private final boolean asynchronousSnapshots; /** - * Map of state names to their
corresponding restored state meta info. - * - * <p>TODO this map can be removed when eager-state registration is in place. - * TODO we currently need this cached to check state migration strategies when new serializers are registered. - */ - private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos; - - /** - * Map of state names to their corresponding restored broadcast state meta info. - */ - private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos; - - /** * Cache of already accessed states. * - * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated + * <p>In contrast to {@link #registeredOperatorStates} which may be repopulated * with restored state, this map is always empty at the beginning. * * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends. @@ -148,8 +134,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { this.asynchronousSnapshots = asynchronousSnapshots; this.accessedStatesByName = new HashMap<>(); this.accessedBroadcastStatesByName = new HashMap<>(); - this.restoredOperatorStateMetaInfos = new HashMap<>(); - this.restoredBroadcastStateMetaInfos = new HashMap<>(); this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy(); } @@ -226,34 +210,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { broadcastState.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST); - final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name); + RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo(); // check whether new serializers are incompatible - TypeSerializerSnapshot<K> keySerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<K>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)); - TypeSerializerSchemaCompatibility<K> keyCompatibility = - keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer); + restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer); if (keyCompatibility.isIncompatible()) { throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible."); } - TypeSerializerSnapshot<V> valueSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<V>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); - TypeSerializerSchemaCompatibility<V> valueCompatibility = - valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer); + restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer); if (valueCompatibility.isIncompatible()) { throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible."); } - // new serializer is compatible; use it to replace the old serializer - broadcastState.setStateMetaInfo( - new RegisteredBroadcastStateBackendMetaInfo<>( - name, - OperatorStateHandle.Mode.BROADCAST, - broadcastStateKeySerializer, - broadcastStateValueSerializer)); + broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo); } accessedBroadcastStatesByName.put(name, broadcastState); @@ -345,8 +317,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { " not be loaded. This is a temporary restriction that will be fixed in future versions."); } - restoredOperatorStateMetaInfos.put(restoredSnapshot.getName(), restoredSnapshot); - PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName()); if (null == listState) { @@ -381,8 +351,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { " not be loaded. 
This is a temporary restriction that will be fixed in future versions."); } - restoredBroadcastStateMetaInfos.put(restoredSnapshot.getName(), restoredSnapshot); - BackendWritableBroadcastState<? ,?> broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName()); if (broadcastState == null) { @@ -590,25 +558,19 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { partitionableListState.getStateMetaInfo().getAssignmentMode(), mode); - StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name); - RegisteredOperatorStateBackendMetaInfo<S> metaInfo = - new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot); + RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo = + partitionableListState.getStateMetaInfo(); - // check compatibility to determine if state migration is required + // check compatibility to determine if new serializers are incompatible TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate(); - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); - TypeSerializerSchemaCompatibility<S> stateCompatibility = - stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer); + restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer); if (stateCompatibility.isIncompatible()) { throw new StateMigrationException("The new state serializer for operator state must not be incompatible."); } - partitionableListState.setStateMetaInfo( - new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode)); + partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo); } accessedStatesByName.put(name, partitionableListState); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index ecad76c..3f8761b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -79,6 +78,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; @@ -126,14 +126,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates; /** - * Map of state names to their corresponding restored state meta info. - * - * <p>TODO this map can be removed when eager-state registration is in place. - * TODO we currently need this cached to check state migration strategies when new serializers are registered. - */ - private final Map<StateUID, StateMetaInfoSnapshot> restoredStateMetaInfo; - - /** * The configuration for local recovery. 
*/ private final LocalRecoveryConfig localRecoveryConfig; @@ -173,7 +165,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait); LOG.info("Initializing heap keyed state backend with stream factory."); - this.restoredStateMetaInfo = new HashMap<>(); this.priorityQueueSetFactory = priorityQueueSetFactory; } @@ -194,23 +185,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // TODO we implement the simple way of supporting the current functionality, mimicking keyed state // because this should be reworked in FLINK-9376 and then we should have a common algorithm over // StateMetaInfoSnapshot that avoids this code duplication. - StateMetaInfoSnapshot restoredMetaInfoSnapshot = - restoredStateMetaInfo.get(StateUID.of(stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE)); - - Preconditions.checkState( - restoredMetaInfoSnapshot != null, - "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + - " but its corresponding restored snapshot cannot be found."); - - StateMetaInfoSnapshot.CommonSerializerKeys serializerKey = - StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER; - - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey)); TypeSerializerSchemaCompatibility<T> compatibilityResult = - serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer); + existingState.getMetaInfo().updateElementSerializer(byteOrderedElementSerializer); if (compatibilityResult.isIncompatible()) { throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible.")); @@ -252,57 +229,42 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private <N, 
V> StateTable<K, N, V> tryRegisterStateTable( TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc, - StateSnapshotTransformer<V> snapshotTransformer) throws StateMigrationException { + @Nullable StateSnapshotTransformer<V> snapshotTransformer) throws StateMigrationException { @SuppressWarnings("unchecked") StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName()); TypeSerializer<V> newStateSerializer = stateDesc.getSerializer(); - RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( - stateDesc.getType(), - stateDesc.getName(), - namespaceSerializer, - newStateSerializer, - snapshotTransformer); if (stateTable != null) { - @SuppressWarnings("unchecked") - StateMetaInfoSnapshot restoredMetaInfoSnapshot = - restoredStateMetaInfo.get( - StateUID.of(stateDesc.getName(), StateMetaInfoSnapshot.BackendStateType.KEY_VALUE)); - - Preconditions.checkState( - restoredMetaInfoSnapshot != null, - "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + - " but its corresponding restored snapshot cannot be found."); + RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo = stateTable.getMetaInfo(); - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<N>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot( - StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString())); + restoredKvMetaInfo.updateSnapshotTransformer(snapshotTransformer); TypeSerializerSchemaCompatibility<N> namespaceCompatibility = - namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer); + restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer); if (!namespaceCompatibility.isCompatibleAsIs()) { throw new StateMigrationException("For heap backends, the new namespace serializer must be compatible."); } - 
@SuppressWarnings("unchecked") - TypeSerializerSnapshot<V> stateSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<V>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot( - StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString())); - - RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc); + restoredKvMetaInfo.checkStateMetaInfo(stateDesc); TypeSerializerSchemaCompatibility<V> stateCompatibility = - stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer); + restoredKvMetaInfo.updateStateSerializer(newStateSerializer); if (stateCompatibility.isIncompatible()) { throw new StateMigrationException("For heap backends, the new state serializer must not be incompatible."); } - stateTable.setMetaInfo(newMetaInfo); + stateTable.setMetaInfo(restoredKvMetaInfo); } else { + RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + newStateSerializer, + snapshotTransformer); + stateTable = snapshotStrategy.newStateTable(newMetaInfo); registeredKVStates.put(stateDesc.getName(), stateTable); } @@ -536,10 +498,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Map<Integer, StateMetaInfoSnapshot> kvStatesById) { for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) { - restoredStateMetaInfo.put( - StateUID.of(metaInfoSnapshot.getName(), metaInfoSnapshot.getBackendStateType()), - metaInfoSnapshot); - final StateSnapshotRestore registeredState; switch (metaInfoSnapshot.getBackendStateType()) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 065213b..a37f8aa 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -111,7 +111,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -212,14 +211,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation; - /** - * Map of state names to their corresponding restored state meta info. - * - * <p>TODO this map can be removed when eager-state registration is in place. - * TODO we currently need this cached to check state migration strategies when new serializers are registered. - */ - private final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos; - /** Number of bytes required to prefix the key groups. 
*/ private final int keyGroupPrefixBytes; @@ -296,7 +287,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups()); this.kvStateInformation = new LinkedHashMap<>(); - this.restoredKvStateMetaInfos = new HashMap<>(); this.writeOptions = new WriteOptions().setDisableWAL(true); @@ -424,7 +414,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { IOUtils.closeQuietly(dbOptions); IOUtils.closeQuietly(writeOptions); kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); cleanInstanceBasePath(); } @@ -510,7 +499,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // clear all meta data kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); try { RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null; @@ -753,8 +741,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { nameBytes, rocksDBKeyedStateBackend.columnOptions); - rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); // create a meta info for the state on restore; @@ -1166,7 +1152,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateBackend.columnOptions); columnFamilyDescriptors.add(columnFamilyDescriptor); - stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); } return columnFamilyDescriptors; }