[
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339255#comment-16339255
]
ASF GitHub Bot commented on FLINK-8345:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r163839057
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---
@@ -228,36 +221,58 @@ private GroupByStateNameResults groupByStateName(
         Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> broadcastNameToState =
                 nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST);
-        for (int i = 0; i < parallelism; ++i) {
+        for (int i = 0; i < newParallelism; ++i) {
             Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(i);
             for (Map.Entry<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> e :
                     broadcastNameToState.entrySet()) {
-                List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> current = e.getValue();
-
-                for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : current) {
+                for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
                     OperatorStateHandle operatorStateHandle = mergeMap.get(handleWithMetaInfo.f0);
                     if (operatorStateHandle == null) {
-                        operatorStateHandle = new OperatorStateHandle(
-                                new HashMap<String, OperatorStateHandle.StateMetaInfo>(),
-                                handleWithMetaInfo.f0);
-
+                        operatorStateHandle = new OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
                         mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
                     }
                     operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), handleWithMetaInfo.f1);
                 }
             }
         }
+
+        // Now we also add the state handles marked for broadcast to all parallel instances
+        Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> uniformBroadcastNameToState =
+                nameToStateByMode.getByMode(OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+        for (int i = 0; i < newParallelism; ++i) {
+            // TODO: 11/29/17 should I take into account nulls?
--- End diff --
What about these two `TODOs`?
> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Fix For: 1.5.0
>
>