[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365744#comment-16365744 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168501441
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java ---
@@ -19,92 +19,224 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
/**
* This class will service as a task-manager-level local storage for local checkpointed state. The purpose is to provide
* access to a state that is stored locally for a faster recovery compared to the state that is stored remotely in a
* stable store DFS. For now, this storage is only complementary to the stable storage and local state is typically
* lost in case of machine failures. In such cases (and others), client code of this class must fall back to using the
* slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be implemented!
*/
public class TaskLocalStateStore {
- /** */
+ /** Logger for this class. */
+ private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+ /** Maximum number of retained snapshots. */
+ private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+ /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */
+ private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
--- End diff --
Can this be static?
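
To illustrate the question, here is a minimal sketch of a shared sentinel that stands in for null in a ConcurrentHashMap (which rejects null values). This is not the code from the pull request: `SentinelExample`, `Snapshot`, `put`, `get`, and `contains` are hypothetical names, and `Snapshot` is only a stand-in for `TaskStateSnapshot`. The sketch assumes the sentinel is never mutated and is only ever compared by identity; under that assumption one instance can be shared by all stores, so the field can be `static final`.

```java
import java.util.concurrent.ConcurrentHashMap;

public class SentinelExample {

    /** Hypothetical stand-in for TaskStateSnapshot. */
    public static final class Snapshot {}

    /** Shared sentinel; it carries no per-instance state, so it can be static. */
    private static final Snapshot NULL_DUMMY = new Snapshot();

    private final ConcurrentHashMap<Long, Snapshot> store = new ConcurrentHashMap<>();

    /** Stores the snapshot, substituting the sentinel when it is null. */
    public void put(long checkpointId, Snapshot snapshot) {
        store.put(checkpointId, snapshot != null ? snapshot : NULL_DUMMY);
    }

    /** Returns the stored snapshot, translating the sentinel back to null. */
    public Snapshot get(long checkpointId) {
        Snapshot snapshot = store.get(checkpointId);
        // Identity comparison: only the sentinel instance itself means "null".
        return snapshot == NULL_DUMMY ? null : snapshot;
    }

    /** True if an entry (possibly the sentinel) exists for the checkpoint. */
    public boolean contains(long checkpointId) {
        return store.containsKey(checkpointId);
    }
}
```

If the sentinel type were mutable and some code path modified it, sharing a single static instance would no longer be safe, which is the main thing to check before making the change.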
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.
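
The recovery order described in the issue (try the secondary local copy first, otherwise use the primary copy in DFS) can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's implementation: `LocalFirstRestore`, `restore`, and the two suppliers are hypothetical names, and a missing or unreadable local copy is modeled as a null result or a runtime exception.

```java
import java.util.function.Supplier;

public class LocalFirstRestore {

    /**
     * Returns the state from the local copy when it is available,
     * otherwise falls back to the remote (DFS) copy.
     */
    public static <S> S restore(Supplier<S> localCopy, Supplier<S> remoteCopy) {
        try {
            S local = localCopy.get();
            if (local != null) {
                // Local state found: no network transfer needed.
                return local;
            }
        } catch (RuntimeException e) {
            // Local state is best effort; ignore the failure and fall back.
        }
        // The remote copy is the primary one reported to the checkpoint coordinator.
        return remoteCopy.get();
    }
}
```

The fallback only saves bandwidth when a task is rescheduled to the slot that holds its local copy, which is why the issue asks for sticky task-to-slot assignment.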