[FLINK-4940] Add broadcast state to the OperatorStateBackend.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/484fedd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/484fedd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/484fedd4

Branch: refs/heads/master
Commit: 484fedd4e649ec3adc4f55f090b71f9d7dbaa961
Parents: 44211e3
Author: kkloudas <[email protected]>
Authored: Thu Dec 21 13:51:35 2017 +0100
Committer: kkloudas <[email protected]>
Committed: Wed Feb 7 14:07:01 2018 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  12 +
 .../flink/api/common/state/BroadcastState.java  |  90 ++++++
 .../api/common/state/OperatorStateStore.java    |  32 ++
 .../common/state/ReadOnlyBroadcastState.java    |  70 ++++
 .../checkpoint/OperatorStateRepartitioner.java  |   4 +-
 .../RoundRobinOperatorStateRepartitioner.java   |  80 +++--
 .../checkpoint/StateAssignmentOperation.java    |   5 +-
 .../state/AbstractKeyedStateBackend.java        |   4 +-
 .../state/BackendWritableBroadcastState.java    |  42 +++
 .../state/DefaultOperatorStateBackend.java      | 322 ++++++++++++++++---
 .../flink/runtime/state/HeapBroadcastState.java | 154 +++++++++
 .../OperatorBackendSerializationProxy.java      |  73 +++--
 ...ckendStateMetaInfoSnapshotReaderWriters.java | 142 +++++++-
 .../runtime/state/OperatorStateHandle.java      |   5 +-
 ...RegisteredBroadcastBackendStateMetaInfo.java | 230 +++++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   |  74 ++++-
 .../savepoint/CheckpointTestUtils.java          |   4 +-
 .../runtime/state/OperatorStateBackendTest.java | 199 +++++++++++-
 .../runtime/state/OperatorStateHandleTest.java  |   5 +-
 .../runtime/state/SerializationProxiesTest.java | 109 ++++++-
 20 files changed, 1496 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 41b609e..5040966 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,9 +19,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -894,6 +896,11 @@ public class FlinkKafkaConsumerBaseTest {
                }
 
                @Override
+               public <K, V> BroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public <S> ListState<S> getListState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
                        throw new UnsupportedOperationException();
                }
@@ -902,6 +909,11 @@ public class FlinkKafkaConsumerBaseTest {
                public Set<String> getRegisteredStateNames() {
                        throw new UnsupportedOperationException();
                }
+
+               @Override
+               public Set<String> getRegisteredBroadcastStateNames() {
+                       throw new UnsupportedOperationException();
+               }
        }
 
        private static class MockFunctionInitializationContext implements 
FunctionInitializationContext {

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
new file mode 100644
index 0000000..0cece41
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * <b>the same elements are sent to all instances of an operator.</b>
+ *
+ * <p><b>CAUTION:</b> the user has to guarantee that all task instances store 
the same elements in this type of state.
+ *
+ * <p>Each operator instance individually maintains and stores elements in 
the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see all 
the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To avoid 
hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new instances 
read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it stores 
the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of the 
partitions, thus unpredictable results.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState}.
+ * @param <V> The value type of the elements in the {@link BroadcastState}.
+ */
+@PublicEvolving
+public interface BroadcastState<K, V> extends ReadOnlyBroadcastState<K, V> {
+
+       /**
+        * Associates a new value with the given key.
+        *
+        * @param key The key of the mapping
+        * @param value The new value of the mapping
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void put(K key, V value) throws Exception;
+
+       /**
+        * Copies all of the mappings from the given map into the state.
+        *
+        * @param map The mappings to be stored in this state
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void putAll(Map<K, V> map) throws Exception;
+
+       /**
+        * Deletes the mapping of the given key.
+        *
+        * @param key The key of the mapping
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void remove(K key) throws Exception;
+
+       /**
+        * Iterates over all the mappings in the state.
+        *
+        * @return An iterator over all the mappings in the state
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterator<Map.Entry<K, V>> iterator() throws Exception;
+
+       /**
+        * Returns all the mappings in the state.
+        *
+        * @return An iterable view of all the key-value pairs in the state.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterable<Map.Entry<K, V>> entries() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index bf22041..c2037e0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -30,6 +30,31 @@ import java.util.Set;
 public interface OperatorStateStore {
 
        /**
+        * Creates (or restores) a {@link BroadcastState broadcast state}. This 
type of state can only be created to store
+        * the state of a {@code BroadcastStream}. Each state is registered 
under a unique name.
+        * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).
+        * The returned broadcast state has {@code key-value} format.
+        *
+        * <p><b>CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.</b>
+        *
+        * <p>Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+        * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+        * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+        * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+        * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+        * upon recovery or rescaling you may have unpredictable redistribution 
of the partitions, thus unpredictable results.
+        *
+        * @param stateDescriptor The descriptor for this state, providing a 
name, a serializer for the keys and one for the
+        *                        values.
+        * @param <K> The type of the keys in the broadcast state.
+        * @param <V> The type of the values in the broadcast state.
+        *
+        * @return The {@link BroadcastState Broadcast State}.
+        * @throws Exception Thrown if the state cannot be created or restored.
+        */
+       <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> 
stateDescriptor) throws Exception;
+
+       /**
         * Creates (or restores) a list state. Each state is registered under a 
unique name.
         * The provided serializer is used to de/serialize the state in case of 
checkpointing (snapshot/restore).
         *
@@ -83,6 +108,13 @@ public interface OperatorStateStore {
         */
        Set<String> getRegisteredStateNames();
 
+       /**
+        * Returns a set with the names of all currently registered broadcast 
states.
+        *
+        * @return set of names for all registered broadcast states.
+        */
+       Set<String> getRegisteredBroadcastStateNames();
+
        // 
-------------------------------------------------------------------------------------------
        //  Deprecated methods
        // 
-------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
new file mode 100644
index 0000000..4d3f2e7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A read-only view of the {@link BroadcastState}.
+ *
+ * <p>Although read-only, the user code should not modify the value
+ * returned by the {@link #get(Object)} or the entries of the immutable
+ * iterator returned by the {@link #immutableEntries()}, as this can lead to
+ * inconsistent states. The reason for this is that we do not create extra
+ * copies of the elements for performance reasons.
+ *
+ * @param <K> The key type of the elements in the {@link 
ReadOnlyBroadcastState}.
+ * @param <V> The value type of the elements in the {@link 
ReadOnlyBroadcastState}.
+ */
+@PublicEvolving
+public interface ReadOnlyBroadcastState<K, V> extends State {
+
+       /**
+        * Returns the current value associated with the given key.
+        *
+        * <p>The user code must not modify the value returned, as
+        * this can lead to inconsistent states.
+        *
+        * @param key The key of the mapping
+        * @return The value of the mapping with the given key
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       V get(K key) throws Exception;
+
+       /**
+        * Returns whether there exists the given mapping.
+        *
+        * @param key The key of the mapping
+        * @return True if there exists a mapping whose key equals the given 
key
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       boolean contains(K key) throws Exception;
+
+       /**
+        * Returns an immutable {@link Iterable} over the entries in the state.
+        *
+        * <p>The user code must not modify the entries of the returned 
immutable
+        * iterator, as this can lead to inconsistent states.
+        */
+       Iterable<Map.Entry<K, V>> immutableEntries() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
index 98810f1..090f48a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -31,12 +31,12 @@ public interface OperatorStateRepartitioner {
        /**
         * @param previousParallelSubtaskStates List of state handles to the 
parallel subtask states of an operator, as they
         *                                      have been checkpointed.
-        * @param parallelism                   The parallelism that we 
consider for the state redistribution. Determines the size of the
+        * @param newParallelism                The parallelism that we 
consider for the state redistribution. Determines the size of the
         *                                      returned list.
         * @return List with one entry per parallel subtask. Each subtask 
now receives one collection of states that build
         * up the new total state for this subtask.
         */
        List<Collection<OperatorStateHandle>> repartitionState(
                        List<OperatorStateHandle> previousParallelSubtaskStates,
-                       int parallelism);
+                       int newParallelism);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
index 4513ef8..e09b677 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -42,10 +42,10 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
        @Override
        public List<Collection<OperatorStateHandle>> repartitionState(
                        List<OperatorStateHandle> previousParallelSubtaskStates,
-                       int parallelism) {
+                       int newParallelism) {
 
                Preconditions.checkNotNull(previousParallelSubtaskStates);
-               Preconditions.checkArgument(parallelism > 0);
+               Preconditions.checkArgument(newParallelism > 0);
 
                // Reorganize: group by (State Name -> StreamStateHandle + 
Offsets)
                GroupByStateNameResults nameToStateByMode = 
groupByStateName(previousParallelSubtaskStates);
@@ -55,11 +55,11 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                }
 
                // Assemble result from all merge maps
-               List<Collection<OperatorStateHandle>> result = new 
ArrayList<>(parallelism);
+               List<Collection<OperatorStateHandle>> result = new 
ArrayList<>(newParallelism);
 
                // Do the actual repartitioning for all named states
                List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
-                               repartition(nameToStateByMode, parallelism);
+                               repartition(nameToStateByMode, newParallelism);
 
                for (int i = 0; i < mergeMapList.size(); ++i) {
                        result.add(i, new 
ArrayList<>(mergeMapList.get(i).values()));
@@ -72,8 +72,7 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
         * Group by the different named states.
         */
        @SuppressWarnings("unchecked, rawtype")
-       private GroupByStateNameResults groupByStateName(
-                       List<OperatorStateHandle> 
previousParallelSubtaskStates) {
+       private GroupByStateNameResults 
groupByStateName(List<OperatorStateHandle> previousParallelSubtaskStates) {
 
                //Reorganize: group by (State Name -> StreamStateHandle + 
StateMetaInfo)
                EnumMap<OperatorStateHandle.Mode,
@@ -81,10 +80,7 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
                                new EnumMap<>(OperatorStateHandle.Mode.class);
 
                for (OperatorStateHandle.Mode mode : 
OperatorStateHandle.Mode.values()) {
-                       Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> map = new HashMap<>();
-                       nameToStateByMode.put(
-                                       mode,
-                                       new HashMap<String, 
List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>());
+                       nameToStateByMode.put(mode, new HashMap<>());
                }
 
                for (OperatorStateHandle psh : previousParallelSubtaskStates) {
@@ -120,14 +116,14 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
         */
        private List<Map<StreamStateHandle, OperatorStateHandle>> repartition(
                        GroupByStateNameResults nameToStateByMode,
-                       int parallelism) {
+                       int newParallelism) {
 
                // We will use this to merge w.r.t. StreamStateHandles for each 
parallel subtask inside the maps
-               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList 
= new ArrayList<>(parallelism);
+               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList 
= new ArrayList<>(newParallelism);
 
                // Initialize
-               for (int i = 0; i < parallelism; ++i) {
-                       mergeMapList.add(new HashMap<StreamStateHandle, 
OperatorStateHandle>());
+               for (int i = 0; i < newParallelism; ++i) {
+                       mergeMapList.add(new HashMap<>());
                }
 
                // Start with the state handles we distribute round robin by 
splitting by offsets
@@ -150,15 +146,15 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                        // Repartition the state across the parallel operator 
instances
                        int lstIdx = 0;
                        int offsetIdx = 0;
-                       int baseFraction = totalPartitions / parallelism;
-                       int remainder = totalPartitions % parallelism;
+                       int baseFraction = totalPartitions / newParallelism;
+                       int remainder = totalPartitions % newParallelism;
 
                        int newStartParallelOp = startParallelOp;
 
-                       for (int i = 0; i < parallelism; ++i) {
+                       for (int i = 0; i < newParallelism; ++i) {
 
                                // Preparation: calculate the actual index 
considering wrap around
-                               int parallelOpIdx = (i + startParallelOp) % 
parallelism;
+                               int parallelOpIdx = (i + startParallelOp) % 
newParallelism;
 
                                // Now calculate the number of partitions we 
will assign to the parallel instance in this round ...
                                int numberOfPartitionsToAssign = baseFraction;
@@ -209,10 +205,7 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                        Map<StreamStateHandle, 
OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithOffsets.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStateHandle(
-                                                               new 
HashMap<String, OperatorStateHandle.StateMetaInfo>(),
-                                                               
handleWithOffsets.f0);
-
+                                               operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithOffsets.f0);
                                                
mergeMap.put(handleWithOffsets.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(
@@ -226,30 +219,51 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
 
                // Now we also add the state handles marked for union to 
all parallel instances
                Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> broadcastNameToState =
-                               
nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST);
+                               
nameToStateByMode.getByMode(OperatorStateHandle.Mode.UNION);
 
-               for (int i = 0; i < parallelism; ++i) {
+               for (int i = 0; i < newParallelism; ++i) {
 
                        Map<StreamStateHandle, OperatorStateHandle> mergeMap = 
mergeMapList.get(i);
 
                        for (Map.Entry<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> e :
                                        broadcastNameToState.entrySet()) {
 
-                               List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>> current = e.getValue();
-
-                               for (Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : current) {
+                               for (Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithMetaInfo.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStateHandle(
-                                                               new 
HashMap<String, OperatorStateHandle.StateMetaInfo>(),
-                                                               
handleWithMetaInfo.f0);
-
+                                               operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
                                                
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
                                }
                        }
                }
+
+               // Now we also add the state handles marked for uniform 
broadcast to all parallel instances
+               Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> uniformBroadcastNameToState =
+                               
nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST);
+
+               for (int i = 0; i < newParallelism; ++i) {
+
+                       final Map<StreamStateHandle, OperatorStateHandle> 
mergeMap = mergeMapList.get(i);
+
+                       // for each name, pick the i-th entry
+                       for (Map.Entry<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>> e :
+                                       uniformBroadcastNameToState.entrySet()) 
{
+
+                               int oldParallelism = e.getValue().size();
+                               
+                               Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo =
+                                                       e.getValue().get(i % 
oldParallelism);
+
+                               OperatorStateHandle operatorStateHandle = 
mergeMap.get(handleWithMetaInfo.f0);
+                               if (operatorStateHandle == null) {
+                                       operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+                                       mergeMap.put(handleWithMetaInfo.f0, 
operatorStateHandle);
+                               }
+                               
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
+                       }
+               }
                return mergeMapList;
        }
 
@@ -257,7 +271,7 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                private final EnumMap<OperatorStateHandle.Mode,
                                Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>> byMode;
 
-               public GroupByStateNameResults(
+               GroupByStateNameResults(
                                EnumMap<OperatorStateHandle.Mode,
                                                Map<String, 
List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> byMode) {
                        this.byMode = Preconditions.checkNotNull(byMode);
@@ -268,4 +282,4 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                        return byMode.get(mode);
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index e108bad..43a4b01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -618,11 +618,10 @@ public class StateAssignmentOperation {
                                        Map<String, 
OperatorStateHandle.StateMetaInfo> partitionOffsets =
                                                
operatorStateHandle.getStateNameToPartitionOffsets();
 
-
                                        for (OperatorStateHandle.StateMetaInfo 
metaInfo : partitionOffsets.values()) {
 
                                                // if we find any union 
state, we cannot take the shortcut and need to go through repartitioning
-                                               if 
(OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) {
+                                               if 
(OperatorStateHandle.Mode.UNION.equals(metaInfo.getDistributionMode())) {
                                                        return 
opStateRepartitioner.repartitionState(
                                                                
chainOpParallelStates,
                                                                newParallelism);
@@ -639,7 +638,7 @@ public class StateAssignmentOperation {
        /**
         * Determine the subset of {@link KeyGroupsStateHandle 
KeyGroupsStateHandles} with correct
         * key group index for the given subtask {@link KeyGroupRange}.
-        * <p>
+        *
         * <p>This is publicly visible to be used in tests.
         */
        public static List<KeyedStateHandle> getKeyedStateHandles(

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 0ffde7a..cc53c0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -98,9 +98,7 @@ public abstract class AbstractKeyedStateBackend<K>
 
        private final ExecutionConfig executionConfig;
 
-       /**
-        * Decorates the input and output streams to write key-groups 
compressed.
-        */
+       /** Decorates the input and output streams to write key-groups 
compressed. */
        protected final StreamCompressionDecorator keyGroupCompressionDecorator;
 
        public AbstractKeyedStateBackend(

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
new file mode 100644
index 0000000..8daf07c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * An interface with methods related to the interplay between the {@link 
BroadcastState Broadcast State} and
+ * the {@link OperatorStateBackend}.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState 
Broadcast State}.
+ * @param <V> The value type of the elements in the {@link BroadcastState 
Broadcast State}.
+ */
+public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, 
V> {
+
+       /**
+        * Returns a copy of this state.
+        *
+        * <p>{@code DefaultOperatorStateBackend} takes these copies eagerly in the synchronous
+        * part of a snapshot and writes them out afterwards, so the copy is presumably expected
+        * to be unaffected by later changes to this instance (confirm against the implementations).
+        *
+        * @return the copy of this broadcast state.
+        */
+       BackendWritableBroadcastState<K, V> deepCopy();
+
+       /**
+        * Writes this state to the given stream.
+        *
+        * @param out the stream to write to.
+        * @return a single {@code long} which {@code DefaultOperatorStateBackend} records as the
+        *         only partition offset of this state; presumably the position in {@code out} at
+        *         which the state starts (confirm against the implementations).
+        * @throws IOException declared for failures while writing to {@code out}.
+        */
+       long write(FSDataOutputStream out) throws IOException;
+
+       /**
+        * Replaces the meta info of this state.
+        *
+        * <p>{@code DefaultOperatorStateBackend} uses this on an already registered state to
+        * install the key and value serializers of a newly supplied descriptor once neither
+        * of them requires migration.
+        *
+        * @param stateMetaInfo the meta info to use from now on.
+        */
+       void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> 
stateMetaInfo);
+
+       /**
+        * Returns the meta info currently set on this state. The backend reads the state name
+        * and the assignment mode from it and snapshots it when checkpointing.
+        *
+        * @return the current meta info of this state.
+        */
+       RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index e6d6dc6..f486643 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -69,7 +71,12 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        /**
         * Map for all registered operator states. Maps state name -> state
         */
-       private final Map<String, PartitionableListState<?>> registeredStates;
+       private final Map<String, PartitionableListState<?>> 
registeredOperatorStates;
+
+       /**
+        * Map for all registered operator broadcast states. Maps state name -> 
state
+        */
+       private final Map<String, BackendWritableBroadcastState<?, ?>> 
registeredBroadcastStates;
 
        /**
         * CloseableRegistry to participate in the tasks lifecycle.
@@ -102,12 +109,17 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
         * <p>TODO this map can be removed when eager-state registration is in 
place.
         * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
         */
-       private final Map<String, 
RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
+       private final Map<String, 
RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
restoredOperatorStateMetaInfos;
+
+       /**
+        * Map of state names to their corresponding restored broadcast state 
meta info.
+        */
+       private final Map<String, 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
restoredBroadcastStateMetaInfos;
 
        /**
         * Cache of already accessed states.
         *
-        * <p>In contrast to {@link #registeredStates} and {@link 
#restoredStateMetaInfos} which may be repopulated
+        * <p>In contrast to {@link #registeredOperatorStates} and {@link 
#restoredOperatorStateMetaInfos} which may be repopulated
         * with restored state, this map is always empty at the beginning.
         *
         * <p>TODO this map should be moved to a base class once we have proper 
hierarchy for the operator state backends.
@@ -116,6 +128,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
         */
        private final HashMap<String, PartitionableListState<?>> 
accessedStatesByName;
 
+       private final Map<String, BackendWritableBroadcastState<?, ?>> 
accessedBroadcastStatesByName;
+
        public DefaultOperatorStateBackend(
                ClassLoader userClassLoader,
                ExecutionConfig executionConfig,
@@ -125,10 +139,13 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                this.userClassloader = 
Preconditions.checkNotNull(userClassLoader);
                this.executionConfig = executionConfig;
                this.javaSerializer = new JavaSerializer<>();
-               this.registeredStates = new HashMap<>();
+               this.registeredOperatorStates = new HashMap<>();
+               this.registeredBroadcastStates = new HashMap<>();
                this.asynchronousSnapshots = asynchronousSnapshots;
                this.accessedStatesByName = new HashMap<>();
-               this.restoredStateMetaInfos = new HashMap<>();
+               this.accessedBroadcastStatesByName = new HashMap<>();
+               this.restoredOperatorStateMetaInfos = new HashMap<>();
+               this.restoredBroadcastStateMetaInfos = new HashMap<>();
        }
 
        public ExecutionConfig getExecutionConfig() {
@@ -137,7 +154,12 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
        @Override
        public Set<String> getRegisteredStateNames() {
-               return registeredStates.keySet();
+               return registeredOperatorStates.keySet();
+       }
+
+       /**
+        * Returns the names of all broadcast states registered with this backend.
+        *
+        * <p>The result is the key set of the backing map itself rather than a copy, so it
+        * reflects later registrations and is emptied when {@link #dispose()} clears the map.
+        */
+       @Override
+       public Set<String> getRegisteredBroadcastStateNames() {
+               return registeredBroadcastStates.keySet();
        }
 
        @Override
@@ -148,7 +170,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        @Override
        public void dispose() {
                IOUtils.closeQuietly(closeStreamOnCancelRegistry);
-               registeredStates.clear();
+               registeredOperatorStates.clear();
+               registeredBroadcastStates.clear();
        }
 
        // 
-------------------------------------------------------------------------------------------
@@ -156,13 +179,94 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        // 
-------------------------------------------------------------------------------------------
 
+       /**
+        * Returns the broadcast state registered under the name of the given descriptor, creating
+        * and registering a new {@code HeapBroadcastState} if no state with that name is known yet.
+        *
+        * <p>States that were already handed out by this method are served from
+        * {@code accessedBroadcastStatesByName} after their name and mode have been re-checked
+        * through {@code checkStateNameAndMode}. If a state with that name is registered but has
+        * not been accessed yet (the restore case), the descriptor's key and value serializers are
+        * checked against the restored meta info: when neither requires migration, the meta info
+        * of the state is replaced so that it carries the new serializers; otherwise the call fails.
+        *
+        * @param stateDescriptor descriptor supplying the state name and the key/value serializers;
+        *                        it and its name are null-checked.
+        * @return the broadcast state for the given descriptor.
+        * @throws StateMigrationException if the new key or value serializer requires state
+        *                                 migration, which is not supported here.
+        */
        @Override
+       public <K, V> BroadcastState<K, V> getBroadcastState(final 
MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
+
+               Preconditions.checkNotNull(stateDescriptor);
+               String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
+
+               // states already handed out by this method are cached by name and returned directly
+               @SuppressWarnings("unchecked")
+               BackendWritableBroadcastState<K, V> previous = 
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
+               if (previous != null) {
+                       checkStateNameAndMode(
+                                       previous.getStateMetaInfo().getName(),
+                                       name,
+                                       
previous.getStateMetaInfo().getAssignmentMode(),
+                                       OperatorStateHandle.Mode.BROADCAST);
+                       return previous;
+               }
+
+               // first access under this name: initialize the descriptor's serializers before reading them
+               
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+               TypeSerializer<K> broadcastStateKeySerializer = 
Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
+               TypeSerializer<V> broadcastStateValueSerializer = 
Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
+
+               BackendWritableBroadcastState<K, V> broadcastState = 
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
+
+               if (broadcastState == null) {
+                       // nothing registered under this name yet: build the state from the descriptor's serializers
+                       broadcastState = new HeapBroadcastState<>(
+                                       new 
RegisteredBroadcastBackendStateMetaInfo<>(
+                                                       name,
+                                                       
OperatorStateHandle.Mode.BROADCAST,
+                                                       
broadcastStateKeySerializer,
+                                                       
broadcastStateValueSerializer));
+                       registeredBroadcastStates.put(name, broadcastState);
+               } else {
+                       // has restored state; check compatibility of new state 
access
+
+                       checkStateNameAndMode(
+                                       
broadcastState.getStateMetaInfo().getName(),
+                                       name,
+                                       
broadcastState.getStateMetaInfo().getAssignmentMode(),
+                                       OperatorStateHandle.Mode.BROADCAST);
+
+                       @SuppressWarnings("unchecked")
+                       RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> 
restoredMetaInfo =
+                                       
(RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V>) 
restoredBroadcastStateMetaInfos.get(name);
+
+                       // check compatibility to determine if state migration 
is required
+                       CompatibilityResult<K> keyCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       restoredMetaInfo.getKeySerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
restoredMetaInfo.getKeySerializerConfigSnapshot(),
+                                       broadcastStateKeySerializer);
+
+                       CompatibilityResult<V> valueCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       restoredMetaInfo.getValueSerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
restoredMetaInfo.getValueSerializerConfigSnapshot(),
+                                       broadcastStateValueSerializer);
+
+                       if (!keyCompatibility.isRequiresMigration() && 
!valueCompatibility.isRequiresMigration()) {
+                               // new serializer is compatible; use it to 
replace the old serializer
+                               broadcastState.setStateMetaInfo(
+                                               new 
RegisteredBroadcastBackendStateMetaInfo<>(
+                                                               name,
+                                                               
OperatorStateHandle.Mode.BROADCAST,
+                                                               
broadcastStateKeySerializer,
+                                                               
broadcastStateValueSerializer));
+                       } else {
+                               // TODO state migration currently isn't 
possible.
+
+                               // NOTE: for heap backends, it is actually fine 
to proceed here without failing the restore,
+                               // since the state has already been 
deserialized to objects and we can just continue with
+                               // the new serializer; we're deliberately 
failing here for now to have equal functionality with
+                               // the RocksDB backend to avoid confusion for 
users.
+
+                               throw new StateMigrationException("State 
migration isn't supported, yet.");
+                       }
+               }
+
+               // remember the state so that later calls with this name take the cached path above
+               accessedBroadcastStatesByName.put(name, broadcastState);
+               return broadcastState;
+       }
+
+       @Override
        public <S> ListState<S> getListState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
                return getListState(stateDescriptor, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        }
 
        @Override
        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> 
stateDescriptor) throws Exception {
-               return getListState(stateDescriptor, 
OperatorStateHandle.Mode.BROADCAST);
+               return getListState(stateDescriptor, 
OperatorStateHandle.Mode.UNION);
        }
 
        // 
-------------------------------------------------------------------------------------------
@@ -203,23 +307,39 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                final long syncStartTime = System.currentTimeMillis();
 
-               if (registeredStates.isEmpty()) {
+               if (registeredOperatorStates.isEmpty() && 
registeredBroadcastStates.isEmpty()) {
                        return DoneFuture.nullValue();
                }
 
-               final Map<String, PartitionableListState<?>> 
registeredStatesDeepCopies =
-                               new HashMap<>(registeredStates.size());
+               final Map<String, PartitionableListState<?>> 
registeredOperatorStatesDeepCopies =
+                               new HashMap<>(registeredOperatorStates.size());
+               final Map<String, BackendWritableBroadcastState<?, ?>> 
registeredBroadcastStatesDeepCopies =
+                               new HashMap<>(registeredBroadcastStates.size());
 
-               // eagerly create deep copies of the list states in the sync 
phase, so that we can use them in the async writing
                ClassLoader snapshotClassLoader = 
Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(userClassloader);
                try {
-                       for (Map.Entry<String, PartitionableListState<?>> entry 
: this.registeredStates.entrySet()) {
-                               PartitionableListState<?> listState = 
entry.getValue();
-                               if (null != listState) {
-                                       listState = listState.deepCopy();
+                       // eagerly create deep copies of the list and the 
broadcast states (if any)
+                       // in the synchronous phase, so that we can use them in 
the async writing.
+
+                       if (!registeredOperatorStates.isEmpty()) {
+                               for (Map.Entry<String, 
PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
+                                       PartitionableListState<?> listState = 
entry.getValue();
+                                       if (null != listState) {
+                                               listState = 
listState.deepCopy();
+                                       }
+                                       
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
+                               }
+                       }
+
+                       if (!registeredBroadcastStates.isEmpty()) {
+                               for (Map.Entry<String, 
BackendWritableBroadcastState<?, ?>> entry : 
registeredBroadcastStates.entrySet()) {
+                                       BackendWritableBroadcastState<?, ?> 
broadcastState = entry.getValue();
+                                       if (null != broadcastState) {
+                                               broadcastState = 
broadcastState.deepCopy();
+                                       }
+                                       
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                                }
-                               registeredStatesDeepCopies.put(entry.getKey(), 
listState);
                        }
                } finally {
                        
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
@@ -263,25 +383,38 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                                        
CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
 
-                                       final Map<String, 
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
-                                               new 
HashMap<>(registeredStatesDeepCopies.size());
+                                       // get the registered operator state 
infos ...
+                                       
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
operatorMetaInfoSnapshots =
+                                               new 
ArrayList<>(registeredOperatorStatesDeepCopies.size());
+
+                                       for (Map.Entry<String, 
PartitionableListState<?>> entry : 
registeredOperatorStatesDeepCopies.entrySet()) {
+                                               
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
+                                       }
 
-                                       
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots =
-                                               new 
ArrayList<>(registeredStatesDeepCopies.size());
+                                       // ... get the registered broadcast 
operator state infos ...
+                                       
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
broadcastMetaInfoSnapshots =
+                                                       new 
ArrayList<>(registeredBroadcastStatesDeepCopies.size());
 
-                                       for (Map.Entry<String, 
PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
-                                               
metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
+                                       for (Map.Entry<String, 
BackendWritableBroadcastState<?, ?>> entry : 
registeredBroadcastStatesDeepCopies.entrySet()) {
+                                               
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                                        }
 
+                                       // ... write them all in the checkpoint 
stream ...
                                        DataOutputView dov = new 
DataOutputViewStreamWrapper(localOut);
 
                                        OperatorBackendSerializationProxy 
backendSerializationProxy =
-                                               new 
OperatorBackendSerializationProxy(metaInfoSnapshots);
+                                               new 
OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, 
broadcastMetaInfoSnapshots);
 
                                        backendSerializationProxy.write(dov);
 
+                                       // ... and then go for the states ...
+
+                                       // we put BOTH normal and broadcast 
state metadata here
+                                       final Map<String, 
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
+                                                       new 
HashMap<>(registeredOperatorStatesDeepCopies.size() + 
registeredBroadcastStatesDeepCopies.size());
+
                                        for (Map.Entry<String, 
PartitionableListState<?>> entry :
-                                               
registeredStatesDeepCopies.entrySet()) {
+                                                       
registeredOperatorStatesDeepCopies.entrySet()) {
 
                                                PartitionableListState<?> value 
= entry.getValue();
                                                long[] partitionOffsets = 
value.write(localOut);
@@ -291,6 +424,19 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                                        }
 
+                                       // ... and the broadcast states 
themselves ...
+                                       for (Map.Entry<String, 
BackendWritableBroadcastState<?, ?>> entry :
+                                                       
registeredBroadcastStatesDeepCopies.entrySet()) {
+
+                                               
BackendWritableBroadcastState<?, ?> value = entry.getValue();
+                                               long[] partitionOffsets = 
{value.write(localOut)};
+                                               OperatorStateHandle.Mode mode = 
value.getStateMetaInfo().getAssignmentMode();
+                                               writtenStatesMetaData.put(
+                                                               entry.getKey(),
+                                                               new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
+                                       }
+
+                                       // ... and, finally, create the state 
handle.
                                        OperatorStateHandle retValue = null;
 
                                        if 
(closeStreamOnCancelRegistry.unregisterCloseable(out)) {
@@ -348,11 +494,11 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                                backendSerializationProxy.read(new 
DataInputViewStreamWrapper(in));
 
-                               
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
restoredMetaInfoSnapshots =
-                                               
backendSerializationProxy.getStateMetaInfoSnapshots();
+                               
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
restoredOperatorMetaInfoSnapshots =
+                                               
backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
 
                                // Recreate all PartitionableListStates from 
the meta info
-                               for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : 
restoredMetaInfoSnapshots) {
+                               for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : 
restoredOperatorMetaInfoSnapshots) {
 
                                        if 
(restoredMetaInfo.getPartitionStateSerializer() == null ||
                                                        
restoredMetaInfo.getPartitionStateSerializer() instanceof 
UnloadableDummyTypeSerializer) {
@@ -368,9 +514,9 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
                                        }
 
-                                       
restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+                                       
restoredOperatorStateMetaInfos.put(restoredMetaInfo.getName(), 
restoredMetaInfo);
 
-                                       PartitionableListState<?> listState = 
registeredStates.get(restoredMetaInfo.getName());
+                                       PartitionableListState<?> listState = 
registeredOperatorStates.get(restoredMetaInfo.getName());
 
                                        if (null == listState) {
                                                listState = new 
PartitionableListState<>(
@@ -379,22 +525,66 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                                                
restoredMetaInfo.getPartitionStateSerializer(),
                                                                                
restoredMetaInfo.getAssignmentMode()));
 
-                                               
registeredStates.put(listState.getStateMetaInfo().getName(), listState);
+                                               
registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
+                                       } else {
+                                               // TODO with eager state 
registration in place, check here for serializer migration strategies
+                                       }
+                               }
+
+                               // ... and then get back the broadcast state.
+                               
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
restoredBroadcastMetaInfoSnapshots =
+                                               
backendSerializationProxy.getBroadcastStateMetaInfoSnapshots();
+
+                               for 
(RegisteredBroadcastBackendStateMetaInfo.Snapshot<? ,?> restoredMetaInfo : 
restoredBroadcastMetaInfoSnapshots) {
+
+                                       if (restoredMetaInfo.getKeySerializer() 
== null || restoredMetaInfo.getValueSerializer() == null ||
+                                                       
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
+                                                       
restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) 
{
+
+                                               // must fail now if the 
previous serializer cannot be restored because there is no serializer
+                                               // capable of reading previous 
state
+                                               // TODO when eager state 
registration is in place, we can try to get a convert deserializer
+                                               // TODO from the newly 
registered serializer instead of simply failing here
+
+                                               throw new IOException("Unable 
to restore broadcast state [" + restoredMetaInfo.getName() + "]." +
+                                                               " The previous 
key and value serializers of the state must be present; the serializers could" +
+                                                               " have been 
removed from the classpath, or their implementations have changed and could" +
+                                                               " not be 
loaded. This is a temporary restriction that will be fixed in future 
versions.");
+                                       }
+
+                                       
restoredBroadcastStateMetaInfos.put(restoredMetaInfo.getName(), 
restoredMetaInfo);
+
+                                       BackendWritableBroadcastState<? ,?> 
broadcastState = registeredBroadcastStates.get(restoredMetaInfo.getName());
+
+                                       if (broadcastState == null) {
+                                               broadcastState = new 
HeapBroadcastState<>(
+                                                               new 
RegisteredBroadcastBackendStateMetaInfo<>(
+                                                                               
restoredMetaInfo.getName(),
+                                                                               
restoredMetaInfo.getAssignmentMode(),
+                                                                               
restoredMetaInfo.getKeySerializer(),
+                                                                               
restoredMetaInfo.getValueSerializer()));
+
+                                               
registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), 
broadcastState);
                                        } else {
                                                // TODO with eager state 
registration in place, check here for serializer migration strategies
                                        }
                                }
 
-                               // Restore all the state in 
PartitionableListStates
+                               // Restore all the states
                                for (Map.Entry<String, 
OperatorStateHandle.StateMetaInfo> nameToOffsets :
                                                
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
 
-                                       PartitionableListState<?> 
stateListForName = registeredStates.get(nameToOffsets.getKey());
-
-                                       Preconditions.checkState(null != 
stateListForName, "Found state without " +
-                                                       "corresponding meta 
info: " + nameToOffsets.getKey());
+                                       final String stateName = 
nameToOffsets.getKey();
 
-                                       
deserializeStateValues(stateListForName, in, nameToOffsets.getValue());
+                                       PartitionableListState<?> 
listStateForName = registeredOperatorStates.get(stateName);
+                                       if (listStateForName == null) {
+                                               
BackendWritableBroadcastState<?, ?> broadcastStateForName = 
registeredBroadcastStates.get(stateName);
+                                               
Preconditions.checkState(broadcastStateForName != null, "Found state without " +
+                                                               "corresponding 
meta info: " + stateName);
+                                               
deserializeBroadcastStateValues(broadcastStateForName, in, 
nameToOffsets.getValue());
+                                       } else {
+                                               
deserializeOperatorStateValues(listStateForName, in, nameToOffsets.getValue());
+                                       }
                                }
 
                        } finally {
@@ -428,7 +618,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                 */
                private final ArrayListSerializer<S> internalListCopySerializer;
 
-               public 
PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) 
{
+               
PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) 
{
                        this(stateMetaInfo, new ArrayList<S>());
                }
 
@@ -513,7 +703,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
        private <S> ListState<S> getListState(
                        ListStateDescriptor<S> stateDescriptor,
-                       OperatorStateHandle.Mode mode) throws IOException, 
StateMigrationException {
+                       OperatorStateHandle.Mode mode) throws 
StateMigrationException {
 
                Preconditions.checkNotNull(stateDescriptor);
                String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
@@ -521,7 +711,11 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                @SuppressWarnings("unchecked")
                PartitionableListState<S> previous = 
(PartitionableListState<S>) accessedStatesByName.get(name);
                if (previous != null) {
-                       checkStateNameAndMode(previous.getStateMetaInfo(), 
name, mode);
+                       checkStateNameAndMode(
+                                       previous.getStateMetaInfo().getName(),
+                                       name,
+                                       
previous.getStateMetaInfo().getAssignmentMode(),
+                                       mode);
                        return previous;
                }
 
@@ -533,7 +727,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
                @SuppressWarnings("unchecked")
-               PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
+               PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredOperatorStates.get(name);
 
                if (null == partitionableListState) {
                        // no restored state for the state name; simply create 
new state holder
@@ -544,15 +738,19 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                        partitionStateSerializer,
                                        mode));
 
-                       registeredStates.put(name, partitionableListState);
+                       registeredOperatorStates.put(name, 
partitionableListState);
                } else {
                        // has restored state; check compatibility of new state 
access
 
-                       
checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode);
+                       checkStateNameAndMode(
+                                       
partitionableListState.getStateMetaInfo().getName(),
+                                       name,
+                                       
partitionableListState.getStateMetaInfo().getAssignmentMode(),
+                                       mode);
 
                        @SuppressWarnings("unchecked")
                        RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
restoredMetaInfo =
-                               
(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) 
restoredStateMetaInfos.get(name);
+                               
(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) 
restoredOperatorStateMetaInfos.get(name);
 
                        // check compatibility to determine if state migration 
is required
                        CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
@@ -581,7 +779,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                return partitionableListState;
        }
 
-       private static <S> void deserializeStateValues(
+       private static <S> void deserializeOperatorStateValues(
                PartitionableListState<S> stateListForName,
                FSDataInputStream in,
                OperatorStateHandle.StateMetaInfo metaInfo) throws IOException {
@@ -599,21 +797,45 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                }
        }
 
+       private static <K, V> void deserializeBroadcastStateValues(
+                       final BackendWritableBroadcastState<K, V> 
broadcastStateForName,
+                       final FSDataInputStream in,
+                       final OperatorStateHandle.StateMetaInfo metaInfo) 
throws Exception {
+
+               if (metaInfo != null) {
+                       long[] offsets = metaInfo.getOffsets();
+                       if (offsets != null) {
+
+                               TypeSerializer<K> keySerializer = 
broadcastStateForName.getStateMetaInfo().getKeySerializer();
+                               TypeSerializer<V> valueSerializer = 
broadcastStateForName.getStateMetaInfo().getValueSerializer();
+
+                               in.seek(offsets[0]);
+
+                               DataInputView div = new 
DataInputViewStreamWrapper(in);
+                               int size = div.readInt();
+                               for (int i = 0; i < size; i++) {
+                                       
broadcastStateForName.put(keySerializer.deserialize(div), 
valueSerializer.deserialize(div));
+                               }
+                       }
+               }
+       }
+
        private static void checkStateNameAndMode(
-                       RegisteredOperatorBackendStateMetaInfo previousMetaInfo,
+                       String actualName,
                        String expectedName,
+                       OperatorStateHandle.Mode actualMode,
                        OperatorStateHandle.Mode expectedMode) {
 
                Preconditions.checkState(
-                       previousMetaInfo.getName().equals(expectedName),
+                       actualName.equals(expectedName),
                        "Incompatible state names. " +
-                               "Was [" + previousMetaInfo.getName() + "], " +
+                               "Was [" + actualName + "], " +
                                "registered with [" + expectedName + "].");
 
                Preconditions.checkState(
-                       
previousMetaInfo.getAssignmentMode().equals(expectedMode),
+                               actualMode.equals(expectedMode),
                        "Incompatible state assignment modes. " +
-                               "Was [" + previousMetaInfo.getAssignmentMode() 
+ "], " +
+                               "Was [" + actualMode + "], " +
                                "registered with [" + expectedMode + "].");
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
new file mode 100644
index 0000000..42e68f3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A {@link BroadcastState Broadcast State} backed by a heap-based {@link Map}.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState 
Broadcast State}.
+ * @param <V> The value type of the elements in the {@link BroadcastState 
Broadcast State}.
+ */
+public class HeapBroadcastState<K, V> implements 
BackendWritableBroadcastState<K, V> {
+
+       /**
+        * Meta information of the state, including state name, assignment 
mode, and key and value serializers.
+        */
+       private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
+
+       /**
+        * The internal map that holds the elements of the state.
+        */
+       private final Map<K, V> backingMap;
+
+       /**
+        * A serializer that allows performing deep copies of the internal map 
state.
+        */
+       private final MapSerializer<K, V> internalMapCopySerializer;
+
+       HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> 
stateMetaInfo) {
+               this(stateMetaInfo, new HashMap<>());
+       }
+
+       private HeapBroadcastState(final 
RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> 
internalMap) {
+
+               this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
+               this.backingMap = Preconditions.checkNotNull(internalMap);
+               this.internalMapCopySerializer = new 
MapSerializer<>(stateMetaInfo.getKeySerializer(), 
stateMetaInfo.getValueSerializer());
+       }
+
+       private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
+               this(toCopy.stateMetaInfo, 
toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
+       }
+
+       @Override
+       public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, 
V> stateMetaInfo) {
+               this.stateMetaInfo = stateMetaInfo;
+       }
+
+       @Override
+       public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() 
{
+               return stateMetaInfo;
+       }
+
+       @Override
+       public HeapBroadcastState<K, V> deepCopy() {
+               return new HeapBroadcastState<>(this);
+       }
+
+       @Override
+       public void clear() {
+               backingMap.clear();
+       }
+
+       @Override
+       public String toString() {
+               return "HeapBroadcastState{" +
+                               "stateMetaInfo=" + stateMetaInfo +
+                               ", backingMap=" + backingMap +
+                               ", internalMapCopySerializer=" + 
internalMapCopySerializer +
+                               '}';
+       }
+
+       @Override
+       public long write(FSDataOutputStream out) throws IOException {
+               long partitionOffset = out.getPos();
+
+               DataOutputView dov = new DataOutputViewStreamWrapper(out);
+               dov.writeInt(backingMap.size());
+               for (Map.Entry<K, V> entry: backingMap.entrySet()) {
+                       
getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
+                       
getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
+               }
+
+               return partitionOffset;
+       }
+
+       @Override
+       public V get(K key) {
+               return backingMap.get(key);
+       }
+
+       @Override
+       public void put(K key, V value) {
+               backingMap.put(key, value);
+       }
+
+       @Override
+       public void putAll(Map<K, V> map) {
+               backingMap.putAll(map);
+       }
+
+       @Override
+       public void remove(K key) {
+               backingMap.remove(key);
+       }
+
+       @Override
+       public boolean contains(K key) {
+               return backingMap.containsKey(key);
+       }
+
+       @Override
+       public Iterator<Map.Entry<K, V>> iterator() {
+               return backingMap.entrySet().iterator();
+       }
+
+       @Override
+       public Iterable<Map.Entry<K, V>> entries() {
+               return backingMap.entrySet();
+       }
+
+       @Override
+       public Iterable<Map.Entry<K, V>> immutableEntries() {
+               return Collections.unmodifiableSet(backingMap.entrySet());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index 074d84e..e73f83a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -33,9 +33,10 @@ import java.util.List;
  */
 public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritable {
 
-       public static final int VERSION = 2;
+       public static final int VERSION = 3;
 
-       private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots;
+       private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
operatorStateMetaInfoSnapshots;
+       private List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
broadcastStateMetaInfoSnapshots;
        private ClassLoader userCodeClassLoader;
 
        public OperatorBackendSerializationProxy(ClassLoader 
userCodeClassLoader) {
@@ -43,10 +44,15 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
        }
 
        public OperatorBackendSerializationProxy(
-                       
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots) {
-
-               this.stateMetaInfoSnapshots = 
Preconditions.checkNotNull(stateMetaInfoSnapshots);
-               Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
+                       
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
operatorStateMetaInfoSnapshots,
+                       
List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
broadcastStateMetaInfoSnapshots) {
+
+               this.operatorStateMetaInfoSnapshots = 
Preconditions.checkNotNull(operatorStateMetaInfoSnapshots);
+               this.broadcastStateMetaInfoSnapshots = 
Preconditions.checkNotNull(broadcastStateMetaInfoSnapshots);
+               Preconditions.checkArgument(
+                               operatorStateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE &&
+                                               
broadcastStateMetaInfoSnapshots.size() <= Short.MAX_VALUE
+               );
        }
 
        @Override
@@ -56,19 +62,26 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
 
        @Override
        public int[] getCompatibleVersions() {
-               // we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
-               return new int[] {VERSION, 1};
+               // we are compatible with version 3 (Flink 1.5.x), 2 (Flink 
1.4.x, Flink 1.3.x) and version 1 (Flink 1.2.x)
+               return new int[] {VERSION, 2, 1};
        }
 
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
-               out.writeShort(stateMetaInfoSnapshots.size());
-               for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState 
: stateMetaInfoSnapshots) {
+               out.writeShort(operatorStateMetaInfoSnapshots.size());
+               for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> state : 
operatorStateMetaInfoSnapshots) {
+                       OperatorBackendStateMetaInfoSnapshotReaderWriters
+                                       
.getOperatorStateWriterForVersion(VERSION, state)
+                                       .writeOperatorStateMetaInfo(out);
+               }
+
+               out.writeShort(broadcastStateMetaInfoSnapshots.size());
+               for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> 
state : broadcastStateMetaInfoSnapshots) {
                        OperatorBackendStateMetaInfoSnapshotReaderWriters
-                               .getWriterForVersion(VERSION, kvState)
-                               .writeStateMetaInfo(out);
+                                       
.getBroadcastStateWriterForVersion(VERSION, state)
+                                       .writeBroadcastStateMetaInfo(out);
                }
        }
 
@@ -76,17 +89,35 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               int numKvStates = in.readShort();
-               stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
-               for (int i = 0; i < numKvStates; i++) {
-                       stateMetaInfoSnapshots.add(
-                               
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-                                       .readStateMetaInfo(in));
+               int numOperatorStates = in.readShort();
+               operatorStateMetaInfoSnapshots = new 
ArrayList<>(numOperatorStates);
+               for (int i = 0; i < numOperatorStates; i++) {
+                       operatorStateMetaInfoSnapshots.add(
+                                       
OperatorBackendStateMetaInfoSnapshotReaderWriters
+                                                       
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+                                                       
.readOperatorStateMetaInfo(in));
                }
+
+               if (getReadVersion() >= 3) {
+                       // broadcast states did not exist prior to version 3
+                       int numBroadcastStates = in.readShort();
+                       broadcastStateMetaInfoSnapshots = new 
ArrayList<>(numBroadcastStates);
+                       for (int i = 0; i < numBroadcastStates; i++) {
+                               broadcastStateMetaInfoSnapshots.add(
+                                               
OperatorBackendStateMetaInfoSnapshotReaderWriters
+                                                               
.getBroadcastStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+                                                               
.readBroadcastStateMetaInfo(in));
+                       }
+               } else {
+                       broadcastStateMetaInfoSnapshots = new ArrayList<>();
+               }
+       }
+
+       public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
getOperatorStateMetaInfoSnapshots() {
+               return operatorStateMetaInfoSnapshots;
        }
 
-       public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
getStateMetaInfoSnapshots() {
-               return stateMetaInfoSnapshots;
+       public List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> 
getBroadcastStateMetaInfoSnapshots() {
+               return broadcastStateMetaInfoSnapshots;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index 03fe612..fafd542 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -31,7 +31,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * Readers and writers for different versions of the {@link 
RegisteredOperatorBackendStateMetaInfo.Snapshot}.
@@ -45,9 +47,10 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
        //  Writers
        //   - v1: Flink 1.2.x
        //   - v2: Flink 1.3.x
+       //   - v3: Flink 1.5.x
        // 
-------------------------------------------------------------------------------
 
-       public static <S> OperatorBackendStateMetaInfoWriter 
getWriterForVersion(
+       public static <S> OperatorBackendStateMetaInfoWriter 
getOperatorStateWriterForVersion(
                        int version, 
RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
 
                switch (version) {
@@ -55,6 +58,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                                return new 
OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo);
 
                        // current version
+                       case 2:
                        case OperatorBackendSerializationProxy.VERSION:
                                return new 
OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo);
 
@@ -65,8 +69,28 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       public static <K, V> BroadcastStateMetaInfoWriter 
getBroadcastStateWriterForVersion(
+                       final int version,
+                       final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
+
+               switch (version) {
+                       // current version
+                       case OperatorBackendSerializationProxy.VERSION:
+                               return new 
BroadcastStateMetaInfoWriterV3<>(broadcastStateMetaInfo);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                               "Unrecognized broadcast state 
meta info writer version: " + version);
+               }
+       }
+
        public interface OperatorBackendStateMetaInfoWriter {
-               void writeStateMetaInfo(DataOutputView out) throws IOException;
+               void writeOperatorStateMetaInfo(DataOutputView out) throws 
IOException;
+       }
+
+       public interface BroadcastStateMetaInfoWriter {
+               void writeBroadcastStateMetaInfo(final DataOutputView out) 
throws IOException;
        }
 
        public static abstract class 
AbstractOperatorBackendStateMetaInfoWriter<S>
@@ -79,6 +103,16 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       public abstract static class AbstractBroadcastStateMetaInfoWriter<K, V>
+                       implements BroadcastStateMetaInfoWriter {
+
+               protected final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo;
+
+               public AbstractBroadcastStateMetaInfoWriter(final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
+                       this.broadcastStateMetaInfo = 
Preconditions.checkNotNull(broadcastStateMetaInfo);
+               }
+       }
+
        public static class OperatorBackendStateMetaInfoWriterV1<S> extends 
AbstractOperatorBackendStateMetaInfoWriter<S> {
 
                public 
OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>
 stateMetaInfo) {
@@ -86,7 +120,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
 
                @Override
-               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+               public void writeOperatorStateMetaInfo(DataOutputView out) 
throws IOException {
                        out.writeUTF(stateMetaInfo.getName());
                        
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
                        TypeSerializerSerializationUtil.writeSerializer(out, 
stateMetaInfo.getPartitionStateSerializer());
@@ -100,7 +134,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
 
                @Override
-               public void writeStateMetaInfo(DataOutputView out) throws 
IOException {
+               public void writeOperatorStateMetaInfo(DataOutputView out) 
throws IOException {
                        out.writeUTF(stateMetaInfo.getName());
                        
out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
 
@@ -113,20 +147,51 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       public static class BroadcastStateMetaInfoWriterV3<K, V> extends 
AbstractBroadcastStateMetaInfoWriter<K, V> {
+
+               public BroadcastStateMetaInfoWriterV3(
+                               final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
+                       super(broadcastStateMetaInfo);
+               }
+
+               @Override
+               public void writeBroadcastStateMetaInfo(final DataOutputView 
out) throws IOException {
+                       out.writeUTF(broadcastStateMetaInfo.getName());
+                       
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
+
+                       // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
+                       
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                                       out,
+                                       Arrays.asList(
+                                                       Tuple2.of(
+                                                                       
broadcastStateMetaInfo.getKeySerializer(),
+                                                                       
broadcastStateMetaInfo.getKeySerializerConfigSnapshot()
+                                                       ),
+                                                       Tuple2.of(
+                                                                       
broadcastStateMetaInfo.getValueSerializer(),
+                                                                       
broadcastStateMetaInfo.getValueSerializerConfigSnapshot()
+                                                       )
+                                       )
+                       );
+               }
+       }
+
        // 
-------------------------------------------------------------------------------
        //  Readers
        //   - v1: Flink 1.2.x
        //   - v2: Flink 1.3.x
+       //   - v3: Flink 1.5.x
        // 
-------------------------------------------------------------------------------
 
-       public static <S> OperatorBackendStateMetaInfoReader<S> 
getReaderForVersion(
+       public static <S> OperatorBackendStateMetaInfoReader<S> 
getOperatorStateReaderForVersion(
                        int version, ClassLoader userCodeClassLoader) {
 
                switch (version) {
                        case 1:
                                return new 
OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader);
 
-                       // current version
+                       // version 2 and version 3 (current)
+                       case 2:
                        case OperatorBackendSerializationProxy.VERSION:
                                return new 
OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader);
 
@@ -137,8 +202,27 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       public static <K, V> BroadcastStateMetaInfoReader<K, V> 
getBroadcastStateReaderForVersion(
+                       int version, ClassLoader userCodeClassLoader) {
+
+               switch (version) {
+                       // current version
+                       case OperatorBackendSerializationProxy.VERSION:
+                               return new 
BroadcastStateMetaInfoReaderV3<>(userCodeClassLoader);
+
+                       default:
+                               // guard for future
+                               throw new IllegalStateException(
+                                               "Unrecognized broadcast state 
meta info reader version: " + version);
+               }
+       }
+
        public interface OperatorBackendStateMetaInfoReader<S> {
-               RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException;
+               RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readOperatorStateMetaInfo(DataInputView in) throws IOException;
+       }
+
+       public interface BroadcastStateMetaInfoReader<K, V> {
+               RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> 
readBroadcastStateMetaInfo(final DataInputView in) throws IOException;
        }
 
        public static abstract class 
AbstractOperatorBackendStateMetaInfoReader<S>
@@ -151,6 +235,16 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
        }
 
+       public abstract static class AbstractBroadcastStateMetaInfoReader<K, V>
+                       implements BroadcastStateMetaInfoReader<K, V> {
+
+               protected final ClassLoader userCodeClassLoader;
+
+               public AbstractBroadcastStateMetaInfoReader(final ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+       }
+
        public static class OperatorBackendStateMetaInfoReaderV1<S> extends 
AbstractOperatorBackendStateMetaInfoReader<S> {
 
                public OperatorBackendStateMetaInfoReaderV1(ClassLoader 
userCodeClassLoader) {
@@ -159,7 +253,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
                @SuppressWarnings("unchecked")
                @Override
-               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException {
+               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readOperatorStateMetaInfo(DataInputView in) throws IOException {
                        RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
stateMetaInfo =
                                new 
RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
 
@@ -196,7 +290,7 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                }
 
                @Override
-               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readStateMetaInfo(DataInputView in) throws IOException {
+               public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
readOperatorStateMetaInfo(DataInputView in) throws IOException {
                        RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
stateMetaInfo =
                                new 
RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
 
@@ -212,4 +306,34 @@ public class 
OperatorBackendStateMetaInfoSnapshotReaderWriters {
                        return stateMetaInfo;
                }
        }
+
+       /**
+        * Reader for broadcast state meta info snapshots. The layout it consumes is: the state name
+        * (UTF), one byte holding the ordinal of the {@link OperatorStateHandle.Mode}, followed by the
+        * serializers and their config snapshots (key serializer first, value serializer second).
+        *
+        * @param <K> key type of the broadcast state described by the snapshot
+        * @param <V> value type of the broadcast state described by the snapshot
+        */
+       public static class BroadcastStateMetaInfoReaderV3<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> {
+
+               public BroadcastStateMetaInfoReaderV3(final ClassLoader userCodeClassLoader) {
+                       super(userCodeClassLoader);
+               }
+
+               /**
+                * Reads one broadcast state meta info snapshot from the given input view.
+                *
+                * @param in the view to read the snapshot from
+                * @return the populated snapshot
+                * @throws IOException if reading fails, if the stored assignment mode is not a valid
+                *         {@link OperatorStateHandle.Mode} ordinal, or if fewer than two serializers were stored
+                */
+               @SuppressWarnings("unchecked")
+               @Override
+               public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+                       RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo =
+                                       new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+                       final String stateName = in.readUTF();
+                       stateMetaInfo.setName(stateName);
+
+                       // The mode is stored as a single byte that indexes into Mode.values(); reject anything
+                       // outside the enum instead of failing with a bare ArrayIndexOutOfBoundsException.
+                       final OperatorStateHandle.Mode[] modes = OperatorStateHandle.Mode.values();
+                       final int modeOrdinal = in.readByte();
+                       if (modeOrdinal < 0 || modeOrdinal >= modes.length) {
+                               throw new IOException("Unknown assignment mode ordinal " + modeOrdinal +
+                                               " in meta info of broadcast state '" + stateName + "'.");
+                       }
+                       stateMetaInfo.setAssignmentMode(modes[modeOrdinal]);
+
+                       List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializers =
+                                       TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);
+
+                       // Index 0 is the key serializer, index 1 the value serializer.
+                       if (serializers.size() < 2) {
+                               throw new IOException("Expected a key and a value serializer for broadcast state '" +
+                                               stateName + "', but found " + serializers.size() + " serializer(s).");
+                       }
+
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = serializers.get(0);
+                       Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> valueSerializerAndConfig = serializers.get(1);
+
+                       stateMetaInfo.setKeySerializer((TypeSerializer<K>) keySerializerAndConfig.f0);
+                       stateMetaInfo.setKeySerializerConfigSnapshot(keySerializerAndConfig.f1);
+
+                       stateMetaInfo.setValueSerializer((TypeSerializer<V>) valueSerializerAndConfig.f0);
+                       stateMetaInfo.setValueSerializerConfigSnapshot(valueSerializerAndConfig.f1);
+
+                       return stateMetaInfo;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
index a357dc4..f9427ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -36,8 +36,9 @@ public class OperatorStateHandle implements StreamStateHandle 
{
         * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
         */
        public enum Mode {
-               SPLIT_DISTRIBUTE, // The operator state partitions in the state 
handle are split and distributed to one task each.
-               BROADCAST // The operator state partitions are broadcast to all 
task.
+               SPLIT_DISTRIBUTE,       // The operator state partitions in the 
state handle are split and distributed to one task each.
+               UNION,                          // The operator state 
partitions are UNION-ed upon restoring and sent to all tasks.
+               BROADCAST                       // The operator states are 
identical, as the state is produced from a broadcast stream.
        }
 
        private static final long serialVersionUID = 35876522969227335L;

http://git-wip-us.apache.org/repos/asf/flink/blob/484fedd4/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
new file mode 100644
index 0000000..d462b34
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Meta information of a registered broadcast state: its name, its assignment mode (always
+ * {@link OperatorStateHandle.Mode#BROADCAST}) and the serializers of its keys and values.
+ *
+ * @param <K> type of the keys in the broadcast state
+ * @param <V> type of the values in the broadcast state
+ */
+public class RegisteredBroadcastBackendStateMetaInfo<K, V> {
+
+       /** The name of the state, as registered by the user. */
+       private final String name;
+
+       /** The mode how elements in this state are assigned to tasks during restore. */
+       private final OperatorStateHandle.Mode assignmentMode;
+
+       /** The type serializer for the keys in the map state. */
+       private final TypeSerializer<K> keySerializer;
+
+       /** The type serializer for the values in the map state. */
+       private final TypeSerializer<V> valueSerializer;
+
+       /**
+        * Creates the meta information for a broadcast state.
+        *
+        * @param name the name of the state; must not be {@code null}
+        * @param assignmentMode the assignment mode; must be {@link OperatorStateHandle.Mode#BROADCAST}
+        * @param keySerializer the serializer for the keys; must not be {@code null}
+        * @param valueSerializer the serializer for the values; must not be {@code null}
+        * @throws IllegalArgumentException if {@code assignmentMode} is {@code null} or not {@code BROADCAST}
+        */
+       public RegisteredBroadcastBackendStateMetaInfo(
+                       final String name,
+                       final OperatorStateHandle.Mode assignmentMode,
+                       final TypeSerializer<K> keySerializer,
+                       final TypeSerializer<V> valueSerializer) {
+
+               // A null mode fails this identity comparison as well, so no separate null check is needed.
+               Preconditions.checkArgument(
+                               assignmentMode == OperatorStateHandle.Mode.BROADCAST,
+                               "The assignment mode of a broadcast state must be BROADCAST.");
+
+               this.name = Preconditions.checkNotNull(name);
+               this.assignmentMode = assignmentMode;
+               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+               this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       public TypeSerializer<V> getValueSerializer() {
+               return valueSerializer;
+       }
+
+       public OperatorStateHandle.Mode getAssignmentMode() {
+               return assignmentMode;
+       }
+
+       /**
+        * Creates a {@link Snapshot} of this meta information. The snapshot receives duplicates of
+        * both serializers together with their configuration snapshots.
+        */
+       public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> snapshot() {
+               return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+                               name,
+                               assignmentMode,
+                               keySerializer.duplicate(),
+                               valueSerializer.duplicate(),
+                               keySerializer.snapshotConfiguration(),
+                               valueSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+
+               if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+                       return false;
+               }
+
+               final RegisteredBroadcastBackendStateMetaInfo<?, ?> other =
+                               (RegisteredBroadcastBackendStateMetaInfo<?, ?>) obj;
+
+               return Objects.equals(name, other.getName())
+                               && Objects.equals(assignmentMode, other.getAssignmentMode())
+                               && Objects.equals(keySerializer, other.getKeySerializer())
+                               && Objects.equals(valueSerializer, other.getValueSerializer());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = name.hashCode();
+               result = 31 * result + assignmentMode.hashCode();
+               result = 31 * result + keySerializer.hashCode();
+               result = 31 * result + valueSerializer.hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "RegisteredBroadcastBackendStateMetaInfo{" +
+                               "name='" + name + '\'' +
+                               ", keySerializer=" + keySerializer +
+                               ", valueSerializer=" + valueSerializer +
+                               ", assignmentMode=" + assignmentMode +
+                               '}';
+       }
+
+       /**
+        * A consistent snapshot of a {@link RegisteredBroadcastBackendStateMetaInfo}.
+        *
+        * <p>Instances created through the empty constructor start with all fields {@code null} and
+        * are filled through the package-private setters, so {@link #equals(Object)} and
+        * {@link #hashCode()} tolerate {@code null} fields.
+        */
+       public static class Snapshot<K, V> {
+
+               private String name;
+               private OperatorStateHandle.Mode assignmentMode;
+               private TypeSerializer<K> keySerializer;
+               private TypeSerializer<V> valueSerializer;
+               private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
+               private TypeSerializerConfigSnapshot valueSerializerConfigSnapshot;
+
+               /** Empty constructor used when restoring the state meta info snapshot. */
+               Snapshot() {}
+
+               private Snapshot(
+                               final String name,
+                               final OperatorStateHandle.Mode assignmentMode,
+                               final TypeSerializer<K> keySerializer,
+                               final TypeSerializer<V> valueSerializer,
+                               final TypeSerializerConfigSnapshot keySerializerConfigSnapshot,
+                               final TypeSerializerConfigSnapshot valueSerializerConfigSnapshot) {
+
+                       this.name = Preconditions.checkNotNull(name);
+                       this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+                       this.keySerializer = Preconditions.checkNotNull(keySerializer);
+                       this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+                       this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot);
+                       this.valueSerializerConfigSnapshot = Preconditions.checkNotNull(valueSerializerConfigSnapshot);
+               }
+
+               public String getName() {
+                       return name;
+               }
+
+               void setName(String name) {
+                       this.name = name;
+               }
+
+               public OperatorStateHandle.Mode getAssignmentMode() {
+                       return assignmentMode;
+               }
+
+               void setAssignmentMode(OperatorStateHandle.Mode mode) {
+                       this.assignmentMode = mode;
+               }
+
+               public TypeSerializer<K> getKeySerializer() {
+                       return keySerializer;
+               }
+
+               void setKeySerializer(TypeSerializer<K> serializer) {
+                       this.keySerializer = serializer;
+               }
+
+               public TypeSerializer<V> getValueSerializer() {
+                       return valueSerializer;
+               }
+
+               void setValueSerializer(TypeSerializer<V> serializer) {
+                       this.valueSerializer = serializer;
+               }
+
+               public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
+                       return keySerializerConfigSnapshot;
+               }
+
+               void setKeySerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
+                       this.keySerializerConfigSnapshot = configSnapshot;
+               }
+
+               public TypeSerializerConfigSnapshot getValueSerializerConfigSnapshot() {
+                       return valueSerializerConfigSnapshot;
+               }
+
+               void setValueSerializerConfigSnapshot(TypeSerializerConfigSnapshot configSnapshot) {
+                       this.valueSerializerConfigSnapshot = configSnapshot;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+
+                       if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo.Snapshot)) {
+                               return false;
+                       }
+
+                       final RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> snapshot =
+                                       (RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>) obj;
+
+                       // Null-safe throughout: every field is null until the restore path has set it.
+                       // Enum constants are compared by identity rather than by ordinal.
+                       return Objects.equals(name, snapshot.getName())
+                                       && assignmentMode == snapshot.getAssignmentMode()
+                                       && Objects.equals(keySerializer, snapshot.getKeySerializer())
+                                       && Objects.equals(valueSerializer, snapshot.getValueSerializer())
+                                       && Objects.equals(keySerializerConfigSnapshot, snapshot.getKeySerializerConfigSnapshot())
+                                       && Objects.equals(valueSerializerConfigSnapshot, snapshot.getValueSerializerConfigSnapshot());
+               }
+
+               @Override
+               public int hashCode() {
+                       // Objects.hashCode yields 0 for null, matching the existing treatment of the serializers.
+                       int result = Objects.hashCode(name);
+                       result = 31 * result + Objects.hashCode(assignmentMode);
+                       result = 31 * result + Objects.hashCode(keySerializer);
+                       result = 31 * result + Objects.hashCode(valueSerializer);
+                       result = 31 * result + Objects.hashCode(keySerializerConfigSnapshot);
+                       result = 31 * result + Objects.hashCode(valueSerializerConfigSnapshot);
+                       return result;
+               }
+       }
+}

Reply via email to