Myasuka commented on a change in pull request #16575:
URL: https://github.com/apache/flink/pull/16575#discussion_r678082247



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/TaskStateRegistry.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateObject;
+
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ * A component responsible for managing state that is not shared across 
different TMs (i.e. not
+ * managed by JM). Managing state here means {@link StateObject#discardState() 
deletion} once not
+ * used.
+ *
+ * <p>Must not be shared between different jobs.
+ *
+ * <p>Explicitly aware of checkpointing so that tracking of associations 
between checkpoints and
+ * snapshots can be reused (instead of implementing in each backend).
+ *
+ * <h1>State sharing inside TM</h1>
+ *
+ * <p>Inside each TM, the registry <strong>must</strong> be shared across 
state backends on the same
+ * level as their state is, or higher. For example, if all state backends of 
operators of the same
+ * job in TM use the same file on DFS then they <strong>must</strong> share 
the same registry. OTH,
+ * if nothing is shared then the Registry can be per-backend (or shared).
+ *
+ * <h1>State sharing across TMs</h1>
+ *
+ * If a rescaling operation (or recovery with state sharing inside TM) results 
in some state being
+ * shared by multiple TMs (or TaskStateRegistries), such relevant shared 
states must be communicated
+ * to the registry so that it can ignore them. Implementations 
<strong>must</strong> ignore such
+ * shared states.
+ *
+ * <h1>State identity</h1>
+ *
+ * For the purpose of matching state objects from the different calls (and 
from previous runs),
+ * state can be identified by {@link StateObjectID}. A collection of such IDs 
can be obtained from
+ * the {@link StateObject} by recursively traversing it. This is 
implementation specific; however,
+ * it must be consistent across attempts, registries and backends.
+ *
+ * <h1>Basic usage</h1>
+ *
+ * <ol>
+ *   <li>{@link #stateUsed(Set, Collection) register state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 1}
+ *   <li>{@link #stateNotUsed(String, StateObject) unregister state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 2} 
(state is not used as
+ *       it was unregistered)
+ *   <li>{@link #checkpointSubsumed(String, long) subsume checkpoint 1} (state 
can be discarded)
+ * </ol>
+ *
+ * <p>It is not required to register state usage strictly before starting a 
checkpoint (mostly
+ * because it is usually not known which state will be included into the 
snapshot when the
+ * checkpoint starts).
+ *
+ * <p>Usually, state registration and un-registration is done automatically 
via {@link
+ * #snapshotTaken(String, StateObject, long, boolean)}.
+ *
+ * <h1>Consistency and dangling state</h1>
+ *
+ * There are several factors that can impact consistency: incremental 
checkpoints; out-of-order
+ * calls from a single backend; multiple backends; multiple concurrent 
checkpoints; savepoints;
+ * missing confirmations. Potential issues include:
+ *
+ * <ul>
+ *   <li>Incremental checkpoint subsumption: if some state is shared across 
checkpoints then
+ *       discarding an older checkpoint might invalidate subsequent checkpoints
+ *   <li>Out-of-order checkpoint completion by a single backend: if a reported 
snapshot doesn't
+ *       include some state anymore then that state can be discarded. Any 
<strong>earlier</strong>
+ *       snapshots reported <strong>later</strong> still using this state 
later will be invalid
+ *   <li>Out-of-order state (un)registration by multiple backends: if some 
state is shared then some
+ *       backend might register and unregister its usage before the other 
starts using it. The
+ *       other's backend snapshots with this state may be invalid.
+ *   <li>Savepoint state deletion: savepoint should never be deleted unless 
aborted; even if there
+ *       is no confirmation or it has be subsumed
+ *   <li>With RETAIN_ON_CANCELLATION policy, last num-retained checkpoints 
shouldn't be discarded
+ *       even if the job is cancelled (NOT IMPLEMENTED)
+ * </ul>
+ *
+ * To allow implementations to address the above issues, client MUST:
+ *
+ * <ul>
+ *   <li>list all the backends {@link #stateUsed(Set, Collection) using some 
state} at once
+ *   <li>list all state objects {@link #snapshotTaken(String, StateObject, 
long, boolean) used in a
+ *       snapshot} at once
+ *   <li>notify that the checkpoint is {@link #checkpointStarting(String, 
long, boolean) started}
+ *       before {@link #stateNotUsed(String, StateObject) unregistering} any 
state this checkpoint
+ *       might be using (if a state was unregistered after a checkpoint was 
started it can still be
+ *       included into the snapshot)
+ *   <li>in particular, report the {@link #snapshotTaken(String, StateObject, 
long, boolean)}
+ *       snapshot} for a checkpoint only after notifying about its start
+ *   <li>not {@link #checkpointStarting(String, long, boolean) start any 
checkpoint} that may have
+ *       been {@link #checkpointSubsumed(String, long) subsumed}
+ *   <li>for savepoints, call either {@link #snapshotTaken(String, 
StateObject, long, boolean)
+ *       snapshotTaken} or {@link #checkpointAborted(String, long) aborted} 
(otherwise, state won't
+ *       be deleted)
+ *   <li>for checkpoints, call either {@link #checkpointSubsumed(String, long) 
subsumed} or {@link
+ *       #checkpointAborted(String, long) aborted} (otherwise, state won't be 
deleted until {@link
+ *       #jobTerminated(JobStatus)})
+ * </ul>
+ *
+ * <h1>Thread safety</h1>
+ *
+ * Thread safety depends on the implementation. An implementation shared 
across different tasks (in
+ * a single TM) MUST be thread-safe.
+ *
+ * <h1>Asynchronous discarding</h1>
+ *
+ * Production implementations might choose to discard the state 
asynchronously; however, they might
+ * choose to block the caller if they don't keep up.
+ */
+@Internal
+public interface TaskStateRegistry extends AutoCloseable {
+
+    /**
+     * Mark the given state as used by the given state backends. Should be 
called upon initial
+     * creation of state object (e.g. upload to DFS). It can be called before 
or after {@link
+     * #checkpointStarting(String, long, boolean) starting} a checkpoint using 
it.
+     *
+     * @param backendIds <strong>must</strong> include <strong>all</strong> 
backends that will using
+     *     the given state.
+     */
+    void stateUsed(Set<String> backendIds, Collection<StateObject> states);
+
+    /**
+     * Mark the given state as not used anymore by the given backend (i.e. it 
will not be included
+     * into any <strong>future</strong> snapshots); discard if not used by any 
other backend or
+     * checkpoint. When using incremental checkpoints, it should be called 
upon materialization;
+     * otherwise, on checkpoint subsumption (in addition to {@link 
#checkpointSubsumed(String,
+     * long)}. The method does nothing if the state is not marked as used.
+     *
+     * <p>Note that there is no need to call this method during the shutdown - 
any state is
+     * considered unused as no future checkpoints will be made.
+     */
+    void stateNotUsed(String backendId, StateObject state);
+
+    /**
+     * Notify that the checkpoint is about to start. Until {@link 
#snapshotTaken(String,
+     * StateObject, long, boolean)} notified explicitly}, any state that is 
still in use by the
+     * backend is considered as potentially used by this checkpoint.
+     */
+    void checkpointStarting(String backendId, long checkpointId, boolean 
isSavepoint);
+
+    /**
+     * Notify about the state used in a snapshot for the given checkpoint 
(before or after sending
+     * to JM). All state should be reported at once. The method serves the 
following goals:
+     *
+     * <ul>
+     *   <li>more fine-grained tracking of state usage by checkpoints to allow 
deletion
+     *   <li>tracking of state usage by savepoints to prevent deletion
+     *   <li>automatic tracking of state usage (see below)
+     * </ul>
+     *
+     * Must be called <strong>after</strong> the corresponding {@link 
#checkpointStarting(String,
+     * long, boolean)}.
+     *
+     * @param trackStateUsage if true then new entries will be added and 
marked as used; any state
+     *     objects from the previous checkpoint that are <strong>not</strong> 
used in this
+     *     checkpoint will be marked as unused (and potentially discarded). 
This is equivalent to
+     *     manually {@link #stateNotUsed(String, StateObject) marking such 
states as unused}. This
+     *     will only take effect if the previous checkpointId differs exactly 
by 1L from the current
+     *     one (a gap might mean that some older checkpoint may be completed 
later).
+     * @throws NoSuchElementException if the state is not registered
+     * @throws IllegalStateException if the given checkpoint was already 
performed
+     */
+    void snapshotTaken(

Review comment:
       Since all other methods are named as `checkpointXXX`, why not name it as 
`checkpointCreated`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/BackendStateRegistry.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+import 
org.apache.flink.runtime.state.track.TaskStateRegistryImpl.StateObjectIDExtractor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A per-StateBackend StateRegistry. Tracks the usage of {@link StateEntry} by 
backend and its
+ * snapshots. {@link StateEntry} will discard its state once all such 
registries are not using it.
+ *
+ * <p>Explicitly aware of checkpointing so that tracking of associations 
between checkpoints and
+ * snapshots can be reused (instead of implementing in each backend).
+ *
+ * <h1>Tracking lifecycle</h1>
+ *
+ * <ol>
+ *   <li>Upon {@link #stateUsed(Collection) registration}, {@link StateEntry} 
is added to {@link
+ *       #inActiveUse}
+ *   <li>Once {@link #stateNotUsed(Set) not actively used} (usually after 
materialization), it is
+ *       moved to {@link #inUseBySavepoints} (if used by savepoints; otherwise 
this step is
+ *       skipped). At this point, <strong>any pending checkpoint or savepoint 
is considered as
+ *       potentially using the state</strong>
+ *   <li>If any savepoint is {@link #snapshotSentForSavepoint(long, Set) 
reported} to actually use
+ *       it (before being aborted) then tracking is stopped (it is removed 
from {@link
+ *       #inUseBySavepoints}), discard is not possible.
+ *   <li>Once no savepoints are using it (as a result of abortion) the entry 
is moved to {@link
+ *       #inUseByCheckpoints} under the highest checkpoint ID it might be used 
in (if no checkpoints
+ *       are using it then this step is skipped).
+ *   <li>Once all checkpoint potentially using it are {@link 
#checkpointSubsumed(long) subsumed},
+ *       tracking stops (i.e. it is removed from {@link #inUseByCheckpoints}).
+ * </ol>
+ *
+ * On each step, the entry is notified about the event. When tracking stops it 
might decide to
+ * discard the state.
+ *
+ * <h1>Checkpoint lifecycle</h1>
+ *
+ * Upon {@link #checkpointStarting(long, boolean) start}, Every checkpoint or 
savepoint first
+ * updates {@link #lastStartedCheckpoint} and is added {@link 
#pendingCheckpoints} or {@link
+ * #pendingSavepoints}.
+ *
+ * <h1>Thread safety</h1>
+ *
+ * The class is not thread-safe. Furthermore, it uses {@link StateEntry} 
potentially shared with
+ * other instances used by other threads - so external synchronization 
required.
+ *
+ * @param <K> type of state identifier
+ */
+@NotThreadSafe
+class BackendStateRegistry<K> {
+    private final Logger LOG = 
LoggerFactory.getLogger(BackendStateRegistry.class);
+
+    private final String backendId;
+    private final StateObjectIDExtractor<K> keyExtractor;
+
+    private long lastStartedCheckpoint = -1L;
+    private final NavigableSet<Long> pendingCheckpoints = new TreeSet<>();
+    private final Set<Long> pendingSavepoints = new HashSet<>();
+
+    private long lastSnapshottedCheckpoint = -1L;
+    @Nullable private Set<K> lastSnapshot = null;
+
+    private final Map<K, StateEntry<K>> inActiveUse = new HashMap<>();

Review comment:
       The name of `inActiveUse` might be misunderstanding as another word 
`inactive` means not active any more. Maybe `activeUsedStateEntries` looks more 
better.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/StateEntry.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * StateEntry holds the tracking information about the state to eventually 
discard it.
+ *
+ * <p>Lifecycle
+ *
+ * <ol>
+ *   <li>Initially, the state is in active use, potentially by multiple state 
backends, as reflected
+ *       by {@link #activelyUsingBackends}.
+ *   <li>Once not actively used by a backend, the above counter is decremented 
and {@link
+ *       #pendingCheckpointsByBackend} and {@link #pendingSavepointsByBackend} 
are updated
+ *   <li>Each backend notifies it about the corresponding checkpoint updates 
(shrinking the above
+ *       maps)
+ *   <li>{@link #discardIfNotUsed() Once} no backend is actively using this 
entry, and no checkpoint
+ *       or savepoint is using it, the state is discarded (unless some 
savepoint was reported to be
+ *       actually using this state)
+ * </ol>
+ *
+ * One can think of {@link #pendingCheckpointsByBackend} and {@link 
#activelyUsingBackends} as
+ * potential usages in the past and in the future respectively.
+ *
+ * @param <K> type of state identifier
+ */
+@NotThreadSafe
+class StateEntry<K> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateEntry.class);
+
+    private final K key;
+    private final StateObject state;
+    private final TaskStateCleaner cleaner;
+    private final Consumer<K> discardCallback;
+
+    /** Number of state backends that may use this state in future 
checkpoints. */
+    private int activelyUsingBackends;
+
+    /**
+     * Not yet subsumed checkpoints by backend potentially using this {@link 
#state}. Entries
+     * (checkpoint IDs) for a backend are added once it stops using it and are 
removed on
+     * subsumption and {@link #checkpointAborted(String, long) abortion}. Once 
it's empty and {@link
+     * #pendingSavepointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used to get this highest ID (a number would suffice at the cost of 
readability).
+     */
+    private final Map<String, NavigableSet<Long>> pendingCheckpointsByBackend 
= new HashMap<>();
+
+    /**
+     * Not aborted savepoints by backend potentially using this {@link 
#state}. Entries (savepoint
+     * IDs) for a backend are added once it stops using it and are removed on 
{@link
+     * #checkpointAborted(String, long) abortion}. Once a snapshot for a 
savepoint is reported, the
+     * state is marked as used externally and can not be removed. Once it's 
empty and {@link
+     * #pendingCheckpointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used for the sake of readability.
+     */
+    private final Map<String, NavigableSet<Long>> pendingSavepointsByBackend = 
new HashMap<>();
+
+    /** Once set to true, state can not be discarded. */
+    private boolean usedExternally = false;
+
+    private boolean discarded = false;
+
+    StateEntry(
+            K key,
+            StateObject state,
+            int activelyUsingBackends,
+            TaskStateCleaner cleaner,
+            Consumer<K> discardCallback) {
+        this.key = key;
+        this.state = state;
+        this.cleaner = cleaner;
+        this.activelyUsingBackends = activelyUsingBackends;

Review comment:
       I think we should check whether the `activelyUsingBackends` larger than 
zero.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/BackendStateRegistry.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+import 
org.apache.flink.runtime.state.track.TaskStateRegistryImpl.StateObjectIDExtractor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A per-StateBackend StateRegistry. Tracks the usage of {@link StateEntry} by 
backend and its
+ * snapshots. {@link StateEntry} will discard its state once all such 
registries are not using it.
+ *
+ * <p>Explicitly aware of checkpointing so that tracking of associations 
between checkpoints and
+ * snapshots can be reused (instead of implementing in each backend).
+ *
+ * <h1>Tracking lifecycle</h1>
+ *
+ * <ol>
+ *   <li>Upon {@link #stateUsed(Collection) registration}, {@link StateEntry} 
is added to {@link
+ *       #inActiveUse}
+ *   <li>Once {@link #stateNotUsed(Set) not actively used} (usually after 
materialization), it is
+ *       moved to {@link #inUseBySavepoints} (if used by savepoints; otherwise 
this step is
+ *       skipped). At this point, <strong>any pending checkpoint or savepoint 
is considered as
+ *       potentially using the state</strong>
+ *   <li>If any savepoint is {@link #snapshotSentForSavepoint(long, Set) 
reported} to actually use
+ *       it (before being aborted) then tracking is stopped (it is removed 
from {@link
+ *       #inUseBySavepoints}), discard is not possible.
+ *   <li>Once no savepoints are using it (as a result of abortion) the entry 
is moved to {@link
+ *       #inUseByCheckpoints} under the highest checkpoint ID it might be used 
in (if no checkpoints
+ *       are using it then this step is skipped).
+ *   <li>Once all checkpoint potentially using it are {@link 
#checkpointSubsumed(long) subsumed},
+ *       tracking stops (i.e. it is removed from {@link #inUseByCheckpoints}).
+ * </ol>
+ *
+ * On each step, the entry is notified about the event. When tracking stops it 
might decide to
+ * discard the state.
+ *
+ * <h1>Checkpoint lifecycle</h1>
+ *
+ * Upon {@link #checkpointStarting(long, boolean) start}, Every checkpoint or 
savepoint first
+ * updates {@link #lastStartedCheckpoint} and is added {@link 
#pendingCheckpoints} or {@link
+ * #pendingSavepoints}.
+ *
+ * <h1>Thread safety</h1>
+ *
+ * The class is not thread-safe. Furthermore, it uses {@link StateEntry} 
potentially shared with
+ * other instances used by other threads - so external synchronization 
required.
+ *
+ * @param <K> type of state identifier
+ */
+@NotThreadSafe
+class BackendStateRegistry<K> {
+    private final Logger LOG = 
LoggerFactory.getLogger(BackendStateRegistry.class);
+
+    private final String backendId;
+    private final StateObjectIDExtractor<K> keyExtractor;
+
+    private long lastStartedCheckpoint = -1L;
+    private final NavigableSet<Long> pendingCheckpoints = new TreeSet<>();
+    private final Set<Long> pendingSavepoints = new HashSet<>();
+
+    private long lastSnapshottedCheckpoint = -1L;
+    @Nullable private Set<K> lastSnapshot = null;
+
+    private final Map<K, StateEntry<K>> inActiveUse = new HashMap<>();
+    private final Map<K, StateEntry<K>> inUseBySavepoints = new HashMap<>();
+    private final NavigableMap<Long, List<StateEntry<K>>> inUseByCheckpoints = 
new TreeMap<>();
+
+    public BackendStateRegistry(String backendId, StateObjectIDExtractor<K> 
keyExtractor) {
+        this.keyExtractor = keyExtractor;
+        this.backendId = backendId;
+    }
+
+    public void stateUsed(Collection<StateEntry<K>> entries) {
+        LOG.debug("State used, backend: {}, state: {}", backendId, entries);
+        entries.forEach(e -> inActiveUse.put(e.getKey(), e));
+    }
+
+    public void stateNotUsed(Set<K> stateIDs) {
+        for (K key : stateIDs) {
+            StateEntry<K> entry = inActiveUse.remove(key);
+            if (entry != null) {
+                entry.notActivelyUsed(
+                        backendId,
+                        new TreeSet<>(pendingSavepoints),
+                        new TreeSet<>(pendingCheckpoints));
+                trackNotActivelyUsedEntry(entry);
+            }
+        }
+    }
+
+    private void trackNotActivelyUsedEntry(StateEntry<K> entry) {
+        if (entry.isUsedInSavepoints(backendId)) {
+            LOG.debug(
+                    "State entry not used but {} pending savepoints exist, 
backend: {}, state: {}",
+                    pendingSavepoints.size(),
+                    backendId,
+                    entry);
+            inUseBySavepoints.put(entry.getKey(), entry);
+        } else if (trackIfUsedByCheckpoints(entry)) {
+            LOG.debug(
+                    "State entry not used but {} pending checkpoints exist, 
backend: {}",
+                    pendingCheckpoints.size(),
+                    backendId);
+        } else {
+            LOG.debug("State entry not used, backend: {}, state: {}", 
backendId, entry);
+        }
+    }
+
+    public void checkpointStarting(long checkpointId, boolean isSavepoint) {
+        LOG.debug(
+                "Checkpoint started, backend: {}, checkpoint: {}, isSavepoint: 
{}",
+                backendId,
+                checkpointId,
+                isSavepoint);
+        Set<Long> ids = isSavepoint ? pendingSavepoints : pendingCheckpoints;
+        checkState(
+                lastStartedCheckpoint < checkpointId,
+                "Out of order checkpoint: %s, backend: %s, previous: %s, 
isSavepoint: %s",
+                checkpointId,
+                backendId,
+                lastStartedCheckpoint,
+                isSavepoint);
+        ids.add(checkpointId);
+        lastStartedCheckpoint = checkpointId;
+    }
+
+    public void snapshotTaken(
+            long checkpointId, StateObject state, boolean 
inferUnusedFromPrevious) {
+        Set<K> newStateKeys = keyExtractor.apply(state).keySet();
+        LOG.debug(
+                "Checkpoint performed, backend: {}, checkpoint: {}, 
inferUnusedFromPrevious: {}, state objects: {}",
+                backendId,
+                checkpointId,
+                inferUnusedFromPrevious,
+                newStateKeys);
+        if (pendingSavepoints.remove(checkpointId)) {
+            snapshotSentForSavepoint(checkpointId, newStateKeys);
+        } else if (pendingCheckpoints.contains(checkpointId)) {
+            // note that we don't remove this checkpoint from pending for now 
- wait
+            // until subsumed
+            snapshotSentForCheckpoint(checkpointId, inferUnusedFromPrevious, 
newStateKeys);
+        } else {
+            throw new IllegalStateException(
+                    String.format("Unknown checkpoint %d for backend %s", 
checkpointId, backendId));
+        }
+    }
+
+    private void snapshotSentForSavepoint(long checkpointId, Set<K> 
newStateKeys) {

Review comment:
       From my point of view,  the name of `snapshotSentForSavepoint` is a bit 
hard to understand, `who` `send` the information to `where`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/TaskStateRegistry.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateObject;
+
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ * A component responsible for managing state that is not shared across 
different TMs (i.e. not
+ * managed by JM). Managing state here means {@link StateObject#discardState() 
deletion} once not
+ * used.
+ *
+ * <p>Must not be shared between different jobs.
+ *
+ * <p>Explicitly aware of checkpointing so that tracking of associations 
between checkpoints and
+ * snapshots can be reused (instead of implementing in each backend).
+ *
+ * <h1>State sharing inside TM</h1>
+ *
+ * <p>Inside each TM, the registry <strong>must</strong> be shared across 
state backends on the same
+ * level as their state is, or higher. For example, if all state backends of 
operators of the same
+ * job in TM use the same file on DFS then they <strong>must</strong> share 
the same registry. OTH,
+ * if nothing is shared then the Registry can be per-backend (or shared).
+ *
+ * <h1>State sharing across TMs</h1>
+ *
+ * If a rescaling operation (or recovery with state sharing inside TM) results 
in some state being
+ * shared by multiple TMs (or TaskStateRegistries), such relevant shared 
states must be communicated
+ * to the registry so that it can ignore them. Implementations 
<strong>must</strong> ignore such
+ * shared states.
+ *
+ * <h1>State identity</h1>
+ *
+ * For the purpose of matching state objects from the different calls (and 
from previous runs),
+ * state can be identified by {@link StateObjectID}. A collection of such IDs 
can be obtained from
+ * the {@link StateObject} by recursively traversing it. This is 
implementation specific; however,
+ * it must be consistent across attempts, registries and backends.
+ *
+ * <h1>Basic usage</h1>
+ *
+ * <ol>
+ *   <li>{@link #stateUsed(Set, Collection) register state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 1}
+ *   <li>{@link #stateNotUsed(String, StateObject) unregister state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 2} 
(state is not used as
+ *       it was unregistered)
+ *   <li>{@link #checkpointSubsumed(String, long) subsume checkpoint 1} (state 
can be discarded)
+ * </ol>
+ *
+ * <p>It is not required to register state usage strictly before starting a 
checkpoint (mostly
+ * because it is usually not known which state will be included into the 
snapshot when the
+ * checkpoint starts).
+ *
+ * <p>Usually, state registration and un-registration is done automatically 
via {@link
+ * #snapshotTaken(String, StateObject, long, boolean)}.
+ *
+ * <h1>Consistency and dangling state</h1>
+ *
+ * There are several factors that can impact consistency: incremental 
checkpoints; out-of-order
+ * calls from a single backend; multiple backends; multiple concurrent 
checkpoints; savepoints;
+ * missing confirmations. Potential issues include:
+ *
+ * <ul>
+ *   <li>Incremental checkpoint subsumption: if some state is shared across 
checkpoints then
+ *       discarding an older checkpoint might invalidate subsequent checkpoints
+ *   <li>Out-of-order checkpoint completion by a single backend: if a reported 
snapshot doesn't
+ *       include some state anymore then that state can be discarded. Any 
<strong>earlier</strong>
+ *       snapshots reported <strong>later</strong> still using this state 
later will be invalid
+ *   <li>Out-of-order state (un)registration by multiple backends: if some 
state is shared then some
+ *       backend might register and unregister its usage before the other 
starts using it. The
+ *       other's backend snapshots with this state may be invalid.
+ *   <li>Savepoint state deletion: savepoint should never be deleted unless 
aborted; even if there
+ *       is no confirmation or it has be subsumed
+ *   <li>With RETAIN_ON_CANCELLATION policy, last num-retained checkpoints 
shouldn't be discarded
+ *       even if the job is cancelled (NOT IMPLEMENTED)
+ * </ul>
+ *
+ * To allow implementations to address the above issues, client MUST:
+ *
+ * <ul>
+ *   <li>list all the backends {@link #stateUsed(Set, Collection) using some 
state} at once
+ *   <li>list all state objects {@link #snapshotTaken(String, StateObject, 
long, boolean) used in a
+ *       snapshot} at once
+ *   <li>notify that the checkpoint is {@link #checkpointStarting(String, 
long, boolean) started}
+ *       before {@link #stateNotUsed(String, StateObject) unregistering} any 
state this checkpoint
+ *       might be using (if a state was unregistered after a checkpoint was 
started it can still be
+ *       included into the snapshot)
+ *   <li>in particular, report the {@link #snapshotTaken(String, StateObject, 
long, boolean)}
+ *       snapshot} for a checkpoint only after notifying about its start
+ *   <li>not {@link #checkpointStarting(String, long, boolean) start any 
checkpoint} that may have
+ *       been {@link #checkpointSubsumed(String, long) subsumed}
+ *   <li>for savepoints, call either {@link #snapshotTaken(String, 
StateObject, long, boolean)
+ *       snapshotTaken} or {@link #checkpointAborted(String, long) aborted} 
(otherwise, state won't
+ *       be deleted)
+ *   <li>for checkpoints, call either {@link #checkpointSubsumed(String, long) 
subsumed} or {@link
+ *       #checkpointAborted(String, long) aborted} (otherwise, state won't be 
deleted until {@link
+ *       #jobTerminated(JobStatus)})

Review comment:
       Which method you mean here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/StateEntry.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * StateEntry holds the tracking information about the state to eventually 
discard it.
+ *
+ * <p>Lifecycle
+ *
+ * <ol>
+ *   <li>Initially, the state is in active use, potentially by multiple state 
backends, as reflected
+ *       by {@link #activelyUsingBackends}.
+ *   <li>Once not actively used by a backend, the above counter is decremented 
and {@link
+ *       #pendingCheckpointsByBackend} and {@link #pendingSavepointsByBackend} 
are updated
+ *   <li>Each backend notifies it about the corresponding checkpoint updates 
(shrinking the above
+ *       maps)
+ *   <li>{@link #discardIfNotUsed() Once} no backend is actively using this 
entry, and no checkpoint
+ *       or savepoint is using it, the state is discarded (unless some 
savepoint was reported to be
+ *       actually using this state)
+ * </ol>
+ *
+ * One can think of {@link #pendingCheckpointsByBackend} and {@link 
#activelyUsingBackends} as
+ * potential usages in the past and in the future respectively.
+ *
+ * @param <K> type of state identifier
+ */
+@NotThreadSafe
+class StateEntry<K> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateEntry.class);
+
+    private final K key;
+    private final StateObject state;
+    private final TaskStateCleaner cleaner;
+    private final Consumer<K> discardCallback;
+
+    /** Number of state backends that may use this state in future 
checkpoints. */
+    private int activelyUsingBackends;
+
+    /**
+     * Not yet subsumed checkpoints by backend potentially using this {@link 
#state}. Entries
+     * (checkpoint IDs) for a backend are added once it stops using it and are 
removed on
+     * subsumption and {@link #checkpointAborted(String, long) abortion}. Once 
it's empty and {@link
+     * #pendingSavepointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used to get this highest ID (a number would suffice at the cost of 
readability).
+     */
+    private final Map<String, NavigableSet<Long>> pendingCheckpointsByBackend 
= new HashMap<>();
+
+    /**
+     * Not aborted savepoints by backend potentially using this {@link 
#state}. Entries (savepoint
+     * IDs) for a backend are added once it stops using it and are removed on 
{@link
+     * #checkpointAborted(String, long) abortion}. Once a snapshot for a 
savepoint is reported, the
+     * state is marked as used externally and can not be removed. Once it's 
empty and {@link
+     * #pendingCheckpointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used for the sake of readability.
+     */
+    private final Map<String, NavigableSet<Long>> pendingSavepointsByBackend = 
new HashMap<>();
+
+    /** Once set to true, state can not be discarded. */
+    private boolean usedExternally = false;
+
+    private boolean discarded = false;
+
+    StateEntry(
+            K key,
+            StateObject state,
+            int activelyUsingBackends,
+            TaskStateCleaner cleaner,
+            Consumer<K> discardCallback) {
+        this.key = key;
+        this.state = state;
+        this.cleaner = cleaner;
+        this.activelyUsingBackends = activelyUsingBackends;
+        this.discardCallback = discardCallback;
+    }
+
+    /**
+     * Mark this entry as not used for future checkpoints by the given 
backend. Older checkpoints
+     * may still use it.
+     *
+     * @param usingCheckpoints checkpoints that may be using this state - 
mutable for abortion
+     * @param usingSavepoints savepoints that may be using this state - 
mutable for abortion
+     */
+    public void notActivelyUsed(
+            String backendId,
+            NavigableSet<Long> usingSavepoints,
+            NavigableSet<Long> usingCheckpoints) {
+        LOG.trace(
+                "Update state entry usage, backendId: {}, usingSavepoints: {}, 
usingCheckpoints: {}",
+                backendId,
+                usingSavepoints,
+                usingCheckpoints);
+        checkState(!discarded && --activelyUsingBackends >= 0);
+        putOnceIfNonEmpty(pendingSavepointsByBackend, usingSavepoints, 
backendId);
+        putOnceIfNonEmpty(pendingCheckpointsByBackend, usingCheckpoints, 
backendId);
+        discardIfNotUsed();
+    }
+
+    public void savepointSucceeded(String backendId, long savepointId) {
+        checkState(!discarded, "State entry %s was already discarded", this);
+        NavigableSet<Long> savepoints = 
pendingSavepointsByBackend.get(backendId);
+        NavigableSet<Long> checkpoints = 
pendingCheckpointsByBackend.get(backendId);
+        if (savepoints == null) {
+            // can be null if in active use by this backend
+            // but then checkpoints must also be null
+            checkState(
+                    activelyUsingBackends > 0 && checkpoints == null,
+                    "State entry %s is in unexpected state, 
activelyUsingBackends: %s, checkpoints: %s, savepoints: %s, backend: %s",
+                    this,
+                    activelyUsingBackends,
+                    checkpoints,
+                    savepoints,
+                    backendId);
+        } else {
+            checkState(
+                    savepoints.remove(savepointId),
+                    "State entry %s wasn't used by backend %s for savepoint %s 
(savepoints: %s)",
+                    this,
+                    backendId,
+                    savepointId,
+                    savepoints);
+            if (savepoints.isEmpty()) {
+                pendingSavepointsByBackend.remove(backendId);
+            }
+        }
+        usedExternally = true;
+        discardIfNotUsed();
+        // rely on normal workflow and then GC to cleanup the objects in memory
+    }
+
+    private void discard() {
+        if (discarded) {
+            LOG.trace("Not discarding state entry - already discarded: {}", 
this);
+            return;
+        }
+        discarded = true;
+        discardCallback.accept(key);
+        if (usedExternally) {
+            LOG.trace("Not discarding state entry - used externally: {}", 
this);
+        } else {
+            LOG.trace("Discarding state entry: {}", this);
+            cleaner.discardAsync(state);
+        }
+    }
+
+    public void checkpointSubsumed(String backendId, long checkpointId) {
+        // It's enough to simply clear remainingCheckpoints for this backend 
because it should
+        // already track the highest checkpoint. But we'll check the 
remainingCheckpoints explicitly
+        // - for robustness and clarity
+        NavigableSet<Long> remainingCheckpoints = 
pendingCheckpointsByBackend.get(backendId);
+        if (remainingCheckpoints != null
+                && !remainingCheckpoints.isEmpty()
+                && remainingCheckpoints.last() <= checkpointId) {
+            pendingCheckpointsByBackend.remove(backendId);
+            discardIfNotUsed();
+        }
+    }
+
+    public void checkpointAborted(String backendId, long checkpointId) {
+        aborted(backendId, checkpointId, this.pendingSavepointsByBackend);
+    }
+
+    public void savepointAborted(String backendId, long checkpointId) {
+        aborted(backendId, checkpointId, pendingCheckpointsByBackend);
+    }
+
+    private void aborted(String backendId, long id, Map<String, 
NavigableSet<Long>> idsByBackend) {
+        Set<Long> remaining = idsByBackend.get(backendId);
+        if (remaining != null) {
+            remaining.remove(id);
+            discardIfNotUsed();
+        }
+    }
+
+    private void discardIfNotUsed() {
+        if (activelyUsingBackends == 0
+                && pendingSavepointsByBackend.isEmpty()
+                && pendingCheckpointsByBackend.isEmpty()) {
+            discard();
+        }
+    }
+
+    public boolean isUsedInSavepoints(String backendId) {
+        return nonEmpty(pendingSavepointsByBackend.get(backendId));
+    }
+
+    @Nullable
+    public Long getHighestUsingCheckpoint(String backendId) throws 
NoSuchElementException {
+        return last(pendingCheckpointsByBackend.get(backendId));
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "key=%s, state=%s, discarded=%s, backendCount=%d",
+                key, state, discarded, activelyUsingBackends);
+    }
+
+    private static void putOnceIfNonEmpty(
+            Map<String, NavigableSet<Long>> target, NavigableSet<Long> set, 
String key) {
+        if (nonEmpty(set)) {
+            checkState(target.put(key, set) == null);

Review comment:
       If the target already existed the `key`, I prefer to throw exception 
with detailed message instead of an `IllegalStateException`. To prevent useless 
constructor of `String`, we can use if-else statement.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/TaskStateRegistryImpl.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+
+import static java.lang.Thread.holdsLock;
+import static java.util.Collections.singleton;
+import static 
org.apache.flink.runtime.state.track.TaskStateRegistryImpl.StateObjectIDExtractor.IDENTITY;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** @param <K> state object ID type (for {@link #distributedState}). */
+@ThreadSafe
+@Internal
+public class TaskStateRegistryImpl<K> implements TaskStateRegistry {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TaskStateRegistryImpl.class);
+
+    private final TaskStateCleaner cleaner;
+    private final StateObjectIDExtractor<K> keyExtractor;
+
+    // Synchronization is to prevent issues inside backend registries, not 
just Map modification.
+    // E.g. backends may share State Entries and otherwise, would update them 
concurrently.
+    @GuardedBy("lock")
+    private final Map<String, BackendStateRegistry<K>> backendStateRegistries 
= new HashMap<>();
+
+    @GuardedBy("lock")
+    private final Set<K> usedState = new HashSet<>();
+
+    // Synchronization is just to prevent concurrent Map modification issues.
+    @GuardedBy("lock")
+    private final Set<K> distributedState = new HashSet<>();
+
+    private final Object lock = new Object();
+
+    TaskStateRegistryImpl(TaskStateCleaner cleaner, StateObjectIDExtractor<K> 
keyExtractor) {
+        this.cleaner = checkNotNull(cleaner);
+        this.keyExtractor = checkNotNull(keyExtractor);
+    }
+
+    @Override
+    public void stateUsed(Set<String> backendIds, Collection<StateObject> 
states) {
+        synchronized (lock) {
+            stateUsedInternal(backendIds, states);
+        }
+    }
+
+    private void stateUsedInternal(Set<String> backendIds, 
Collection<StateObject> states) {
+        checkState(holdsLock(lock));
+        List<StateEntry<K>> entries = toStateEntries(states, 
backendIds.size());
+        if (!entries.isEmpty()) {
+            for (StateEntry<K> e : entries) {
+                usedState.add(e.getKey());
+            }
+            for (String backendId : backendIds) {
+                withRegistry(backendId, registry -> 
registry.stateUsed(entries));
+            }
+        }
+    }
+
+    private List<StateEntry<K>> toStateEntries(Collection<StateObject> states, 
int numBackends) {
+        checkState(holdsLock(lock));
+        List<StateEntry<K>> entries = new ArrayList<>();
+        for (StateObject stateObject : states) {
+            for (Map.Entry<K, StateObject> entry : 
keyExtractor.apply(stateObject).entrySet()) {
+                K stateKey = entry.getKey();
+                StateObject state = entry.getValue();
+                if (shouldTrack(stateKey)) {
+                    entries.add(
+                            new StateEntry<>(
+                                    stateKey, state, numBackends, cleaner, 
usedState::remove));
+                }
+            }
+        }
+        return entries;
+    }
+
+    private boolean shouldTrack(K k) {
+        return !usedState.contains(k) && !distributedState.contains(k);
+    }
+
+    @Override
+    public void stateNotUsed(String backendId, StateObject state) {
+        Set<K> keys = keyExtractor.apply(state).keySet();
+        if (!keys.isEmpty()) {
+            withRegistry(backendId, registry -> registry.stateNotUsed(keys));
+        }
+    }
+
+    @Override
+    public void checkpointStarting(String backendId, long checkpointId, 
boolean isSavepoint) {
+        withRegistry(backendId, registry -> 
registry.checkpointStarting(checkpointId, isSavepoint));
+    }
+
+    @Override
+    public void snapshotTaken(
+            String backendId, StateObject state, long checkpointId, boolean 
trackStateUsage) {
+        withRegistry(
+                backendId,
+                registry -> {
+                    if (checkpointId > 
registry.getLastSnapshottedCheckpoint()) {
+                        if (trackStateUsage) {
+                            stateUsedInternal(singleton(backendId), 
singleton(state));
+                        }
+                        registry.snapshotTaken(checkpointId, state, 
trackStateUsage);
+                    }
+                });
+    }
+
+    @Override
+    public void checkpointSubsumed(String backendId, long checkpointId) {
+        withRegistry(backendId, registry -> 
registry.checkpointSubsumed(checkpointId));
+    }
+
+    @Override
+    public void checkpointAborted(String backendId, long checkpointId) {
+        withRegistry(backendId, registry -> 
registry.checkpointAborted(checkpointId));
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.debug("Close");

Review comment:
       I think this statement is too simple.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/TaskStateRegistry.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateObject;
+
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ * A component responsible for managing state that is not shared across 
different TMs (i.e. not
+ * managed by JM). Managing state here means {@link StateObject#discardState() 
deletion} once not
+ * used.
+ *
+ * <p>Must not be shared between different jobs.
+ *
+ * <p>Explicitly aware of checkpointing so that tracking of associations 
between checkpoints and
+ * snapshots can be reused (instead of implementing in each backend).
+ *
+ * <h1>State sharing inside TM</h1>
+ *
+ * <p>Inside each TM, the registry <strong>must</strong> be shared across 
state backends on the same
+ * level as their state is, or higher. For example, if all state backends of 
operators of the same
+ * job in TM use the same file on DFS then they <strong>must</strong> share 
the same registry. OTH,
+ * if nothing is shared then the Registry can be per-backend (or shared).
+ *
+ * <h1>State sharing across TMs</h1>
+ *
+ * If a rescaling operation (or recovery with state sharing inside TM) results 
in some state being
+ * shared by multiple TMs (or TaskStateRegistries), such relevant shared 
states must be communicated
+ * to the registry so that it can ignore them. Implementations 
<strong>must</strong> ignore such
+ * shared states.
+ *
+ * <h1>State identity</h1>
+ *
+ * For the purpose of matching state objects from the different calls (and 
from previous runs),
+ * state can be identified by {@link StateObjectID}. A collection of such IDs 
can be obtained from
+ * the {@link StateObject} by recursively traversing it. This is 
implementation specific; however,
+ * it must be consistent across attempts, registries and backends.
+ *
+ * <h1>Basic usage</h1>
+ *
+ * <ol>
+ *   <li>{@link #stateUsed(Set, Collection) register state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 1}
+ *   <li>{@link #stateNotUsed(String, StateObject) unregister state usage}
+ *   <li>{@link #checkpointStarting(String, long, boolean) start checkpoint 2} 
(state is not used as
+ *       it was unregistered)
+ *   <li>{@link #checkpointSubsumed(String, long) subsume checkpoint 1} (state 
can be discarded)
+ * </ol>
+ *
+ * <p>It is not required to register state usage strictly before starting a 
checkpoint (mostly
+ * because it is usually not known which state will be included into the 
snapshot when the
+ * checkpoint starts).
+ *
+ * <p>Usually, state registration and un-registration is done automatically 
via {@link
+ * #snapshotTaken(String, StateObject, long, boolean)}.
+ *
+ * <h1>Consistency and dangling state</h1>
+ *
+ * There are several factors that can impact consistency: incremental 
checkpoints; out-of-order
+ * calls from a single backend; multiple backends; multiple concurrent 
checkpoints; savepoints;
+ * missing confirmations. Potential issues include:
+ *
+ * <ul>
+ *   <li>Incremental checkpoint subsumption: if some state is shared across 
checkpoints then
+ *       discarding an older checkpoint might invalidate subsequent checkpoints
+ *   <li>Out-of-order checkpoint completion by a single backend: if a reported 
snapshot doesn't
+ *       include some state anymore then that state can be discarded. Any 
<strong>earlier</strong>
+ *       snapshots reported <strong>later</strong> still using this state 
later will be invalid
+ *   <li>Out-of-order state (un)registration by multiple backends: if some 
state is shared then some
+ *       backend might register and unregister its usage before the other 
starts using it. The
+ *       other's backend snapshots with this state may be invalid.
+ *   <li>Savepoint state deletion: savepoint should never be deleted unless 
aborted; even if there
+ *       is no confirmation or it has be subsumed
+ *   <li>With RETAIN_ON_CANCELLATION policy, last num-retained checkpoints 
shouldn't be discarded
+ *       even if the job is cancelled (NOT IMPLEMENTED)
+ * </ul>
+ *
+ * To allow implementations to address the above issues, client MUST:
+ *
+ * <ul>
+ *   <li>list all the backends {@link #stateUsed(Set, Collection) using some 
state} at once
+ *   <li>list all state objects {@link #snapshotTaken(String, StateObject, 
long, boolean) used in a
+ *       snapshot} at once
+ *   <li>notify that the checkpoint is {@link #checkpointStarting(String, 
long, boolean) started}
+ *       before {@link #stateNotUsed(String, StateObject) unregistering} any 
state this checkpoint
+ *       might be using (if a state was unregistered after a checkpoint was 
started it can still be
+ *       included into the snapshot)
+ *   <li>in particular, report the {@link #snapshotTaken(String, StateObject, 
long, boolean)}
+ *       snapshot} for a checkpoint only after notifying about its start
+ *   <li>not {@link #checkpointStarting(String, long, boolean) start any 
checkpoint} that may have
+ *       been {@link #checkpointSubsumed(String, long) subsumed}
+ *   <li>for savepoints, call either {@link #snapshotTaken(String, 
StateObject, long, boolean)
+ *       snapshotTaken} or {@link #checkpointAborted(String, long) aborted} 
(otherwise, state won't
+ *       be deleted)
+ *   <li>for checkpoints, call either {@link #checkpointSubsumed(String, long) 
subsumed} or {@link
+ *       #checkpointAborted(String, long) aborted} (otherwise, state won't be 
deleted until {@link
+ *       #jobTerminated(JobStatus)})
+ * </ul>
+ *
+ * <h1>Thread safety</h1>
+ *
+ * Thread safety depends on the implementation. An implementation shared 
across different tasks (in
+ * a single TM) MUST be thread-safe.
+ *
+ * <h1>Asynchronous discarding</h1>
+ *
+ * Production implementations might choose to discard the state 
asynchronously; however, they might
+ * choose to block the caller if they don't keep up.
+ */
+@Internal
+public interface TaskStateRegistry extends AutoCloseable {
+
+    /**
+     * Mark the given state as used by the given state backends. Should be 
called upon initial
+     * creation of state object (e.g. upload to DFS). It can be called before 
or after {@link
+     * #checkpointStarting(String, long, boolean) starting} a checkpoint using 
it.
+     *
+     * @param backendIds <strong>must</strong> include <strong>all</strong> 
backends that will using
+     *     the given state.
+     */
+    void stateUsed(Set<String> backendIds, Collection<StateObject> states);
+
+    /**
+     * Mark the given state as not used anymore by the given backend (i.e. it 
will not be included
+     * into any <strong>future</strong> snapshots); discard if not used by any 
other backend or
+     * checkpoint. When using incremental checkpoints, it should be called 
upon materialization;
+     * otherwise, on checkpoint subsumption (in addition to {@link 
#checkpointSubsumed(String,
+     * long)}. The method does nothing if the state is not marked as used.
+     *
+     * <p>Note that there is no need to call this method during the shutdown - 
any state is
+     * considered unused as no future checkpoints will be made.
+     */
+    void stateNotUsed(String backendId, StateObject state);
+
+    /**
+     * Notify that the checkpoint is about to start. Until {@link 
#snapshotTaken(String,
+     * StateObject, long, boolean)} notified explicitly}, any state that is 
still in use by the
+     * backend is considered as potentially used by this checkpoint.
+     */
+    void checkpointStarting(String backendId, long checkpointId, boolean 
isSavepoint);
+
+    /**
+     * Notify about the state used in a snapshot for the given checkpoint 
(before or after sending
+     * to JM). All state should be reported at once. The method serves the 
following goals:
+     *
+     * <ul>
+     *   <li>more fine-grained tracking of state usage by checkpoints to allow 
deletion
+     *   <li>tracking of state usage by savepoints to prevent deletion
+     *   <li>automatic tracking of state usage (see below)
+     * </ul>
+     *
+     * Must be called <strong>after</strong> the corresponding {@link 
#checkpointStarting(String,
+     * long, boolean)}.
+     *
+     * @param trackStateUsage if true then new entries will be added and 
marked as used; any state

Review comment:
       To be honest, `trackStateUsage` is a bit hard to understand here. I 
cannot see any production code to set it as `false`. And what's more, where do 
we check whether `the previous checkpointId differs exactly by 1L from the 
current one`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/StateEntry.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * StateEntry holds the tracking information about the state to eventually 
discard it.
+ *
+ * <p>Lifecycle
+ *
+ * <ol>
+ *   <li>Initially, the state is in active use, potentially by multiple state 
backends, as reflected
+ *       by {@link #activelyUsingBackends}.
+ *   <li>Once not actively used by a backend, the above counter is decremented 
and {@link
+ *       #pendingCheckpointsByBackend} and {@link #pendingSavepointsByBackend} 
are updated
+ *   <li>Each backend notifies it about the corresponding checkpoint updates 
(shrinking the above
+ *       maps)
+ *   <li>{@link #discardIfNotUsed() Once} no backend is actively using this 
entry, and no checkpoint

Review comment:
       ```suggestion
    *   <li>{@link #discardIfNotUsed()} Once no backend is actively using this 
entry, and no checkpoint
   ```

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -208,6 +221,8 @@ public ChangelogKeyedStateBackend(
         this.changelogStates = new HashMap<>();
         this.mainMailboxExecutor = checkNotNull(mainMailboxExecutor);
         this.asyncOperationsThreadPool = 
checkNotNull(asyncOperationsThreadPool);
+        this.taskStateRegistry = checkNotNull(taskStateRegistry);
+        this.backendId = UUID.randomUUID().toString();

Review comment:
       How about print the `backendId` info?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to