Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168502352
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
@@ -19,92 +19,224 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
/**
* This class will service as a task-manager-level local storage for local
checkpointed state. The purpose is to provide
* access to a state that is stored locally for a faster recovery compared
to the state that is stored remotely in a
* stable store DFS. For now, this storage is only complementary to the
stable storage and local state is typically
* lost in case of machine failures. In such cases (and others), client
code of this class must fall back to using the
* slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be
implemented!
*/
public class TaskLocalStateStore {
- /** */
+ /** Logger for this class. */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+ /** Maximum number of retained snapshots. */
+ private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+ /** Dummy value to use instead of null to satisfy {@link
ConcurrentHashMap}. */
+ private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+ /** JobID from the owning subtask. */
private final JobID jobID;
- /** */
+ /** JobVertexID of the owning subtask. */
private final JobVertexID jobVertexID;
- /** */
+ /** Subtask index of the owning subtask. */
private final int subtaskIndex;
- /** */
- private final Map<Long, TaskStateSnapshot>
storedTaskStateByCheckpointID;
-
/** The root directories for all local state of this {@link
TaskLocalStateStore}. */
private final File[] localStateRootDirectories;
+ /** Executor that runs the discarding of released state objects. */
+ private final Executor discardExecutor;
+
+ /** Lock for synchronisation on the storage map and the discarded
status. */
+ private final Object lock;
+
+ /** Status flag if this store was already discarded. */
+ @GuardedBy("lock")
+ private boolean discarded;
+
+ /** Maps checkpoint ids to local TaskStateSnapshots. */
+ @GuardedBy("lock")
+ private final SortedMap<Long, TaskStateSnapshot>
storedTaskStateByCheckpointID;
+
public TaskLocalStateStore(
- JobID jobID,
- JobVertexID jobVertexID,
- int subtaskIndex,
- File[] localStateRootDirectories) {
+ @Nonnull JobID jobID,
+ @Nonnull JobVertexID jobVertexID,
+ @Nonnegative int subtaskIndex,
+ @Nonnull File[] localStateRootDirectories,
+ @Nonnull Executor discardExecutor) {
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
- this.storedTaskStateByCheckpointID = new HashMap<>();
this.localStateRootDirectories = localStateRootDirectories;
+ this.discardExecutor = discardExecutor;
+ this.lock = new Object();
+ this.storedTaskStateByCheckpointID = new TreeMap<>();
+ this.discarded = false;
}
+ @Nonnull
protected String createSubtaskPath() {
return jobID + File.separator + jobVertexID + File.separator +
subtaskIndex;
}
+ /**
+ * Stores the local state for the given checkpoint id.
+ *
+ * @param checkpointId id for the checkpoint that created the local
state that will be stored.
+ * @param localState the local state to store.
+ */
public void storeLocalState(
- @Nonnull CheckpointMetaData checkpointMetaData,
+ @Nonnegative long checkpointId,
@Nullable TaskStateSnapshot localState) {
- TaskStateSnapshot previous =
-
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(),
localState);
+ if (localState == null) {
+ localState = NULL_DUMMY;
+ }
- if (previous != null) {
- throw new IllegalStateException("Found previously
registered local state for checkpoint with id " +
- checkpointMetaData.getCheckpointId() + "! This
indicated a problem.");
+ LOG.info("Storing local state for checkpoint {}.",
checkpointId);
+ LOG.debug("Local state for checkpoint {} is {}.", checkpointId,
localState);
+
+ synchronized (lock) {
+ if (discarded) {
+ // we ignore late stores and simply discard the
state.
+ discardStateObject(localState, checkpointId);
+ } else {
+ TaskStateSnapshot previous =
+
storedTaskStateByCheckpointID.put(checkpointId, localState);
+
+ if (previous != null) {
+ // this should never happen.
+ discardStateObject(previous,
checkpointId);
+ throw new IllegalStateException("Found
previously registered local state for checkpoint with id " +
+ checkpointId + "! This
indicated a problem.");
+ }
+
+ // prune history.
+ while (storedTaskStateByCheckpointID.size() >
MAX_RETAINED_SNAPSHOTS) {
+ Long removeCheckpointID =
storedTaskStateByCheckpointID.firstKey();
+ TaskStateSnapshot snapshot =
+
storedTaskStateByCheckpointID.remove(storedTaskStateByCheckpointID.firstKey());
--- End diff --
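I think we can directly pass `removeCheckpointID` to `storedTaskStateByCheckpointID.remove(...)` here instead of calling `storedTaskStateByCheckpointID.firstKey()` a second time.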
---