Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3770#discussion_r113928105 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -62,252 +69,390 @@ public StateAssignmentOperation( } public boolean assignStates() throws Exception { - - // this tracks if we find missing node hash ids and already use secondary mappings - boolean expandedToLegacyIds = false; - + Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates); Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks; - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) { - - TaskState taskState = taskGroupStateEntry.getValue(); - - //----------------------------------------find vertex for state--------------------------------------------- - - ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - - // on the first time we can not find the execution job vertex for an id, we also consider alternative ids, - // for example as generated from older flink versions, to provide backwards compatibility. - if (executionJobVertex == null && !expandedToLegacyIds) { - localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks); - executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - expandedToLegacyIds = true; - logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search."); - } + Set<OperatorID> allOperatorIDs = new HashSet<>(); + for (ExecutionJobVertex executionJobVertex : tasks.values()) { + allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs())); --- End diff -- If we change to an immutable list instead of an array, this code also saves one conversion to a list.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---