Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168747007 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +/** + * Main implementation of a {@link TaskLocalStateStore}. + */ +public class TaskLocalStateStoreImpl implements TaskLocalStateStore { + + /** Logger for this class. */ + private static final Logger LOG = LoggerFactory.getLogger(TaskLocalStateStoreImpl.class); + + /** Maximum number of retained snapshots. */ + @VisibleForTesting + static final int MAX_RETAINED_SNAPSHOTS = 5; + + /** Dummy value to use instead of null to satisfy {@link ConcurrentHashMap}. */ + private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot(); + + /** JobID from the owning subtask. */ + @Nonnull + private final JobID jobID; + + /** AllocationID of the owning slot. */ + @Nonnull + private final AllocationID allocationID; + + /** JobVertexID of the owning subtask. */ + @Nonnull + private final JobVertexID jobVertexID; + + /** Subtask index of the owning subtask. */ + @Nonnegative + private final int subtaskIndex; + + /** The configured mode for local recovery. 
*/ + @Nonnull + private final LocalRecoveryConfig localRecoveryConfig; + + /** Executor that runs the discarding of released state objects. */ + @Nonnull + private final Executor discardExecutor; + + /** Lock for synchronisation on the storage map and the discarded status. */ + @Nonnull + private final Object lock; + + /** Status flag if this store was already discarded. */ + @GuardedBy("lock") + private boolean discarded; + + /** Maps checkpoint ids to local TaskStateSnapshots. */ + @Nonnull + @GuardedBy("lock") + private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID; + + public TaskLocalStateStoreImpl( + @Nonnull JobID jobID, + @Nonnull AllocationID allocationID, + @Nonnull JobVertexID jobVertexID, + @Nonnegative int subtaskIndex, + @Nonnull LocalRecoveryConfig localRecoveryConfig, + @Nonnull Executor discardExecutor) { + + this.jobID = jobID; + this.allocationID = allocationID; + this.jobVertexID = jobVertexID; + this.subtaskIndex = subtaskIndex; + this.discardExecutor = discardExecutor; + this.lock = new Object(); + this.storedTaskStateByCheckpointID = new TreeMap<>(); + this.discarded = false; + this.localRecoveryConfig = localRecoveryConfig; + } + + @Override + public void storeLocalState( + @Nonnegative long checkpointId, + @Nullable TaskStateSnapshot localState) { + + if (localState == null) { + localState = NULL_DUMMY; + } + + LOG.info("Storing local state for checkpoint {}.", checkpointId); + LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState); + + Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(MAX_RETAINED_SNAPSHOTS); + + synchronized (lock) { + if (discarded) { + // we ignore late stores and simply discard the state. + toDiscard.put(checkpointId, localState); + } else { + TaskStateSnapshot previous = + storedTaskStateByCheckpointID.put(checkpointId, localState); + + if (previous != null) { + toDiscard.put(checkpointId, previous); + } + + // remove from history. 
+ while (storedTaskStateByCheckpointID.size() > MAX_RETAINED_SNAPSHOTS) { + Long removeCheckpointID = storedTaskStateByCheckpointID.firstKey(); --- End diff -- Yes, once a checkpoint completes, all `pending completed checkpoint`s are deleted. However, if the job runs into a problem and can never reach a `global completed checkpoint`, these will lead to the problems I described. That said, I agree with you that this doesn't matter too much; it is just an extreme case.
---