[
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367427#comment-16367427
]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168777749
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link OperatorSubtaskState} instances that are (partial)
+ * substitutes for each other. It imposes a priority ordering over all alternatives for the different states, which
+ * defines the order in which the operator should attempt to restore the state from them. One OperatorSubtaskState is
+ * considered the "ground truth" about which state should be represented. Alternatives may be complete or partial
+ * substitutes for the "ground truth" and have a higher priority (if they had a lower priority, they would not really
+ * be alternatives). Substitution is determined on a per-sub-state basis.
+ */
+public class PrioritizedOperatorSubtaskState {
+
+	/** Singleton instance for an empty, non-restored operator state. */
+	private static final PrioritizedOperatorSubtaskState EMPTY_NON_RESTORED_INSTANCE =
+		new PrioritizedOperatorSubtaskState(new OperatorSubtaskState(), Collections.emptyList(), false);
+
+	/** List of prioritized snapshot alternatives for managed operator state. */
+	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedManagedOperatorState;
+
+	/** List of prioritized snapshot alternatives for raw operator state. */
+	private final List<StateObjectCollection<OperatorStateHandle>> prioritizedRawOperatorState;
+
+	/** List of prioritized snapshot alternatives for managed keyed state. */
+	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState;
+
+	/** List of prioritized snapshot alternatives for raw keyed state. */
+	private final List<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState;
+
+	/** Signal flag if this represents state for a restored operator. */
+	private final boolean restored;
+
+	public PrioritizedOperatorSubtaskState(
+		@Nonnull OperatorSubtaskState jobManagerState,
+		@Nonnull List<OperatorSubtaskState> alternativesByPriority) {
+		this(jobManagerState, alternativesByPriority, true);
+	}
+
+	public PrioritizedOperatorSubtaskState(
+		@Nonnull OperatorSubtaskState jobManagerState,
+		@Nonnull List<OperatorSubtaskState> alternativesByPriority,
+		boolean restored) {
+
+		Preconditions.checkNotNull(jobManagerState, "Job manager state is null.");
+		int size = Preconditions.checkNotNull(alternativesByPriority, "Alternative states are null.").size();
+
+		this.restored = restored;
+
+		List<StateObjectCollection<OperatorStateHandle>> managedOperatorAlternatives = new ArrayList<>(size);
+		List<StateObjectCollection<KeyedStateHandle>> managedKeyedAlternatives = new ArrayList<>(size);
+		List<StateObjectCollection<OperatorStateHandle>> rawOperatorAlternatives = new ArrayList<>(size);
+		List<StateObjectCollection<KeyedStateHandle>> rawKeyedAlternatives = new ArrayList<>(size);
+
+		for (OperatorSubtaskState subtaskState : alternativesByPriority) {
+
+			if (subtaskState != null) {
+				managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+				rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+				managedOperatorAlternatives.add(subtaskState.getManagedOperatorState());
+				rawOperatorAlternatives.add(subtaskState.getRawOperatorState());
+			}
+		}
+
+		// Key-groups should match.
+		BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> keyedStateApprover =
+			(ref, alt) -> ref.getKeyGroupRange().equals(alt.getKeyGroupRange());
+
--- End diff --
In theory, we could be less strict, yes. But in practice it doesn't matter
because rescaling is not supported right now. See my other comment.
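
To make the trade-off concrete, here is a minimal sketch contrasting the strict check from the diff with one possible reading of "less strict". It is an illustration under stated assumptions, not the code in this PR: `Range` is a hypothetical stand-in for the key-group range of a keyed state handle, and the lenient rule (the alternative only has to cover the reference's key-groups) is an assumed interpretation that would additionally require filtering on restore.

```java
import java.util.function.BiFunction;

public class KeyGroupApproverSketch {

    /** Hypothetical stand-in for a key-group range: an inclusive [start, end] interval. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** True if every key-group of {@code other} lies inside this range. */
        boolean contains(Range other) {
            return start <= other.start && other.end <= end;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Range)) {
                return false;
            }
            Range r = (Range) o;
            return start == r.start && end == r.end;
        }

        @Override
        public int hashCode() {
            return 31 * start + end;
        }
    }

    // Strict rule, mirroring the diff: the alternative must have exactly the same range.
    static final BiFunction<Range, Range, Boolean> STRICT =
        (ref, alt) -> ref.equals(alt);

    // Assumed lenient rule (not in the PR): the alternative must cover the reference range.
    static final BiFunction<Range, Range, Boolean> LENIENT =
        (ref, alt) -> alt.contains(ref);

    public static void main(String[] args) {
        Range reference = new Range(0, 9);
        Range wider = new Range(0, 19);

        System.out.println("strict accepts wider range:  " + STRICT.apply(reference, wider));
        System.out.println("lenient accepts wider range: " + LENIENT.apply(reference, wider));
    }
}
```

Without rescaling, a local alternative always has the same key-group range as the reference, so the two rules accept the same inputs and the strict one is the simpler choice.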
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.
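
The recovery idea described in the issue can be sketched as follows. This is a simplified illustration, not the implementation tracked by FLINK-8360: the class name, the method `chooseRestoreSource`, and the use of `Optional` to model a missing local copy are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LocalFirstRestoreSketch {

    /**
     * Returns the first local alternative that is present, in priority order.
     * Falls back to the primary (DFS) state when no local copy is available.
     */
    static <T> T chooseRestoreSource(List<Optional<T>> localAlternativesByPriority, T primaryState) {
        for (Optional<T> alternative : localAlternativesByPriority) {
            if (alternative.isPresent()) {
                return alternative.get();
            }
        }
        return primaryState;
    }

    public static void main(String[] args) {
        // Task was rescheduled to its previous slot: a local copy exists.
        List<Optional<String>> sticky = Arrays.asList(Optional.of("local-copy"));
        // Task landed on a different slot: no local copy there.
        List<Optional<String>> moved = Arrays.asList(Optional.<String>empty());

        System.out.println(chooseRestoreSource(sticky, "dfs-copy"));
        System.out.println(chooseRestoreSource(moved, "dfs-copy"));
    }
}
```

The fallback is what keeps the primary DFS copy authoritative: a missing local copy costs network bandwidth on recovery but never correctness.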