[
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582781#comment-15582781
]
ASF GitHub Bot commented on FLINK-4844:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83669355
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.",
latest);
- for (Map.Entry<JobVertexID, TaskState>
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
- TaskState taskState =
taskGroupStateEntry.getValue();
- ExecutionJobVertex executionJobVertex =
tasks.get(taskGroupStateEntry.getKey());
-
- if (executionJobVertex != null) {
- // check that the number of key groups
have not changed
- if (taskState.getMaxParallelism() !=
executionJobVertex.getMaxParallelism()) {
- throw new
IllegalStateException("The maximum parallelism (" +
-
taskState.getMaxParallelism() + ") with which the latest " +
- "checkpoint of the
execution job vertex " + executionJobVertex +
- " has been taken and
the current maximum parallelism (" +
-
executionJobVertex.getMaxParallelism() + ") changed. This " +
- "is currently not
supported.");
- }
-
-
- int oldParallelism =
taskState.getParallelism();
- int newParallelism =
executionJobVertex.getParallelism();
- boolean parallelismChanged =
oldParallelism != newParallelism;
- boolean hasNonPartitionedState =
taskState.hasNonPartitionedState();
-
- if (hasNonPartitionedState &&
parallelismChanged) {
- throw new
IllegalStateException("Cannot restore the latest checkpoint because " +
- "the operator " +
executionJobVertex.getJobVertexId() + " has non-partitioned " +
- "state and its
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
- " has parallelism " +
newParallelism + " whereas the corresponding" +
- "state object has a
parallelism of " + oldParallelism);
- }
-
- List<KeyGroupRange> keyGroupPartitions
= createKeyGroupPartitions(
-
executionJobVertex.getMaxParallelism(),
- newParallelism);
-
- // operator chain index -> list of the
stored partitionables states from all parallel instances
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[]
chainParallelStates =
- new
List[taskState.getChainLength()];
-
- for (int i = 0; i < oldParallelism;
++i) {
-
-
ChainedStateHandle<OperatorStateHandle> partitionableState =
-
taskState.getPartitionableState(i);
-
- if (partitionableState != null)
{
- for (int j = 0; j <
partitionableState.getLength(); ++j) {
-
OperatorStateHandle opParalleState = partitionableState.get(j);
- if
(opParalleState != null) {
-
List<OperatorStateHandle> opParallelStates =
-
chainParallelStates[j];
- if
(opParallelStates == null) {
-
opParallelStates = new ArrayList<>();
-
chainParallelStates[j] = opParallelStates;
- }
-
opParallelStates.add(opParalleState);
- }
- }
- }
- }
-
- // operator chain index -> lists with
collected states (one collection for each parallel subtasks)
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[]
redistributedParallelStates =
- new
List[taskState.getChainLength()];
-
- //TODO here we can employ different
redistribution strategies for state, e.g. union state. For now we only offer
round robin as the default.
- OperatorStateRepartitioner
repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
-
- for (int i = 0; i <
chainParallelStates.length; ++i) {
- List<OperatorStateHandle>
chainOpParallelStates = chainParallelStates[i];
- if (chainOpParallelStates !=
null) {
- //We only redistribute
if the parallelism of the operator changed from previous executions
- if (parallelismChanged)
{
-
redistributedParallelStates[i] = repartitioner.repartitionState(
-
chainOpParallelStates,
-
newParallelism);
- } else {
-
List<Collection<OperatorStateHandle>> repacking = new
ArrayList<>(newParallelism);
- for
(OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
-
repacking.add(Collections.singletonList(operatorStateHandle));
- }
-
redistributedParallelStates[i] = repacking;
- }
- }
- }
-
- int counter = 0;
-
- for (int i = 0; i < newParallelism;
++i) {
-
- // non-partitioned state
-
ChainedStateHandle<StreamStateHandle> state = null;
-
- if (hasNonPartitionedState) {
- SubtaskState
subtaskState = taskState.getState(i);
-
- if (subtaskState !=
null) {
- // count the
number of executions for which we set a state
- ++counter;
- state =
subtaskState.getChainedStateHandle();
- }
- }
-
- // partitionable state
- @SuppressWarnings("unchecked")
-
Collection<OperatorStateHandle>[] ia = new
Collection[taskState.getChainLength()];
-
List<Collection<OperatorStateHandle>> subTaskPartitionableState =
Arrays.asList(ia);
-
- for (int j = 0; j <
redistributedParallelStates.length; ++j) {
-
List<Collection<OperatorStateHandle>> redistributedParallelState =
-
redistributedParallelStates[j];
-
- if
(redistributedParallelState != null) {
-
subTaskPartitionableState.set(j, redistributedParallelState.get(i));
- }
- }
+ StateAssignmentOperation stateAssignmentOperation =
--- End diff --
Is this a typical pattern? Why not just something like
```
StateAssignmentUtil.assignStates(tasks, latest, allOrNothingState)
```
Also, I'm not sure `allOrNothingState` is useful anymore. It's not invoked
with `true` except in tests.
> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
> Issue Type: New Feature
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using
> backends. However, the serialization code for many operators is built around
> reading/writing their state to a stream for checkpointing. We want to provide
> partitionable state through streams as well, so that migrating existing
> operators becomes easier.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)