[FLINK-4940] Add broadcast state to the OperatorStateBackend.
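The commit below adds `OperatorStateStore#getBroadcastState(MapStateDescriptor)` and the `BroadcastState`/`ReadOnlyBroadcastState` interfaces. The following sketch illustrates how that new API could be used from a `CheckpointedFunction`; the class, descriptor name, and rule-handling method are made up for illustration, and the `CheckpointedFunction` scaffolding is standard Flink API rather than part of this change.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Illustrative function that stores broadcasted rules in the new broadcast state. */
public class BroadcastStateExample implements CheckpointedFunction {

    // Hypothetical descriptor; the name and key/value types are examples only.
    private static final MapStateDescriptor<String, String> RULES_DESCRIPTOR =
            new MapStateDescriptor<>(
                    "broadcast-rules",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    private transient BroadcastState<String, String> rules;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Registers (or restores) the broadcast state under the descriptor's name.
        rules = context.getOperatorStateStore().getBroadcastState(RULES_DESCRIPTOR);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do here: the operator state backend snapshots the broadcast state.
    }

    // Called by user code when a broadcasted element arrives. Every parallel instance
    // must apply the same updates, as required by the BroadcastState contract.
    public void applyBroadcastedRule(String name, String rule) throws Exception {
        rules.put(name, rule);
    }
}
```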
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/484fedd4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/484fedd4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/484fedd4 Branch: refs/heads/master Commit: 484fedd4e649ec3adc4f55f090b71f9d7dbaa961 Parents: 44211e3 Author: kkloudas <[email protected]> Authored: Thu Dec 21 13:51:35 2017 +0100 Committer: kkloudas <[email protected]> Committed: Wed Feb 7 14:07:01 2018 +0100 ---------------------------------------------------------------------- .../kafka/FlinkKafkaConsumerBaseTest.java | 12 + .../flink/api/common/state/BroadcastState.java | 90 ++++++ .../api/common/state/OperatorStateStore.java | 32 ++ .../common/state/ReadOnlyBroadcastState.java | 70 ++++ .../checkpoint/OperatorStateRepartitioner.java | 4 +- .../RoundRobinOperatorStateRepartitioner.java | 80 +++-- .../checkpoint/StateAssignmentOperation.java | 5 +- .../state/AbstractKeyedStateBackend.java | 4 +- .../state/BackendWritableBroadcastState.java | 42 +++ .../state/DefaultOperatorStateBackend.java | 322 ++++++++++++++++--- .../flink/runtime/state/HeapBroadcastState.java | 154 +++++++++ .../OperatorBackendSerializationProxy.java | 73 +++-- ...ckendStateMetaInfoSnapshotReaderWriters.java | 142 +++++++- .../runtime/state/OperatorStateHandle.java | 5 +- ...RegisteredBroadcastBackendStateMetaInfo.java | 230 +++++++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 74 ++++- .../savepoint/CheckpointTestUtils.java | 4 +- .../runtime/state/OperatorStateBackendTest.java | 199 +++++++++++- .../runtime/state/OperatorStateHandleTest.java | 5 +- .../runtime/state/SerializationProxiesTest.java | 109 ++++++- 20 files changed, 1496 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 41b609e..5040966 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -19,9 +19,11 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -894,6 +896,11 @@ public class FlinkKafkaConsumerBaseTest { } @Override + public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override public <S> ListState<S> 
getListState(ListStateDescriptor<S> stateDescriptor) throws Exception { throw new UnsupportedOperationException(); } @@ -902,6 +909,11 @@ public class FlinkKafkaConsumerBaseTest { public Set<String> getRegisteredStateNames() { throw new UnsupportedOperationException(); } + + @Override + public Set<String> getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } } private static class MockFunctionInitializationContext implements FunctionInitializationContext { http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java new file mode 100644 index 0000000..0cece41 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Iterator; +import java.util.Map; + +/** + * A type of state that can be created to store the state of a {@code BroadcastStream}. This state assumes that + * <b>the same elements are sent to all instances of an operator.</b> + * + * <p><b>CAUTION:</b> the user has to guarantee that all task instances store the same elements in this type of state. + * + * <p> Each operator instance individually maintains and stores elements in the broadcast state. The fact that the + * incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery + * or re-scaling, the same state is given to each of the instances. To avoid hotspots, each task reads its previous + * partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round + * robin fashion. This is why each instance has to guarantee that it stores the same elements as the rest. If not, + * upon recovery or rescaling you may have unpredictable redistribution of the partitions, thus unpredictable results. + * + * @param <K> The key type of the elements in the {@link BroadcastState}. + * @param <V> The value type of the elements in the {@link BroadcastState}. + */ +@PublicEvolving +public interface BroadcastState<K, V> extends ReadOnlyBroadcastState<K, V> { + + /** + * Associates a new value with the given key. + * + * @param key The key of the mapping + * @param value The new value of the mapping + * + * @throws Exception Thrown if the system cannot access the state. 
+ */ + void put(K key, V value) throws Exception; + + /** + * Copies all of the mappings from the given map into the state. + * + * @param map The mappings to be stored in this state + * + * @throws Exception Thrown if the system cannot access the state. + */ + void putAll(Map<K, V> map) throws Exception; + + /** + * Deletes the mapping of the given key. + * + * @param key The key of the mapping + * + * @throws Exception Thrown if the system cannot access the state. + */ + void remove(K key) throws Exception; + + /** + * Iterates over all the mappings in the state. + * + * @return An iterator over all the mappings in the state + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterator<Map.Entry<K, V>> iterator() throws Exception; + + /** + * Returns all the mappings in the state + * + * @return An iterable view of all the key-value pairs in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterable<Map.Entry<K, V>> entries() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java index bf22041..c2037e0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java @@ -30,6 +30,31 @@ import java.util.Set; public interface OperatorStateStore { /** + * Creates (or restores) a {@link BroadcastState broadcast state}. This type of state can only be created to store + * the state of a {@code BroadcastStream}. Each state is registered under a unique name. + * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). + * The returned broadcast state has {@code key-value} format. + * + * <p><b>CAUTION: the user has to guarantee that all task instances store the same elements in this type of state.</b> + * + * <p>Each operator instance individually maintains and stores elements in the broadcast state. The fact that the + * incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery + * or re-scaling, the same state is given to each of the instances. To avoid hotspots, each task reads its previous + * partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round + * robin fashion. This is why each instance has to guarantee that it stores the same elements as the rest. If not, + * upon recovery or rescaling you may have unpredictable redistribution of the partitions, thus unpredictable results. + * + * @param stateDescriptor The descriptor for this state, providing a name, a serializer for the keys and one for the + * values. + * @param <K> The type of the keys in the broadcast state. + * @param <V> The type of the values in the broadcast state. + * + * @return The {@link BroadcastState Broadcast State}. + * @throws Exception + */ + <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception; + + /** * Creates (or restores) a list state. Each state is registered under a unique name. 
* The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). * @@ -83,6 +108,13 @@ public interface OperatorStateStore { */ Set<String> getRegisteredStateNames(); + /** + * Returns a set with the names of all currently registered broadcast states. + * + * @return set of names for all registered broadcast states. + */ + Set<String> getRegisteredBroadcastStateNames(); + // ------------------------------------------------------------------------------------------- // Deprecated methods // ------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java new file mode 100644 index 0000000..4d3f2e7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** + * A read-only view of the {@link BroadcastState}. + * + * <p>Although read-only, the user code should not modify the value + * returned by the {@link #get(Object)} or the entries of the immutable + * iterator returned by the {@link #immutableEntries()}, as this can lead to + * inconsistent states. The reason for this is that we do not create extra + * copies of the elements for performance reasons. + * + * @param <K> The key type of the elements in the {@link ReadOnlyBroadcastState}. + * @param <V> The value type of the elements in the {@link ReadOnlyBroadcastState}. + */ +@PublicEvolving +public interface ReadOnlyBroadcastState<K, V> extends State { + + /** + * Returns the current value associated with the given key. + * + * <p>The user code must not modify the value returned, as + * this can lead to inconsistent states. + * + * @param key The key of the mapping + * @return The value of the mapping with the given key + * + * @throws Exception Thrown if the system cannot access the state. + */ + V get(K key) throws Exception; + + /** + * Returns whether there exists the given mapping. + * + * @param key The key of the mapping + * @return True if there exists a mapping whose key equals to the given key + * + * @throws Exception Thrown if the system cannot access the state. 
+ */ + boolean contains(K key) throws Exception; + + /** + * Returns an immutable {@link Iterable} over the entries in the state. + * + * <p>The user code must not modify the entries of the returned immutable + * iterator, as this can lead to inconsistent states. + */ + Iterable<Map.Entry<K, V>> immutableEntries() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java index 98810f1..090f48a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java @@ -31,12 +31,12 @@ public interface OperatorStateRepartitioner { /** * @param previousParallelSubtaskStates List of state handles to the parallel subtask states of an operator, as they * have been checkpointed. - * @param parallelism The parallelism that we consider for the state redistribution. Determines the size of the + * @param newParallelism The parallelism that we consider for the state redistribution. Determines the size of the * returned list. * @return List with one entry per parallel subtask. Each subtask receives now one collection of states that build * of the new total state for this subtask. */ List<Collection<OperatorStateHandle>> repartitionState( List<OperatorStateHandle> previousParallelSubtaskStates, - int parallelism); + int newParallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java index 4513ef8..e09b677 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java @@ -42,10 +42,10 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart @Override public List<Collection<OperatorStateHandle>> repartitionState( List<OperatorStateHandle> previousParallelSubtaskStates, - int parallelism) { + int newParallelism) { Preconditions.checkNotNull(previousParallelSubtaskStates); - Preconditions.checkArgument(parallelism > 0); + Preconditions.checkArgument(newParallelism > 0); // Reorganize: group by (State Name -> StreamStateHandle + Offsets) GroupByStateNameResults nameToStateByMode = groupByStateName(previousParallelSubtaskStates); @@ -55,11 +55,11 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart } // Assemble result from all merge maps - List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism); + List<Collection<OperatorStateHandle>> result = new ArrayList<>(newParallelism); // Do the actual repartitioning for all named states List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = - repartition(nameToStateByMode, 
parallelism); + repartition(nameToStateByMode, newParallelism); for (int i = 0; i < mergeMapList.size(); ++i) { result.add(i, new ArrayList<>(mergeMapList.get(i).values())); @@ -72,8 +72,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart * Group by the different named states. */ @SuppressWarnings("unchecked, rawtype") - private GroupByStateNameResults groupByStateName( - List<OperatorStateHandle> previousParallelSubtaskStates) { + private GroupByStateNameResults groupByStateName(List<OperatorStateHandle> previousParallelSubtaskStates) { //Reorganize: group by (State Name -> StreamStateHandle + StateMetaInfo) EnumMap<OperatorStateHandle.Mode, @@ -81,10 +80,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart new EnumMap<>(OperatorStateHandle.Mode.class); for (OperatorStateHandle.Mode mode : OperatorStateHandle.Mode.values()) { - Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> map = new HashMap<>(); - nameToStateByMode.put( - mode, - new HashMap<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>()); + nameToStateByMode.put(mode, new HashMap<>()); } for (OperatorStateHandle psh : previousParallelSubtaskStates) { @@ -120,14 +116,14 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart */ private List<Map<StreamStateHandle, OperatorStateHandle>> repartition( GroupByStateNameResults nameToStateByMode, - int parallelism) { + int newParallelism) { // We will use this to merge w.r.t. StreamStateHandles for each parallel subtask inside the maps - List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(parallelism); + List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(newParallelism); // Initialize - for (int i = 0; i < parallelism; ++i) { - mergeMapList.add(new HashMap<StreamStateHandle, OperatorStateHandle>()); + for (int i = 0; i < newParallelism; ++i) { + mergeMapList.add(new HashMap<>()); } // Start with the state handles we distribute round robin by splitting by offsets @@ -150,15 +146,15 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart // Repartition the state across the parallel operator instances int lstIdx = 0; int offsetIdx = 0; - int baseFraction = totalPartitions / parallelism; - int remainder = totalPartitions % parallelism; + int baseFraction = totalPartitions / newParallelism; + int remainder = totalPartitions % newParallelism; int newStartParallelOp = startParallelOp; - for (int i = 0; i < parallelism; ++i) { + for (int i = 0; i < newParallelism; ++i) { // Preparation: calculate the actual index considering wrap around - int parallelOpIdx = (i + startParallelOp) % parallelism; + int parallelOpIdx = (i + startParallelOp) % newParallelism; // Now calculate the number of partitions we will assign to the parallel instance in this round ... 
int numberOfPartitionsToAssign = baseFraction; @@ -209,10 +205,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx); OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithOffsets.f0); if (operatorStateHandle == null) { - operatorStateHandle = new OperatorStateHandle( - new HashMap<String, OperatorStateHandle.StateMetaInfo>(), - handleWithOffsets.f0); - + operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithOffsets.f0); mergeMap.put(handleWithOffsets.f0, operatorStateHandle); } operatorStateHandle.getStateNameToPartitionOffsets().put( @@ -226,30 +219,51 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart // Now we also add the state handles marked for broadcast to all parallel instances Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> broadcastNameToState = - nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST); + nameToStateByMode.getByMode(OperatorStateHandle.Mode.UNION); - for (int i = 0; i < parallelism; ++i) { + for (int i = 0; i < newParallelism; ++i) { Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(i); for (Map.Entry<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> e : broadcastNameToState.entrySet()) { - List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> current = e.getValue(); - - for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : current) { + for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) { OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0); if (operatorStateHandle == null) { - operatorStateHandle = new OperatorStateHandle( - new HashMap<String, OperatorStateHandle.StateMetaInfo>(), - handleWithMetaInfo.f0); - + operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0); mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle); } operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1); } } } + + // Now we also add the state handles marked for uniform broadcast to all parallel instances + Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> uniformBroadcastNameToState = + nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST); + + for (int i = 0; i < newParallelism; ++i) { + + final Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(i); + + // for each name, pick the i-th entry + for (Map.Entry<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> e : + uniformBroadcastNameToState.entrySet()) { + + int oldParallelism = e.getValue().size(); + + Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo = + e.getValue().get(i % oldParallelism); + + OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0); + if (operatorStateHandle == null) { + operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0); + mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle); + } + operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1); + } + } return mergeMapList; } @@ -257,7 +271,7 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart private final EnumMap<OperatorStateHandle.Mode, Map<String, 
List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> byMode; - public GroupByStateNameResults( + GroupByStateNameResults( EnumMap<OperatorStateHandle.Mode, Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> byMode) { this.byMode = Preconditions.checkNotNull(byMode); @@ -268,4 +282,4 @@ public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepart return byMode.get(mode); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index e108bad..43a4b01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -618,11 +618,10 @@ public class StateAssignmentOperation { Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsets = operatorStateHandle.getStateNameToPartitionOffsets(); - for (OperatorStateHandle.StateMetaInfo metaInfo : partitionOffsets.values()) { // if we find any broadcast state, we cannot take the shortcut and need to go through repartitioning - if (OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) { + if (OperatorStateHandle.Mode.UNION.equals(metaInfo.getDistributionMode())) { return opStateRepartitioner.repartitionState( chainOpParallelStates, newParallelism); @@ -639,7 +638,7 @@ public class StateAssignmentOperation { /** * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct * key group index for the given subtask {@link KeyGroupRange}. - * <p> + * * <p>This is publicly visible to be used in tests. */ public static List<KeyedStateHandle> getKeyedStateHandles( http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 0ffde7a..cc53c0c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -98,9 +98,7 @@ public abstract class AbstractKeyedStateBackend<K> private final ExecutionConfig executionConfig; - /** - * Decorates the input and output streams to write key-groups compressed. - */ + /** Decorates the input and output streams to write key-groups compressed. 
*/ protected final StreamCompressionDecorator keyGroupCompressionDecorator; public AbstractKeyedStateBackend( http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java new file mode 100644 index 0000000..8daf07c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; + +/** + * An interface with methods related to the interplay between the {@link BroadcastState Broadcast State} and + * the {@link OperatorStateBackend}. + * + * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}. + * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}. 
+ */ +public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V> { + + BackendWritableBroadcastState<K, V> deepCopy(); + + long write(FSDataOutputStream out) throws IOException; + + void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo); + + RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index e6d6dc6..f486643 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -69,7 +71,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { /** * Map for all registered operator states. Maps state name -> state */ - private final Map<String, PartitionableListState<?>> registeredStates; + private final Map<String, PartitionableListState<?>> registeredOperatorStates; + + /** + * Map for all registered operator broadcast states. Maps state name -> state + */ + private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates; /** * CloseableRegistry to participate in the tasks lifecycle. @@ -102,12 +109,17 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { * <p>TODO this map can be removed when eager-state registration is in place. * TODO we currently need this cached to check state migration strategies when new serializers are registered. */ - private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos; + private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorStateMetaInfos; + + /** + * Map of state names to their corresponding restored broadcast state meta info. + */ + private final Map<String, RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastStateMetaInfos; /** * Cache of already accessed states. * - * <p>In contrast to {@link #registeredStates} and {@link #restoredStateMetaInfos} which may be repopulated + * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated * with restored state, this map is always empty at the beginning. * * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends. 
@@ -116,6 +128,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { */ private final HashMap<String, PartitionableListState<?>> accessedStatesByName; + private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName; + public DefaultOperatorStateBackend( ClassLoader userClassLoader, ExecutionConfig executionConfig, @@ -125,10 +139,13 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { this.userClassloader = Preconditions.checkNotNull(userClassLoader); this.executionConfig = executionConfig; this.javaSerializer = new JavaSerializer<>(); - this.registeredStates = new HashMap<>(); + this.registeredOperatorStates = new HashMap<>(); + this.registeredBroadcastStates = new HashMap<>(); this.asynchronousSnapshots = asynchronousSnapshots; this.accessedStatesByName = new HashMap<>(); - this.restoredStateMetaInfos = new HashMap<>(); + this.accessedBroadcastStatesByName = new HashMap<>(); + this.restoredOperatorStateMetaInfos = new HashMap<>(); + this.restoredBroadcastStateMetaInfos = new HashMap<>(); } public ExecutionConfig getExecutionConfig() { @@ -137,7 +154,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public Set<String> getRegisteredStateNames() { - return registeredStates.keySet(); + return registeredOperatorStates.keySet(); + } + + @Override + public Set<String> getRegisteredBroadcastStateNames() { + return registeredBroadcastStates.keySet(); } @Override @@ -148,7 +170,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @Override public void dispose() { IOUtils.closeQuietly(closeStreamOnCancelRegistry); - registeredStates.clear(); + registeredOperatorStates.clear(); + registeredBroadcastStates.clear(); } // ------------------------------------------------------------------------------------------- @@ -156,13 +179,94 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { // ------------------------------------------------------------------------------------------- @Override + public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException { + + Preconditions.checkNotNull(stateDescriptor); + String name = Preconditions.checkNotNull(stateDescriptor.getName()); + + @SuppressWarnings("unchecked") + BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name); + if (previous != null) { + checkStateNameAndMode( + previous.getStateMetaInfo().getName(), + name, + previous.getStateMetaInfo().getAssignmentMode(), + OperatorStateHandle.Mode.BROADCAST); + return previous; + } + + stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); + TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer()); + TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer()); + + BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name); + + if (broadcastState == null) { + broadcastState = new HeapBroadcastState<>( + new RegisteredBroadcastBackendStateMetaInfo<>( + name, + OperatorStateHandle.Mode.BROADCAST, + broadcastStateKeySerializer, + broadcastStateValueSerializer)); + registeredBroadcastStates.put(name, broadcastState); + } else { + // has restored state; check compatibility of new state access + + checkStateNameAndMode( + 
broadcastState.getStateMetaInfo().getName(), + name, + broadcastState.getStateMetaInfo().getAssignmentMode(), + OperatorStateHandle.Mode.BROADCAST); + + @SuppressWarnings("unchecked") + RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> restoredMetaInfo = + (RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V>) restoredBroadcastStateMetaInfos.get(name); + + // check compatibility to determine if state migration is required + CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getKeySerializerConfigSnapshot(), + broadcastStateKeySerializer); + + CompatibilityResult<V> valueCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getValueSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getValueSerializerConfigSnapshot(), + broadcastStateValueSerializer); + + if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) { + // new serializer is compatible; use it to replace the old serializer + broadcastState.setStateMetaInfo( + new RegisteredBroadcastBackendStateMetaInfo<>( + name, + OperatorStateHandle.Mode.BROADCAST, + broadcastStateKeySerializer, + broadcastStateValueSerializer)); + } else { + // TODO state migration currently isn't possible. + + // NOTE: for heap backends, it is actually fine to proceed here without failing the restore, + // since the state has already been deserialized to objects and we can just continue with + // the new serializer; we're deliberately failing here for now to have equal functionality with + // the RocksDB backend to avoid confusion for users. + + throw new StateMigrationException("State migration isn't supported, yet."); + } + } + + accessedBroadcastStatesByName.put(name, broadcastState); + return broadcastState; + } + + @Override public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception { return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); } @Override public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception { - return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST); + return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION); } // ------------------------------------------------------------------------------------------- @@ -203,23 +307,39 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { final long syncStartTime = System.currentTimeMillis(); - if (registeredStates.isEmpty()) { + if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) { return DoneFuture.nullValue(); } - final Map<String, PartitionableListState<?>> registeredStatesDeepCopies = - new HashMap<>(registeredStates.size()); + final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies = + new HashMap<>(registeredOperatorStates.size()); + final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies = + new HashMap<>(registeredBroadcastStates.size()); - // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(userClassloader); try { - for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) { - PartitionableListState<?> listState = 
entry.getValue(); - if (null != listState) { - listState = listState.deepCopy(); + // eagerly create deep copies of the list and the broadcast states (if any) + // in the synchronous phase, so that we can use them in the async writing. + + if (!registeredOperatorStates.isEmpty()) { + for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) { + PartitionableListState<?> listState = entry.getValue(); + if (null != listState) { + listState = listState.deepCopy(); + } + registeredOperatorStatesDeepCopies.put(entry.getKey(), listState); + } + } + + if (!registeredBroadcastStates.isEmpty()) { + for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) { + BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue(); + if (null != broadcastState) { + broadcastState = broadcastState.deepCopy(); + } + registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState); } - registeredStatesDeepCopies.put(entry.getKey(), listState); } } finally { Thread.currentThread().setContextClassLoader(snapshotClassLoader); @@ -263,25 +383,38 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out; - final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = - new HashMap<>(registeredStatesDeepCopies.size()); + // get the registered operator state infos ... + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots = + new ArrayList<>(registeredOperatorStatesDeepCopies.size()); + + for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) { + operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); + } - List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots = - new ArrayList<>(registeredStatesDeepCopies.size()); + // ... get the registered broadcast operator state infos ... + List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastMetaInfoSnapshots = + new ArrayList<>(registeredBroadcastStatesDeepCopies.size()); - for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) { - metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); + for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStatesDeepCopies.entrySet()) { + broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot()); } + // ... write them all in the checkpoint stream ... DataOutputView dov = new DataOutputViewStreamWrapper(localOut); OperatorBackendSerializationProxy backendSerializationProxy = - new OperatorBackendSerializationProxy(metaInfoSnapshots); + new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots); backendSerializationProxy.write(dov); + // ... and then go for the states ... 
+ + // we put BOTH normal and broadcast state metadata here + final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = + new HashMap<>(registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size()); + for (Map.Entry<String, PartitionableListState<?>> entry : - registeredStatesDeepCopies.entrySet()) { + registeredOperatorStatesDeepCopies.entrySet()) { PartitionableListState<?> value = entry.getValue(); long[] partitionOffsets = value.write(localOut); @@ -291,6 +424,19 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); } + // ... and the broadcast states themselves ... + for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : + registeredBroadcastStatesDeepCopies.entrySet()) { + + BackendWritableBroadcastState<?, ?> value = entry.getValue(); + long[] partitionOffsets = {value.write(localOut)}; + OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode(); + writtenStatesMetaData.put( + entry.getKey(), + new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode)); + } + + // ... and, finally, create the state handle. OperatorStateHandle retValue = null; if (closeStreamOnCancelRegistry.unregisterCloseable(out)) { @@ -348,11 +494,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { backendSerializationProxy.read(new DataInputViewStreamWrapper(in)); - List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredMetaInfoSnapshots = - backendSerializationProxy.getStateMetaInfoSnapshots(); + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorMetaInfoSnapshots = + backendSerializationProxy.getOperatorStateMetaInfoSnapshots(); // Recreate all PartitionableListStates from the meta info - for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) { + for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredOperatorMetaInfoSnapshots) { if (restoredMetaInfo.getPartitionStateSerializer() == null || restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) { @@ -368,9 +514,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { " not be loaded. This is a temporary restriction that will be fixed in future versions."); } - restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); + restoredOperatorStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName()); + PartitionableListState<?> listState = registeredOperatorStates.get(restoredMetaInfo.getName()); if (null == listState) { listState = new PartitionableListState<>( @@ -379,22 +525,66 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { restoredMetaInfo.getPartitionStateSerializer(), restoredMetaInfo.getAssignmentMode())); - registeredStates.put(listState.getStateMetaInfo().getName(), listState); + registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState); + } else { + // TODO with eager state registration in place, check here for serializer migration strategies + } + } + + // ... and then get back the broadcast state. + List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastMetaInfoSnapshots = + backendSerializationProxy.getBroadcastStateMetaInfoSnapshots(); + + for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<? 
,?> restoredMetaInfo : restoredBroadcastMetaInfoSnapshots) { + + if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null || + restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer || + restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) { + + // must fail now if the previous serializer cannot be restored because there is no serializer + // capable of reading previous state + // TODO when eager state registration is in place, we can try to get a convert deserializer + // TODO from the newly registered serializer instead of simply failing here + + throw new IOException("Unable to restore broadcast state [" + restoredMetaInfo.getName() + "]." + + " The previous key and value serializers of the state must be present; the serializers could" + + " have been removed from the classpath, or their implementations have changed and could" + + " not be loaded. This is a temporary restriction that will be fixed in future versions."); + } + + restoredBroadcastStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); + + BackendWritableBroadcastState<? ,?> broadcastState = registeredBroadcastStates.get(restoredMetaInfo.getName()); + + if (broadcastState == null) { + broadcastState = new HeapBroadcastState<>( + new RegisteredBroadcastBackendStateMetaInfo<>( + restoredMetaInfo.getName(), + restoredMetaInfo.getAssignmentMode(), + restoredMetaInfo.getKeySerializer(), + restoredMetaInfo.getValueSerializer())); + + registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), broadcastState); } else { // TODO with eager state registration in place, check here for serializer migration strategies } } - // Restore all the state in PartitionableListStates + // Restore all the states for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) { - PartitionableListState<?> stateListForName = registeredStates.get(nameToOffsets.getKey()); - - Preconditions.checkState(null != stateListForName, "Found state without " + - "corresponding meta info: " + nameToOffsets.getKey()); + final String stateName = nameToOffsets.getKey(); - deserializeStateValues(stateListForName, in, nameToOffsets.getValue()); + PartitionableListState<?> listStateForName = registeredOperatorStates.get(stateName); + if (listStateForName == null) { + BackendWritableBroadcastState<?, ?> broadcastStateForName = registeredBroadcastStates.get(stateName); + Preconditions.checkState(broadcastStateForName != null, "Found state without " + + "corresponding meta info: " + stateName); + deserializeBroadcastStateValues(broadcastStateForName, in, nameToOffsets.getValue()); + } else { + deserializeOperatorStateValues(listStateForName, in, nameToOffsets.getValue()); + } } } finally { @@ -428,7 +618,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { */ private final ArrayListSerializer<S> internalListCopySerializer; - public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) { + PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) { this(stateMetaInfo, new ArrayList<S>()); } @@ -513,7 +703,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { private <S> ListState<S> getListState( ListStateDescriptor<S> stateDescriptor, - OperatorStateHandle.Mode mode) throws IOException, StateMigrationException { + OperatorStateHandle.Mode mode) throws StateMigrationException { 
Preconditions.checkNotNull(stateDescriptor); String name = Preconditions.checkNotNull(stateDescriptor.getName()); @@ -521,7 +711,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { @SuppressWarnings("unchecked") PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name); if (previous != null) { - checkStateNameAndMode(previous.getStateMetaInfo(), name, mode); + checkStateNameAndMode( + previous.getStateMetaInfo().getName(), + name, + previous.getStateMetaInfo().getAssignmentMode(), + mode); return previous; } @@ -533,7 +727,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer()); @SuppressWarnings("unchecked") - PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); + PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name); if (null == partitionableListState) { // no restored state for the state name; simply create new state holder @@ -544,15 +738,19 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { partitionStateSerializer, mode)); - registeredStates.put(name, partitionableListState); + registeredOperatorStates.put(name, partitionableListState); } else { // has restored state; check compatibility of new state access - checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode); + checkStateNameAndMode( + partitionableListState.getStateMetaInfo().getName(), + name, + partitionableListState.getStateMetaInfo().getAssignmentMode(), + mode); @SuppressWarnings("unchecked") RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo = - (RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredStateMetaInfos.get(name); + (RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredOperatorStateMetaInfos.get(name); // check compatibility to determine if state migration is required CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( @@ -581,7 +779,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { return partitionableListState; } - private static <S> void deserializeStateValues( + private static <S> void deserializeOperatorStateValues( PartitionableListState<S> stateListForName, FSDataInputStream in, OperatorStateHandle.StateMetaInfo metaInfo) throws IOException { @@ -599,21 +797,45 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } } + private static <K, V> void deserializeBroadcastStateValues( + final BackendWritableBroadcastState<K, V> broadcastStateForName, + final FSDataInputStream in, + final OperatorStateHandle.StateMetaInfo metaInfo) throws Exception { + + if (metaInfo != null) { + long[] offsets = metaInfo.getOffsets(); + if (offsets != null) { + + TypeSerializer<K> keySerializer = broadcastStateForName.getStateMetaInfo().getKeySerializer(); + TypeSerializer<V> valueSerializer = broadcastStateForName.getStateMetaInfo().getValueSerializer(); + + in.seek(offsets[0]); + + DataInputView div = new DataInputViewStreamWrapper(in); + int size = div.readInt(); + for (int i = 0; i < size; i++) { + broadcastStateForName.put(keySerializer.deserialize(div), valueSerializer.deserialize(div)); + } + } + } + } + private static void checkStateNameAndMode( - RegisteredOperatorBackendStateMetaInfo previousMetaInfo, + String actualName, String 
expectedName, + OperatorStateHandle.Mode actualMode, OperatorStateHandle.Mode expectedMode) { Preconditions.checkState( - previousMetaInfo.getName().equals(expectedName), + actualName.equals(expectedName), "Incompatible state names. " + - "Was [" + previousMetaInfo.getName() + "], " + + "Was [" + actualName + "], " + "registered with [" + expectedName + "]."); Preconditions.checkState( - previousMetaInfo.getAssignmentMode().equals(expectedMode), + actualMode.equals(expectedMode), "Incompatible state assignment modes. " + - "Was [" + previousMetaInfo.getAssignmentMode() + "], " + + "Was [" + actualMode + "], " + "registered with [" + expectedMode + "]."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java new file mode 100644 index 0000000..42e68f3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * A {@link BroadcastState Broadcast State} backed a heap-based {@link Map}. + * + * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}. + * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}. + */ +public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> { + + /** + * Meta information of the state, including state name, assignment mode, and serializer. + */ + private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo; + + /** + * The internal map the holds the elements of the state. + */ + private final Map<K, V> backingMap; + + /** + * A serializer that allows to perform deep copies of internal map state. 
+ */ + private final MapSerializer<K, V> internalMapCopySerializer; + + HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) { + this(stateMetaInfo, new HashMap<>()); + } + + private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) { + + this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); + this.backingMap = Preconditions.checkNotNull(internalMap); + this.internalMapCopySerializer = new MapSerializer<>(stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer()); + } + + private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) { + this(toCopy.stateMetaInfo, toCopy.internalMapCopySerializer.copy(toCopy.backingMap)); + } + + @Override + public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) { + this.stateMetaInfo = stateMetaInfo; + } + + @Override + public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() { + return stateMetaInfo; + } + + @Override + public HeapBroadcastState<K, V> deepCopy() { + return new HeapBroadcastState<>(this); + } + + @Override + public void clear() { + backingMap.clear(); + } + + @Override + public String toString() { + return "HeapBroadcastState{" + + "stateMetaInfo=" + stateMetaInfo + + ", backingMap=" + backingMap + + ", internalMapCopySerializer=" + internalMapCopySerializer + + '}'; + } + + @Override + public long write(FSDataOutputStream out) throws IOException { + long partitionOffset = out.getPos(); + + DataOutputView dov = new DataOutputViewStreamWrapper(out); + dov.writeInt(backingMap.size()); + for (Map.Entry<K, V> entry: backingMap.entrySet()) { + getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov); + getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov); + } + + return partitionOffset; + } + + @Override + public V get(K key) { + return backingMap.get(key); + } + + @Override + public void put(K key, V value) { + backingMap.put(key, value); + } + + @Override + public void putAll(Map<K, V> map) { + backingMap.putAll(map); + } + + @Override + public void remove(K key) { + backingMap.remove(key); + } + + @Override + public boolean contains(K key) { + return backingMap.containsKey(key); + } + + @Override + public Iterator<Map.Entry<K, V>> iterator() { + return backingMap.entrySet().iterator(); + } + + @Override + public Iterable<Map.Entry<K, V>> entries() { + return backingMap.entrySet(); + } + + @Override + public Iterable<Map.Entry<K, V>> immutableEntries() { + return Collections.unmodifiableSet(backingMap.entrySet()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java index 074d84e..e73f83a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java @@ -33,9 +33,10 @@ import java.util.List; */ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable { - public static final int VERSION = 2; + public static final int VERSION = 3; - private 
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots; + private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorStateMetaInfoSnapshots; + private List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastStateMetaInfoSnapshots; private ClassLoader userCodeClassLoader; public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) { @@ -43,10 +44,15 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab } public OperatorBackendSerializationProxy( - List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots) { - - this.stateMetaInfoSnapshots = Preconditions.checkNotNull(stateMetaInfoSnapshots); - Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE); + List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorStateMetaInfoSnapshots, + List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastStateMetaInfoSnapshots) { + + this.operatorStateMetaInfoSnapshots = Preconditions.checkNotNull(operatorStateMetaInfoSnapshots); + this.broadcastStateMetaInfoSnapshots = Preconditions.checkNotNull(broadcastStateMetaInfoSnapshots); + Preconditions.checkArgument( + operatorStateMetaInfoSnapshots.size() <= Short.MAX_VALUE && + broadcastStateMetaInfoSnapshots.size() <= Short.MAX_VALUE + ); } @Override @@ -56,19 +62,26 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab @Override public int[] getCompatibleVersions() { - // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x) - return new int[] {VERSION, 1}; + // we are compatible with version 3 (Flink 1.5.x), 2 (Flink 1.4.x, Flink 1.3.x) and version 1 (Flink 1.2.x) + return new int[] {VERSION, 2, 1}; } @Override public void write(DataOutputView out) throws IOException { super.write(out); - out.writeShort(stateMetaInfoSnapshots.size()); - for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState : stateMetaInfoSnapshots) { + out.writeShort(operatorStateMetaInfoSnapshots.size()); + for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> state : operatorStateMetaInfoSnapshots) { + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateWriterForVersion(VERSION, state) + .writeOperatorStateMetaInfo(out); + } + + out.writeShort(broadcastStateMetaInfoSnapshots.size()); + for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> state : broadcastStateMetaInfoSnapshots) { OperatorBackendStateMetaInfoSnapshotReaderWriters - .getWriterForVersion(VERSION, kvState) - .writeStateMetaInfo(out); + .getBroadcastStateWriterForVersion(VERSION, state) + .writeBroadcastStateMetaInfo(out); } } @@ -76,17 +89,35 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab public void read(DataInputView in) throws IOException { super.read(in); - int numKvStates = in.readShort(); - stateMetaInfoSnapshots = new ArrayList<>(numKvStates); - for (int i = 0; i < numKvStates; i++) { - stateMetaInfoSnapshots.add( - OperatorBackendStateMetaInfoSnapshotReaderWriters - .getReaderForVersion(getReadVersion(), userCodeClassLoader) - .readStateMetaInfo(in)); + int numOperatorStates = in.readShort(); + operatorStateMetaInfoSnapshots = new ArrayList<>(numOperatorStates); + for (int i = 0; i < numOperatorStates; i++) { + operatorStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readOperatorStateMetaInfo(in)); } + + if (getReadVersion() >= 3) 
{ + // broadcast states did not exist prior to version 3 + int numBroadcastStates = in.readShort(); + broadcastStateMetaInfoSnapshots = new ArrayList<>(numBroadcastStates); + for (int i = 0; i < numBroadcastStates; i++) { + broadcastStateMetaInfoSnapshots.add( + OperatorBackendStateMetaInfoSnapshotReaderWriters + .getBroadcastStateReaderForVersion(getReadVersion(), userCodeClassLoader) + .readBroadcastStateMetaInfo(in)); + } + } else { + broadcastStateMetaInfoSnapshots = new ArrayList<>(); + } + } + + public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getOperatorStateMetaInfoSnapshots() { + return operatorStateMetaInfoSnapshots; } - public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getStateMetaInfoSnapshots() { - return stateMetaInfoSnapshots; + public List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> getBroadcastStateMetaInfoSnapshots() { + return broadcastStateMetaInfoSnapshots; } } http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java index 03fe612..fafd542 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java @@ -31,7 +31,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; +import java.util.List; /** * Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}. 
@@ -45,9 +47,10 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { // Writers // - v1: Flink 1.2.x // - v2: Flink 1.3.x + // - v3: Flink 1.5.x // ------------------------------------------------------------------------------- - public static <S> OperatorBackendStateMetaInfoWriter getWriterForVersion( + public static <S> OperatorBackendStateMetaInfoWriter getOperatorStateWriterForVersion( int version, RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { switch (version) { @@ -55,6 +58,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { return new OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo); // current version + case 2: case OperatorBackendSerializationProxy.VERSION: return new OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo); @@ -65,8 +69,28 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + public static <K, V> BroadcastStateMetaInfoWriter getBroadcastStateWriterForVersion( + final int version, + final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) { + + switch (version) { + // current version + case OperatorBackendSerializationProxy.VERSION: + return new BroadcastStateMetaInfoWriterV3<>(broadcastStateMetaInfo); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized broadcast state meta info writer version: " + version); + } + } + public interface OperatorBackendStateMetaInfoWriter { - void writeStateMetaInfo(DataOutputView out) throws IOException; + void writeOperatorStateMetaInfo(DataOutputView out) throws IOException; + } + + public interface BroadcastStateMetaInfoWriter { + void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException; } public static abstract class AbstractOperatorBackendStateMetaInfoWriter<S> @@ -79,6 +103,16 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + public abstract static class AbstractBroadcastStateMetaInfoWriter<K, V> + implements BroadcastStateMetaInfoWriter { + + protected final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo; + + public AbstractBroadcastStateMetaInfoWriter(final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) { + this.broadcastStateMetaInfo = Preconditions.checkNotNull(broadcastStateMetaInfo); + } + } + public static class OperatorBackendStateMetaInfoWriterV1<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> { public OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) { @@ -86,7 +120,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } @Override - public void writeStateMetaInfo(DataOutputView out) throws IOException { + public void writeOperatorStateMetaInfo(DataOutputView out) throws IOException { out.writeUTF(stateMetaInfo.getName()); out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getPartitionStateSerializer()); @@ -100,7 +134,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } @Override - public void writeStateMetaInfo(DataOutputView out) throws IOException { + public void writeOperatorStateMetaInfo(DataOutputView out) throws IOException { out.writeUTF(stateMetaInfo.getName()); out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); @@ -113,20 +147,51 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + public static class BroadcastStateMetaInfoWriterV3<K, V> extends 
AbstractBroadcastStateMetaInfoWriter<K, V> { + + public BroadcastStateMetaInfoWriterV3( + final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) { + super(broadcastStateMetaInfo); + } + + @Override + public void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException { + out.writeUTF(broadcastStateMetaInfo.getName()); + out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal()); + + // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Arrays.asList( + Tuple2.of( + broadcastStateMetaInfo.getKeySerializer(), + broadcastStateMetaInfo.getKeySerializerConfigSnapshot() + ), + Tuple2.of( + broadcastStateMetaInfo.getValueSerializer(), + broadcastStateMetaInfo.getValueSerializerConfigSnapshot() + ) + ) + ); + } + } + // ------------------------------------------------------------------------------- // Readers // - v1: Flink 1.2.x // - v2: Flink 1.3.x + // - v3: Flink 1.5.x // ------------------------------------------------------------------------------- - public static <S> OperatorBackendStateMetaInfoReader<S> getReaderForVersion( + public static <S> OperatorBackendStateMetaInfoReader<S> getOperatorStateReaderForVersion( int version, ClassLoader userCodeClassLoader) { switch (version) { case 1: return new OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader); - // current version + // version 2 and version 3 (current) + case 2: case OperatorBackendSerializationProxy.VERSION: return new OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader); @@ -137,8 +202,27 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + public static <K, V> BroadcastStateMetaInfoReader<K, V> getBroadcastStateReaderForVersion( + int version, ClassLoader userCodeClassLoader) { + + switch (version) { + // current version + case OperatorBackendSerializationProxy.VERSION: + return new BroadcastStateMetaInfoReaderV3<>(userCodeClassLoader); + + default: + // guard for future + throw new IllegalStateException( + "Unrecognized broadcast state meta info reader version: " + version); + } + } + public interface OperatorBackendStateMetaInfoReader<S> { - RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException; + RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException; + } + + public interface BroadcastStateMetaInfoReader<K, V> { + RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException; } public static abstract class AbstractOperatorBackendStateMetaInfoReader<S> @@ -151,6 +235,16 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + public abstract static class AbstractBroadcastStateMetaInfoReader<K, V> + implements BroadcastStateMetaInfoReader<K, V> { + + protected final ClassLoader userCodeClassLoader; + + public AbstractBroadcastStateMetaInfoReader(final ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); + } + } + public static class OperatorBackendStateMetaInfoReaderV1<S> extends AbstractOperatorBackendStateMetaInfoReader<S> { public OperatorBackendStateMetaInfoReaderV1(ClassLoader userCodeClassLoader) { @@ -159,7 +253,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { @SuppressWarnings("unchecked") @Override - public 
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException { + public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException { RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo = new RegisteredOperatorBackendStateMetaInfo.Snapshot<>(); @@ -196,7 +290,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } @Override - public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException { + public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readOperatorStateMetaInfo(DataInputView in) throws IOException { RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo = new RegisteredOperatorBackendStateMetaInfo.Snapshot<>(); @@ -212,4 +306,34 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { return stateMetaInfo; } } + + public static class BroadcastStateMetaInfoReaderV3<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> { + + public BroadcastStateMetaInfoReaderV3(final ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @Override + public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException { + RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo = + new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(); + + stateMetaInfo.setName(in.readUTF()); + stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]); + + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializers = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader); + + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = serializers.get(0); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> valueSerializerAndConfig = serializers.get(1); + + stateMetaInfo.setKeySerializer((TypeSerializer<K>) keySerializerAndConfig.f0); + stateMetaInfo.setKeySerializerConfigSnapshot(keySerializerAndConfig.f1); + + stateMetaInfo.setValueSerializer((TypeSerializer<V>) valueSerializerAndConfig.f0); + stateMetaInfo.setValueSerializerConfigSnapshot(valueSerializerAndConfig.f1); + + return stateMetaInfo; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java index a357dc4..f9427ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java @@ -36,8 +36,9 @@ public class OperatorStateHandle implements StreamStateHandle { * The modes that determine how an {@link OperatorStateHandle} is assigned to tasks during restore. */ public enum Mode { - SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. - BROADCAST // The operator state partitions are broadcast to all task. + SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each. + UNION, // The operator state partitions are UNION-ed upon restoring and sent to all tasks. 
+ BROADCAST // The operator states are identical, as the state is produced from a broadcast stream. } private static final long serialVersionUID = 35876522969227335L; http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java new file mode 100644 index 0000000..d462b34 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.util.Preconditions; + +import java.util.Objects; + +public class RegisteredBroadcastBackendStateMetaInfo<K, V> { + + /** The name of the state, as registered by the user. */ + private final String name; + + /** The mode how elements in this state are assigned to tasks during restore. */ + private final OperatorStateHandle.Mode assignmentMode; + + /** The type serializer for the keys in the map state. */ + private final TypeSerializer<K> keySerializer; + + /** The type serializer for the values in the map state. 
*/ + private final TypeSerializer<V> valueSerializer; + + public RegisteredBroadcastBackendStateMetaInfo( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer) { + + Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST); + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = assignmentMode; + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + } + + public String getName() { + return name; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() { + return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>( + name, + assignmentMode, + keySerializer.duplicate(), + valueSerializer.duplicate(), + keySerializer.snapshotConfiguration(), + valueSerializer.snapshotConfiguration()); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) { + return false; + } + + final RegisteredBroadcastBackendStateMetaInfo other = + (RegisteredBroadcastBackendStateMetaInfo) obj; + + return Objects.equals(name, other.getName()) + && Objects.equals(assignmentMode, other.getAssignmentMode()) + && Objects.equals(keySerializer, other.getKeySerializer()) + && Objects.equals(valueSerializer, other.getValueSerializer()); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + assignmentMode.hashCode(); + result = 31 * result + keySerializer.hashCode(); + result = 31 * result + valueSerializer.hashCode(); + return result; + } + + @Override + public String toString() { + return "RegisteredBroadcastBackendStateMetaInfo{" + + "name='" + name + '\'' + + ", keySerializer=" + keySerializer + + ", valueSerializer=" + valueSerializer + + ", assignmentMode=" + assignmentMode + + '}'; + } + + /** + * A consistent snapshot of a {@link RegisteredBroadcastBackendStateMetaInfo}. + */ + public static class Snapshot<K, V> { + + private String name; + private OperatorStateHandle.Mode assignmentMode; + private TypeSerializer<K> keySerializer; + private TypeSerializer<V> valueSerializer; + private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; + private TypeSerializerConfigSnapshot valueSerializerConfigSnapshot; + + /** Empty constructor used when restoring the state meta info snapshot.
*/ + Snapshot() {} + + private Snapshot( + final String name, + final OperatorStateHandle.Mode assignmentMode, + final TypeSerializer<K> keySerializer, + final TypeSerializer<V> valueSerializer, + final TypeSerializerConfigSnapshot keySerializerConfigSnapshot, + final TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) { + + this.name = Preconditions.checkNotNull(name); + this.assignmentMode = Preconditions.checkNotNull(assignmentMode); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer); + this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot); + this.valueSerializerConfigSnapshot = Preconditions.checkNotNull(valueSerializerConfigSnapshot); + } + + public String getName() { + return name; + } + + void setName(String name) { + this.name = name; + } + + public OperatorStateHandle.Mode getAssignmentMode() { + return assignmentMode; + } + + void setAssignmentMode(OperatorStateHandle.Mode mode) { + this.assignmentMode = mode; + } + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + void setKeySerializer(TypeSerializer<K> serializer) { + this.keySerializer = serializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + void setValueSerializer(TypeSerializer<V> serializer) { + this.valueSerializer = serializer; + } + + public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() { + return keySerializerConfigSnapshot; + } + + void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.keySerializerConfigSnapshot = configSnapshot; + } + + public TypeSerializerConfigSnapshot getValueSerializerConfigSnapshot() { + return valueSerializerConfigSnapshot; + } + + void setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) { + this.valueSerializerConfigSnapshot = configSnapshot; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo.Snapshot)) { + return false; + } + + RegisteredBroadcastBackendStateMetaInfo.Snapshot snapshot = + (RegisteredBroadcastBackendStateMetaInfo.Snapshot) obj; + + return name.equals(snapshot.getName()) + && assignmentMode.ordinal() == snapshot.getAssignmentMode().ordinal() + && Objects.equals(keySerializer, snapshot.getKeySerializer()) + && Objects.equals(valueSerializer, snapshot.getValueSerializer()) + && keySerializerConfigSnapshot.equals(snapshot.getKeySerializerConfigSnapshot()) + && valueSerializerConfigSnapshot.equals(snapshot.getValueSerializerConfigSnapshot()); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + assignmentMode.hashCode(); + result = 31 * result + ((keySerializer != null) ? keySerializer.hashCode() : 0); + result = 31 * result + ((valueSerializer != null) ? valueSerializer.hashCode() : 0); + result = 31 * result + keySerializerConfigSnapshot.hashCode(); + result = 31 * result + valueSerializerConfigSnapshot.hashCode(); + return result; + } + } +}
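
For orientation, here is a minimal usage sketch. It is not part of the patch above; it only exercises the heap-backed broadcast state introduced by this commit. The sketch assumes it lives in the org.apache.flink.runtime.state package (the HeapBroadcastState constructor is package-private), and the state name "rules" as well as the String/Long key and value types are made-up examples built on Flink's stock StringSerializer and LongSerializer.

    package org.apache.flink.runtime.state;

    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;

    import java.util.Collections;
    import java.util.Map;

    public class HeapBroadcastStateExample {

        public static void main(String[] args) {
            // The meta info requires Mode.BROADCAST; anything else fails the precondition in the constructor.
            RegisteredBroadcastBackendStateMetaInfo<String, Long> metaInfo =
                new RegisteredBroadcastBackendStateMetaInfo<>(
                    "rules",                            // hypothetical state name, for illustration only
                    OperatorStateHandle.Mode.BROADCAST,
                    StringSerializer.INSTANCE,
                    LongSerializer.INSTANCE);

            // The constructor is package-private, hence this sketch sits in org.apache.flink.runtime.state.
            HeapBroadcastState<String, Long> state = new HeapBroadcastState<>(metaInfo);

            state.put("threshold", 42L);
            state.putAll(Collections.singletonMap("window-size", 10L));

            // deepCopy() duplicates the backing map via the internal MapSerializer,
            // so later mutations of 'state' do not show up in 'copy'.
            HeapBroadcastState<String, Long> copy = state.deepCopy();
            state.remove("threshold");

            for (Map.Entry<String, Long> entry : copy.immutableEntries()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        }
    }

During a snapshot, HeapBroadcastState.write(FSDataOutputStream) records the stream offset where the state starts and then serializes the map size followed by each entry with the registered key and value serializers, which is how this state ends up in the checkpoint data.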

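A second sketch, also illustrative rather than part of the commit, round-trips an OperatorBackendSerializationProxy through a byte array to show the version-3 layout: the operator state meta info section is written first, followed by the broadcast state section, and a snapshot older than version 3 simply yields an empty broadcast list on read. The state name "broadcast-config" and the use of StringSerializer for both keys and values are assumptions made for the example.

    package org.apache.flink.runtime.state;

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.Collections;
    import java.util.List;

    public class OperatorBackendSerializationProxyRoundTrip {

        public static void main(String[] args) throws Exception {
            // One broadcast state entry and no "plain" operator states.
            RegisteredBroadcastBackendStateMetaInfo<String, String> broadcastMetaInfo =
                new RegisteredBroadcastBackendStateMetaInfo<>(
                    "broadcast-config",                 // hypothetical state name, for illustration only
                    OperatorStateHandle.Mode.BROADCAST,
                    StringSerializer.INSTANCE,
                    StringSerializer.INSTANCE);

            List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> broadcastSnapshots =
                Collections.singletonList(broadcastMetaInfo.snapshot());

            OperatorBackendSerializationProxy writeProxy =
                new OperatorBackendSerializationProxy(Collections.emptyList(), broadcastSnapshots);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            writeProxy.write(new DataOutputViewStreamWrapper(out));   // writes VERSION = 3 plus both sections

            // On read, a version >= 3 snapshot carries the broadcast section;
            // older snapshots come back with an empty broadcast list.
            OperatorBackendSerializationProxy readProxy = new OperatorBackendSerializationProxy(
                OperatorBackendSerializationProxyRoundTrip.class.getClassLoader());
            readProxy.read(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray())));

            System.out.println("operator states:  " + readProxy.getOperatorStateMetaInfoSnapshots().size());   // 0
            System.out.println("broadcast states: " + readProxy.getBroadcastStateMetaInfoSnapshots().size());  // 1
        }
    }

Bumping VERSION to 3 while keeping versions 1 and 2 in getCompatibleVersions() is what allows restoring snapshots written with the older layouts, where the broadcast section is simply absent.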