Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168514624
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
---
@@ -20,35 +20,47 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.io.File;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
/**
* This class holds the all {@link TaskLocalStateStore} objects for a task
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a
placeholder.
*/
public class TaskExecutorLocalStateStoresManager {
+ /** Logger for this class. */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
/**
* This map holds all local state stores for tasks running on the task
manager / executor that own the instance of
- * this.
+ * this. Maps from allocation id to all the subtask's local state
stores.
*/
- private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>>
taskStateStoresByJobID;
+ private final Map<AllocationID, Map<JobVertexSubtaskKey,
TaskLocalStateStore>> taskStateStoresByAllocationID;
/** This is the root directory for all local state of this task manager
/ executor. */
private final File[] localStateRootDirectories;
- public TaskExecutorLocalStateStoresManager(File[]
localStateRootDirectories) {
- this.taskStateStoresByJobID = new HashMap<>();
- this.localStateRootDirectories =
Preconditions.checkNotNull(localStateRootDirectories);
+ /** Executor that runs the discarding of released state objects. */
+ private final Executor discardExecutor;
+
+ public TaskExecutorLocalStateStoresManager(
+ @Nonnull File[] localStateRootDirectories,
+ @Nonnull Executor discardExecutor) {
--- End diff --
At least it is highlighting potential problems in the IDE and you can
instruct the compiler to introduce runtime checks if you wish so (if you
compile with Intellij this is the default).
---