[
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367129#comment-16367129
]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168752498
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
---
@@ -18,81 +18,267 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.File;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
- * This class holds the all {@link TaskLocalStateStore} objects for a task
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a
placeholder.
+ * This class holds the all {@link TaskLocalStateStoreImpl} objects for a
task executor (manager).
*/
public class TaskExecutorLocalStateStoresManager {
+ /** Logger for this class. */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
/**
* This map holds all local state stores for tasks running on the task
manager / executor that own the instance of
- * this.
+ * this. Maps from allocation id to all the subtask's local state
stores.
*/
- private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>>
taskStateManagers;
+ @GuardedBy("lock")
+ private final Map<AllocationID, Map<JobVertexSubtaskKey,
TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
+
+ /** The configured mode for local recovery on this task manager. */
+ private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
+ /** This is the root directory for all local state of this task manager
/ executor. */
+ private final File[] localStateRootDirectories;
+
+ /** Executor that runs the discarding of released state objects. */
+ private final Executor discardExecutor;
+
+ /** Guarding lock for taskStateStoresByAllocationID and closed-flag. */
+ private final Object lock;
+
+ private final Thread shutdownHook;
- public TaskExecutorLocalStateStoresManager() {
- this.taskStateManagers = new HashMap<>();
+ @GuardedBy("lock")
+ private boolean closed;
+
+ public TaskExecutorLocalStateStoresManager(
+ @Nonnull LocalRecoveryConfig.LocalRecoveryMode
localRecoveryMode,
+ @Nonnull File[] localStateRootDirectories,
+ @Nonnull Executor discardExecutor) {
+
+ this.taskStateStoresByAllocationID = new HashMap<>();
+ this.localRecoveryMode = localRecoveryMode;
+ this.localStateRootDirectories = localStateRootDirectories;
+ this.discardExecutor = discardExecutor;
+ this.lock = new Object();
+ this.closed = false;
+
+ for (File localStateRecoveryRootDir :
localStateRootDirectories) {
+ if (!localStateRecoveryRootDir.exists()) {
+
+ if (!localStateRecoveryRootDir.mkdirs()) {
+ throw new IllegalStateException("Could
not create root directory for local recovery: " +
+ localStateRecoveryRootDir);
+ }
+ }
+ }
+
+ // install a shutdown hook
+ this.shutdownHook = new
Thread("TaskExecutorLocalStateStoresManager shutdown hook") {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ };
+ try {
+ Runtime.getRuntime().addShutdownHook(this.shutdownHook);
+ } catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can safely
ignore this
+ LOG.debug("Unable to add shutdown hook for
TaskExecutorLocalStateStoresManager, shutdown already in progress", e);
+ } catch (Throwable t) {
+ LOG.warn("Error while adding shutdown hook for
TaskExecutorLocalStateStoresManager", t);
+ }
}
- public TaskLocalStateStore localStateStoreForTask(
- JobID jobId,
- JobVertexID jobVertexID,
- int subtaskIndex) {
+ @Nonnull
+ public TaskLocalStateStore localStateStoreForSubtask(
+ @Nonnull JobID jobId,
+ @Nonnull AllocationID allocationID,
+ @Nonnull JobVertexID jobVertexID,
+ @Nonnegative int subtaskIndex) {
+
+ synchronized (lock) {
+
+ if (closed) {
+ throw new
IllegalStateException("TaskExecutorLocalStateStoresManager is already closed
and cannot " +
+ "register a new TaskLocalStateStore.");
+ }
+
+ final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>
taskStateManagers =
+
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new
HashMap<>());
+
+ final JobVertexSubtaskKey taskKey = new
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
- Preconditions.checkNotNull(jobId);
- final JobVertexSubtaskKey taskKey = new
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+ // create the allocation base dirs, one inside each
root dir.
+ File[] allocationBaseDirectories =
allocationBaseDirectories(allocationID);
- final Map<JobVertexSubtaskKey, TaskLocalStateStore>
taskStateManagers =
- this.taskStateManagers.computeIfAbsent(jobId, k -> new
HashMap<>());
+ LocalRecoveryDirectoryProviderImpl directoryProvider =
new LocalRecoveryDirectoryProviderImpl(
+ allocationBaseDirectories,
+ jobId,
+ allocationID,
+ jobVertexID,
+ subtaskIndex);
- return taskStateManagers.computeIfAbsent(
- taskKey, k -> new TaskLocalStateStore(jobId,
jobVertexID, subtaskIndex));
+ LocalRecoveryConfig localRecoveryConfig = new
LocalRecoveryConfig(
+ localRecoveryMode,
+ directoryProvider);
+
+ return taskStateManagers.computeIfAbsent(
+ taskKey,
+ k -> new TaskLocalStateStoreImpl(
+ jobId,
+ allocationID,
+ jobVertexID,
+ subtaskIndex,
+ localRecoveryConfig,
+ discardExecutor));
+ }
}
- public void releaseJob(JobID jobID) {
+ public void releaseLocalStateForAllocationId(@Nonnull AllocationID
allocationID) {
--- End diff --
In `TaskExecutorLocalStateStoresManager` I only found
`releaseLocalStateForAllocationId()`, which releases resources according to
`allocationID`. Is a function for releasing resources according to `jobID`
missing? I think it should be used when a job is cancelled.
> Implement task-local state recovery
> -----------------------------------
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment from tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily enhance it to all other state types (e.g. operator state) later.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)