[
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367437#comment-16367437
]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168779365
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for
the different states which define an order in
+ * which the operator should attempt to restore the state from them. One
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives
may be complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower
alternative, they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+ /** Singleton instance for an empty, non-restored operator state. */
+ private static final PrioritizedOperatorSubtaskState
EMPTY_NON_RESTORED_INSTANCE =
+ new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(),
Collections.emptyList(), false);
+
+ /** List of prioritized snapshot alternatives for managed operator
state. */
+ private final List<StateObjectCollection<OperatorStateHandle>>
prioritizedManagedOperatorState;
+
+ /** List of prioritized snapshot alternatives for raw operator state. */
+ private final List<StateObjectCollection<OperatorStateHandle>>
prioritizedRawOperatorState;
+
+ /** List of prioritized snapshot alternatives for managed keyed state.
*/
+ private final List<StateObjectCollection<KeyedStateHandle>>
prioritizedManagedKeyedState;
+
+ /** List of prioritized snapshot alternatives for raw keyed state. */
+ private final List<StateObjectCollection<KeyedStateHandle>>
prioritizedRawKeyedState;
+
+ /** Signal flag if this represents state for a restored operator. */
+ private final boolean restored;
+
+ public PrioritizedOperatorSubtaskState(
+ @Nonnull OperatorSubtaskState jobManagerState,
+ @Nonnull List<OperatorSubtaskState> alternativesByPriority) {
+ this(jobManagerState, alternativesByPriority, true);
+ }
+
+ public PrioritizedOperatorSubtaskState(
+ @Nonnull OperatorSubtaskState jobManagerState,
+ @Nonnull List<OperatorSubtaskState> alternativesByPriority,
+ boolean restored) {
+
+ Preconditions.checkNotNull(jobManagerState, "Job manager state
is null.");
+ int size = Preconditions.checkNotNull(alternativesByPriority,
"Alternative states are null.").size();
+
+ this.restored = restored;
+
+ List<StateObjectCollection<OperatorStateHandle>>
managedOperatorAlternatives = new ArrayList<>(size);
+ List<StateObjectCollection<KeyedStateHandle>>
managedKeyedAlternatives = new ArrayList<>(size);
+ List<StateObjectCollection<OperatorStateHandle>>
rawOperatorAlternatives = new ArrayList<>(size);
+ List<StateObjectCollection<KeyedStateHandle>>
rawKeyedAlternatives = new ArrayList<>(size);
+
+ for (OperatorSubtaskState subtaskState :
alternativesByPriority) {
+
+ if (subtaskState != null) {
+
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+
managedOperatorAlternatives.add(subtaskState.getManagedOperatorState());
+
rawOperatorAlternatives.add(subtaskState.getRawOperatorState());
+ }
+ }
+
+ // Key-groups should match.
+ BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean>
keyedStateApprover =
+ (ref, alt) ->
ref.getKeyGroupRange().equals(alt.getKeyGroupRange());
+
+ // State meta data should match.
+ BiFunction<OperatorStateHandle, OperatorStateHandle, Boolean>
operatorStateApprover =
+ (ref, alt) ->
ref.getStateNameToPartitionOffsets().equals(alt.getStateNameToPartitionOffsets());
+
+ this.prioritizedManagedKeyedState =
resolvePrioritizedAlternatives(
+ jobManagerState.getManagedKeyedState(),
+ managedKeyedAlternatives,
+ keyedStateApprover);
+
+ this.prioritizedRawKeyedState = resolvePrioritizedAlternatives(
+ jobManagerState.getRawKeyedState(),
+ rawKeyedAlternatives,
+ keyedStateApprover);
+
+ this.prioritizedManagedOperatorState =
resolvePrioritizedAlternatives(
+ jobManagerState.getManagedOperatorState(),
+ managedOperatorAlternatives,
+ operatorStateApprover);
+
+ this.prioritizedRawOperatorState =
resolvePrioritizedAlternatives(
+ jobManagerState.getRawOperatorState(),
+ rawOperatorAlternatives,
+ operatorStateApprover);
+ }
+
+ //
-----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns an iterator over all alternative snapshots to restore the
managed operator state, in the order in which
+ * we should attempt to restore.
+ */
+ @Nonnull
+ public Iterator<StateObjectCollection<OperatorStateHandle>>
getPrioritizedManagedOperatorState() {
+ return prioritizedManagedOperatorState.iterator();
+ }
+
+ /**
+ * Returns an iterator over all alternative snapshots to restore the
raw operator state, in the order in which we
+ * should attempt to restore.
+ */
+ @Nonnull
+ public Iterator<StateObjectCollection<OperatorStateHandle>>
getPrioritizedRawOperatorState() {
+ return prioritizedRawOperatorState.iterator();
+ }
+
+ /**
+ * Returns an iterator over all alternative snapshots to restore the
managed keyed state, in the order in which we
+ * should attempt to restore.
+ */
+ @Nonnull
+ public Iterator<StateObjectCollection<KeyedStateHandle>>
getPrioritizedManagedKeyedState() {
+ return prioritizedManagedKeyedState.iterator();
+ }
+
+ /**
+ * Returns an iterator over all alternative snapshots to restore the
raw keyed state, in the order in which we
+ * should attempt to restore.
+ */
+ @Nonnull
+ public Iterator<StateObjectCollection<KeyedStateHandle>>
getPrioritizedRawKeyedState() {
+ return prioritizedRawKeyedState.iterator();
+ }
+
+ //
-----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the managed operator state from the job manager, which
represents the ground truth about what this state
+ * should represent. This is the alternative with lowest priority.
+ */
+ @Nonnull
+ public StateObjectCollection<OperatorStateHandle>
getJobManagerManagedOperatorState() {
+ return lastElement(prioritizedManagedOperatorState);
+ }
+
+ /**
+ * Returns the raw operator state from the job manager, which
represents the ground truth about what this state
+ * should represent. This is the alternative with lowest priority.
+ */
+ @Nonnull
+ public StateObjectCollection<OperatorStateHandle>
getJobManagerRawOperatorState() {
+ return lastElement(prioritizedRawOperatorState);
+ }
+
+ /**
+ * Returns the managed keyed state from the job manager, which
represents the ground truth about what this state
+ * should represent. This is the alternative with lowest priority.
+ */
+ @Nonnull
+ public StateObjectCollection<KeyedStateHandle>
getJobManagerManagedKeyedState() {
+ return lastElement(prioritizedManagedKeyedState);
+ }
+
+ /**
+ * Returns the raw keyed state from the job manager, which represents
the ground truth about what this state
+ * should represent. This is the alternative with lowest priority.
+ */
+ @Nonnull
+ public StateObjectCollection<KeyedStateHandle>
getJobManagerRawKeyedState() {
+ return lastElement(prioritizedRawKeyedState);
+ }
+
+ //
-----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Returns true if this was created for a restored operator, false
otherwise. Restored operators are operators that
+ * participated in a previous checkpoint, even if they did not emit any
state snapshots.
+ */
+ public boolean isRestored() {
+ return restored;
+ }
+
+
+ /**
+ * This helper method resolves the dependencies between the ground
truth of the operator state obtained from the
+ * job manager and potential alternatives for recovery, e.g. from a
task-local source.
+ */
+ protected <T extends StateObject> List<StateObjectCollection<T>>
resolvePrioritizedAlternatives(
+ StateObjectCollection<T> jobManagerState,
+ List<StateObjectCollection<T>> alternativesByPriority,
+ BiFunction<T, T, Boolean> approveFun) {
+
+ // Nothing to resolve if there are no alternatives, or the
ground truth has already no state, or if we can
+ // assume that a rescaling happened because we find more than
one handle in the JM state (this is more a sanity
+ // check).
+ if (alternativesByPriority == null
+ || alternativesByPriority.isEmpty()
+ || !jobManagerState.hasState()
+ || jobManagerState.size() != 1) {
+
--- End diff --
That might be, but there is still not a fundamental problem with the
approach if we change the matching/creation of the effective state handles in
the future, once it is required. For the time being, keeping it simple keeps
maintenance simpler.
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky is possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)