Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83669355
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.",
latest);
- for (Map.Entry<JobVertexID, TaskState>
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
- TaskState taskState =
taskGroupStateEntry.getValue();
- ExecutionJobVertex executionJobVertex =
tasks.get(taskGroupStateEntry.getKey());
-
- if (executionJobVertex != null) {
- // check that the number of key groups
have not changed
- if (taskState.getMaxParallelism() !=
executionJobVertex.getMaxParallelism()) {
- throw new
IllegalStateException("The maximum parallelism (" +
-
taskState.getMaxParallelism() + ") with which the latest " +
- "checkpoint of the
execution job vertex " + executionJobVertex +
- " has been taken and
the current maximum parallelism (" +
-
executionJobVertex.getMaxParallelism() + ") changed. This " +
- "is currently not
supported.");
- }
-
-
- int oldParallelism =
taskState.getParallelism();
- int newParallelism =
executionJobVertex.getParallelism();
- boolean parallelismChanged =
oldParallelism != newParallelism;
- boolean hasNonPartitionedState =
taskState.hasNonPartitionedState();
-
- if (hasNonPartitionedState &&
parallelismChanged) {
- throw new
IllegalStateException("Cannot restore the latest checkpoint because " +
- "the operator " +
executionJobVertex.getJobVertexId() + " has non-partitioned " +
- "state and its
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
- " has parallelism " +
newParallelism + " whereas the corresponding" +
- "state object has a
parallelism of " + oldParallelism);
- }
-
- List<KeyGroupRange> keyGroupPartitions
= createKeyGroupPartitions(
-
executionJobVertex.getMaxParallelism(),
- newParallelism);
-
- // operator chain index -> list of the
stored partitionables states from all parallel instances
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[]
chainParallelStates =
- new
List[taskState.getChainLength()];
-
- for (int i = 0; i < oldParallelism;
++i) {
-
-
ChainedStateHandle<OperatorStateHandle> partitionableState =
-
taskState.getPartitionableState(i);
-
- if (partitionableState != null)
{
- for (int j = 0; j <
partitionableState.getLength(); ++j) {
-
OperatorStateHandle opParalleState = partitionableState.get(j);
- if
(opParalleState != null) {
-
List<OperatorStateHandle> opParallelStates =
-
chainParallelStates[j];
- if
(opParallelStates == null) {
-
opParallelStates = new ArrayList<>();
-
chainParallelStates[j] = opParallelStates;
- }
-
opParallelStates.add(opParalleState);
- }
- }
- }
- }
-
- // operator chain index -> lists with
collected states (one collection for each parallel subtasks)
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[]
redistributedParallelStates =
- new
List[taskState.getChainLength()];
-
- //TODO here we can employ different
redistribution strategies for state, e.g. union state. For now we only offer
round robin as the default.
- OperatorStateRepartitioner
repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
-
- for (int i = 0; i <
chainParallelStates.length; ++i) {
- List<OperatorStateHandle>
chainOpParallelStates = chainParallelStates[i];
- if (chainOpParallelStates !=
null) {
- //We only redistribute
if the parallelism of the operator changed from previous executions
- if (parallelismChanged)
{
-
redistributedParallelStates[i] = repartitioner.repartitionState(
-
chainOpParallelStates,
-
newParallelism);
- } else {
-
List<Collection<OperatorStateHandle>> repacking = new
ArrayList<>(newParallelism);
- for
(OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
-
repacking.add(Collections.singletonList(operatorStateHandle));
- }
-
redistributedParallelStates[i] = repacking;
- }
- }
- }
-
- int counter = 0;
-
- for (int i = 0; i < newParallelism;
++i) {
-
- // non-partitioned state
-
ChainedStateHandle<StreamStateHandle> state = null;
-
- if (hasNonPartitionedState) {
- SubtaskState
subtaskState = taskState.getState(i);
-
- if (subtaskState !=
null) {
- // count the
number of executions for which we set a state
- ++counter;
- state =
subtaskState.getChainedStateHandle();
- }
- }
-
- // partitionable state
- @SuppressWarnings("unchecked")
-
Collection<OperatorStateHandle>[] ia = new
Collection[taskState.getChainLength()];
-
List<Collection<OperatorStateHandle>> subTaskPartitionableState =
Arrays.asList(ia);
-
- for (int j = 0; j <
redistributedParallelStates.length; ++j) {
-
List<Collection<OperatorStateHandle>> redistributedParallelState =
-
redistributedParallelStates[j];
-
- if
(redistributedParallelState != null) {
-
subTaskPartitionableState.set(j, redistributedParallelState.get(i));
- }
- }
+ StateAssignmentOperation stateAssignmentOperation =
--- End diff --
Is this a typical pattern? Why not just something like
```
StateAssignmentUtil.assignStates(tasks, latest, allOrNothingState)
```
Also, I'm not sure `allOrNothingState` is useful anymore. It's not invoked
with `true` except in tests.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---