GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r163839057
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
---
@@ -228,36 +221,58 @@ private GroupByStateNameResults groupByStateName(
Map<String, List<Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo>>> broadcastNameToState =
nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST);
- for (int i = 0; i < parallelism; ++i) {
+ for (int i = 0; i < newParallelism; ++i) {
Map<StreamStateHandle, OperatorStateHandle> mergeMap =
mergeMapList.get(i);
for (Map.Entry<String, List<Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo>>> e :
broadcastNameToState.entrySet()) {
- List<Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo>> current = e.getValue();
-
- for (Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : current) {
+ for (Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
OperatorStateHandle operatorStateHandle
= mergeMap.get(handleWithMetaInfo.f0);
if (operatorStateHandle == null) {
- operatorStateHandle = new
OperatorStateHandle(
- new
HashMap<String, OperatorStateHandle.StateMetaInfo>(),
-
handleWithMetaInfo.f0);
-
+ operatorStateHandle = new
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
}
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(),
handleWithMetaInfo.f1);
}
}
}
+
+ // Now we also add the state handles marked for broadcast to
all parallel instances
+ Map<String, List<Tuple2<StreamStateHandle,
OperatorStateHandle.StateMetaInfo>>> uniformBroadcastNameToState =
+
nameToStateByMode.getByMode(OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+ for (int i = 0; i < newParallelism; ++i) {
+ // TODO: 11/29/17 should I take into account nulls?
--- End diff --
What about these two `TODOs`?
---