rkhachatryan commented on a change in pull request #16575:
URL: https://github.com/apache/flink/pull/16575#discussion_r678573385



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/track/StateEntry.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.track;
+
+import org.apache.flink.runtime.state.StateObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * StateEntry holds the tracking information about the state to eventually 
discard it.
+ *
+ * <p>Lifecycle
+ *
+ * <ol>
+ *   <li>Initially, the state is in active use, potentially by multiple state 
backends, as reflected
+ *       by {@link #activelyUsingBackends}.
+ *   <li>Once not actively used by a backend, the above counter is decremented 
and {@link
+ *       #pendingCheckpointsByBackend} and {@link #pendingSavepointsByBackend} 
are updated
+ *   <li>Each backend notifies it about the corresponding checkpoint updates 
(shrinking the above
+ *       maps)
+ *   <li>{@link #discardIfNotUsed() Once} no backend is actively using this 
entry, and no checkpoint
+ *       or savepoint is using it, the state is discarded (unless some 
savepoint was reported to be
+ *       actually using this state)
+ * </ol>
+ *
+ * <p>One can think of {@link #pendingCheckpointsByBackend} and {@link 
#activelyUsingBackends} as
+ * potential usages in the past and in the future, respectively.
+ *
+ * @param <K> type of state identifier
+ */
+@NotThreadSafe
+class StateEntry<K> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateEntry.class);
+
+    private final K key;
+    private final StateObject state;
+    private final TaskStateCleaner cleaner;
+    private final Consumer<K> discardCallback;
+
+    /** Number of state backends that may use this state in future 
checkpoints. */
+    private int activelyUsingBackends;
+
+    /**
+     * Not yet subsumed checkpoints by backend potentially using this {@link 
#state}. Entries
+     * (checkpoint IDs) for a backend are added once it stops using it and are 
removed on
+     * subsumption and {@link #checkpointAborted(String, long) abortion}. Once 
it's empty and {@link
+     * #pendingSavepointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used to get this highest ID (a number would suffice at the cost of 
readability).
+     */
+    private final Map<String, NavigableSet<Long>> pendingCheckpointsByBackend 
= new HashMap<>();
+
+    /**
+     * Not aborted savepoints by backend potentially using this {@link 
#state}. Entries (savepoint
+     * IDs) for a backend are added once it stops using it and are removed on 
{@link
+     * #checkpointAborted(String, long) abortion}. Once a snapshot for a 
savepoint is reported, the
+     * state is marked as used externally and can not be removed. Once it's 
empty and {@link
+     * #pendingCheckpointsByBackend} is empty and {@link 
#activelyUsingBackends} is zero, {@link
+     * #state} can be discarded.
+     *
+     * <p>INVARIANT: does not contain empty sets or nulls.
+     *
+     * <p>Set instead of a single highest checkpoint ID is used to handle 
abortions; NavigableSet is
+     * used for the sake of readability.
+     */
+    private final Map<String, NavigableSet<Long>> pendingSavepointsByBackend = 
new HashMap<>();
+
+    /** Once set to true, state can not be discarded. */
+    private boolean usedExternally = false;
+
+    private boolean discarded = false;
+
+    StateEntry(
+            K key,
+            StateObject state,
+            int activelyUsingBackends,
+            TaskStateCleaner cleaner,
+            Consumer<K> discardCallback) {
+        this.key = key;
+        this.state = state;
+        this.cleaner = cleaner;
+        this.activelyUsingBackends = activelyUsingBackends;

Review comment:
       Good point.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to