[
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584893#comment-15584893
]
ASF GitHub Bot commented on FLINK-4844:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83807199
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
---
@@ -804,199 +792,15 @@ public boolean restoreLatestCheckpointedState(
LOG.info("Restoring from latest valid checkpoint: {}.",
latest);
- for (Map.Entry<JobVertexID, TaskState>
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
- TaskState taskState =
taskGroupStateEntry.getValue();
- ExecutionJobVertex executionJobVertex =
tasks.get(taskGroupStateEntry.getKey());
-
- if (executionJobVertex != null) {
- // check that the number of key groups
have not changed
- if (taskState.getMaxParallelism() !=
executionJobVertex.getMaxParallelism()) {
- throw new
IllegalStateException("The maximum parallelism (" +
-
taskState.getMaxParallelism() + ") with which the latest " +
- "checkpoint of the
execution job vertex " + executionJobVertex +
- " has been taken and
the current maximum parallelism (" +
-
executionJobVertex.getMaxParallelism() + ") changed. This " +
- "is currently not
supported.");
- }
-
-
- int oldParallelism =
taskState.getParallelism();
- int newParallelism =
executionJobVertex.getParallelism();
- boolean parallelismChanged =
oldParallelism != newParallelism;
- boolean hasNonPartitionedState =
taskState.hasNonPartitionedState();
-
- if (hasNonPartitionedState &&
parallelismChanged) {
- throw new
IllegalStateException("Cannot restore the latest checkpoint because " +
- "the operator " +
executionJobVertex.getJobVertexId() + " has non-partitioned " +
- "state and its
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
- " has parallelism " +
newParallelism + " whereas the corresponding" +
- "state object has a
parallelism of " + oldParallelism);
- }
-
- List<KeyGroupRange> keyGroupPartitions
= createKeyGroupPartitions(
-
executionJobVertex.getMaxParallelism(),
- newParallelism);
-
- // operator chain index -> list of the
stored partitionables states from all parallel instances
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[]
chainParallelStates =
- new
List[taskState.getChainLength()];
-
- for (int i = 0; i < oldParallelism;
++i) {
-
-
ChainedStateHandle<OperatorStateHandle> partitionableState =
-
taskState.getPartitionableState(i);
-
- if (partitionableState != null)
{
- for (int j = 0; j <
partitionableState.getLength(); ++j) {
-
OperatorStateHandle opParalleState = partitionableState.get(j);
- if
(opParalleState != null) {
-
List<OperatorStateHandle> opParallelStates =
-
chainParallelStates[j];
- if
(opParallelStates == null) {
-
opParallelStates = new ArrayList<>();
-
chainParallelStates[j] = opParallelStates;
- }
-
opParallelStates.add(opParalleState);
- }
- }
- }
- }
-
- // operator chain index -> lists with
collected states (one collection for each parallel subtasks)
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[]
redistributedParallelStates =
- new
List[taskState.getChainLength()];
-
- //TODO here we can employ different
redistribution strategies for state, e.g. union state. For now we only offer
round robin as the default.
- OperatorStateRepartitioner
repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
-
- for (int i = 0; i <
chainParallelStates.length; ++i) {
- List<OperatorStateHandle>
chainOpParallelStates = chainParallelStates[i];
- if (chainOpParallelStates !=
null) {
- //We only redistribute
if the parallelism of the operator changed from previous executions
- if (parallelismChanged)
{
-
redistributedParallelStates[i] = repartitioner.repartitionState(
-
chainOpParallelStates,
-
newParallelism);
- } else {
-
List<Collection<OperatorStateHandle>> repacking = new
ArrayList<>(newParallelism);
- for
(OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
-
repacking.add(Collections.singletonList(operatorStateHandle));
- }
-
redistributedParallelStates[i] = repacking;
- }
- }
- }
-
- int counter = 0;
-
- for (int i = 0; i < newParallelism;
++i) {
-
- // non-partitioned state
-
ChainedStateHandle<StreamStateHandle> state = null;
-
- if (hasNonPartitionedState) {
- SubtaskState
subtaskState = taskState.getState(i);
-
- if (subtaskState !=
null) {
- // count the
number of executions for which we set a state
- ++counter;
- state =
subtaskState.getChainedStateHandle();
- }
- }
-
- // partitionable state
- @SuppressWarnings("unchecked")
-
Collection<OperatorStateHandle>[] ia = new
Collection[taskState.getChainLength()];
-
List<Collection<OperatorStateHandle>> subTaskPartitionableState =
Arrays.asList(ia);
-
- for (int j = 0; j <
redistributedParallelStates.length; ++j) {
-
List<Collection<OperatorStateHandle>> redistributedParallelState =
-
redistributedParallelStates[j];
-
- if
(redistributedParallelState != null) {
-
subTaskPartitionableState.set(j, redistributedParallelState.get(i));
- }
- }
+ StateAssignmentOperation stateAssignmentOperation =
--- End diff --
I was not sure whether this class will have state or not. I might still
want to break assignStates() down into smaller methods, which might lead me to
introducing some state to make. Last but not least, having things more fine
grained and in a class allows better unit testing and mocking, while mocking
static methods is impossible. This is one of the places where we might want
something like this in the future.
Not sure about the allOrNothingState, but that could be a separate change.
> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
> Issue Type: New Feature
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using
> backends. However, the serialization code for many operators is build around
> reading/writing their state to a stream for checkpointing. We want to provide
> partitionable states also through streams, so that migrating existing
> operators becomes more easy.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)