rkhachatryan commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r676720289
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
exception);
}
+
+ try {
+ allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+ if (!allPreviousCheckpoints.isEmpty()) {
Review comment:
Why don't we just remember the latest subsumed checkpoint in the store
(which is what actually subsumes them) and let the coordinator ask for it?
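A minimal sketch of the suggestion, assuming a simplified store that tracks only checkpoint IDs. `SimpleCompletedCheckpointStore` and `getLatestSubsumedCheckpointId()` are hypothetical names for illustration, not Flink's `CompletedCheckpointStore` API. The point is that the component that evicts checkpoints records the last evicted ID, so the coordinator no longer has to diff two lists.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/**
 * Hypothetical, simplified stand-in for a completed checkpoint store (not Flink's API):
 * it keeps at most maxRetained checkpoint IDs and remembers the latest ID it subsumed.
 */
class SimpleCompletedCheckpointStore {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();
    // -1 mirrors the "no checkpoint subsumed" sentinel proposed in the PR.
    private long latestSubsumedId = -1L;

    SimpleCompletedCheckpointStore(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be at least 1");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds a completed checkpoint; the oldest ones beyond the retention limit are subsumed. */
    void addCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            // The store is what actually subsumes checkpoints, so it records the ID here.
            latestSubsumedId = retained.removeFirst();
        }
    }

    /** Lets the coordinator ask for the latest subsumed checkpoint instead of diffing lists. */
    Optional<Long> getLatestSubsumedCheckpointId() {
        return latestSubsumedId < 0 ? Optional.empty() : Optional.of(latestSubsumedId);
    }
}
```

With something like this, the coordinator could query the store after adding a new completed checkpoint rather than computing `allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints())`.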
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -138,7 +138,7 @@
TaskSlotPayload,
TaskActions,
PartitionProducerStateProvider,
- CheckpointListener {
+ InternalCheckpointListener {
Review comment:
I think it's not necessary to use the interface
(`InternalCheckpointListener`) here, as the caller already knows the exact
type (`Task`).
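A minimal sketch of the reviewer's point, assuming (as the comment states) that the caller already holds a reference of the concrete task type. `PublicCheckpointListener`, `SimpleTask` and `SimpleTaskExecutor` are hypothetical, simplified stand-ins, with checked exceptions omitted; they are not Flink's `Task` or `TaskExecutor`.

```java
/** Hypothetical, simplified public listener contract. */
interface PublicCheckpointListener {
    void notifyCheckpointComplete(long checkpointId);
}

/** Stand-in for Task: subsumption is a plain method, not part of any listener interface. */
class SimpleTask implements PublicCheckpointListener {
    long lastCompleted = -1L;
    long lastSubsumed = -1L;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompleted = checkpointId;
    }

    /** Reachable directly because callers already have a SimpleTask reference. */
    void notifyCheckpointSubsumed(long checkpointId) {
        lastSubsumed = checkpointId;
    }
}

/** Stand-in for the component that looks up the task and forwards the notification. */
class SimpleTaskExecutor {
    void confirmCheckpointSubsumed(SimpleTask task, long checkpointId) {
        // The static type is already the concrete class, so no extra interface is needed.
        task.notifyCheckpointSubsumed(checkpointId);
    }
}
```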
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
##########
@@ -136,6 +136,11 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
}
}
+ @Override
+ public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
+ super.notifyCheckpointSubsumed(checkpointId);
+ }
Review comment:
Is this overriding actually necessary?
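A minimal sketch of why the question arises: an override whose body only calls `super` behaves exactly like inheriting the method. `BaseOperator` and the two subclasses are hypothetical stand-ins for `AbstractStreamOperator` and `AbstractUdfStreamOperator`, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class standing in for AbstractStreamOperator. */
class BaseOperator {
    final List<Long> subsumed = new ArrayList<>();

    public void notifyCheckpointSubsumed(long checkpointId) {
        subsumed.add(checkpointId);
    }
}

/** Without an override, the base behaviour is simply inherited. */
class UdfOperatorWithoutOverride extends BaseOperator {}

/** An override that only delegates to super adds nothing observable. */
class UdfOperatorWithOverride extends BaseOperator {
    @Override
    public void notifyCheckpointSubsumed(long checkpointId) {
        super.notifyCheckpointSubsumed(checkpointId);
    }
}
```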
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
##########
@@ -43,7 +43,7 @@
* @param <OUT> The output type of the operator
*/
@PublicEvolving
-public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
+public interface StreamOperator<OUT> extends InternalCheckpointListener, KeyContext, Serializable {
Review comment:
It seems wrong that a `@PublicEvolving` interface extends an `@Internal`
interface.
The root cause, I think, is in the `StreamOperator` hierarchy, but that's a
long story.
Semantically, I think operators should only be aware of checkpoint
completion (to emit data, for example); their backends should additionally
know about the subsumption.
I see the following solutions to notify state backends about the subsumption:
1. Modify the `CheckpointListener` interface, as you proposed initially,
instead of introducing `InternalCheckpointListener`
2. In `StreamOperatorWrapper`, get the `KeyedStateBackend` from the operator
(if it's an `AbstractStreamOperator`) and notify it directly (if it's an
`InternalCheckpointListener`)
3. Same, but via `StreamOperatorStateHandler`
WDYT?
cc: @pnowojski
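A minimal sketch of option 2, assuming heavily simplified types: every name below (`SimpleStreamOperatorWrapper`, `SimpleAbstractStreamOperator`, `SimpleKeyedStateBackend`, and so on) is a hypothetical stand-in, and checked exceptions are omitted. Operators stay on the public completion-only contract; the wrapper reaches past the operator to its keyed backend and notifies it only if the backend opts in to subsumption notifications.

```java
import java.util.ArrayList;
import java.util.List;

/** Public contract: completion only. */
interface SimpleCheckpointListener {
    void notifyCheckpointComplete(long checkpointId);
}

/** Internal-only extension, implemented by state backends but not by operators. */
interface SimpleInternalCheckpointListener extends SimpleCheckpointListener {
    void notifyCheckpointSubsumed(long checkpointId);
}

/** Operators stay on the public contract. */
interface SimpleStreamOperator extends SimpleCheckpointListener {}

/** Marker for a keyed state backend. */
interface SimpleKeyedStateBackend {}

/** A backend that opts in to subsumption notifications. */
class SimpleHeapKeyedStateBackend
        implements SimpleKeyedStateBackend, SimpleInternalCheckpointListener {
    final List<Long> subsumed = new ArrayList<>();

    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    @Override
    public void notifyCheckpointSubsumed(long checkpointId) {
        subsumed.add(checkpointId);
    }
}

/** Stand-in for AbstractStreamOperator, which owns an optional keyed backend. */
abstract class SimpleAbstractStreamOperator implements SimpleStreamOperator {
    private final SimpleKeyedStateBackend keyedStateBackend; // null for non-keyed operators

    SimpleAbstractStreamOperator(SimpleKeyedStateBackend keyedStateBackend) {
        this.keyedStateBackend = keyedStateBackend;
    }

    SimpleKeyedStateBackend getKeyedStateBackend() {
        return keyedStateBackend;
    }
}

/** Stand-in for StreamOperatorWrapper implementing option 2. */
class SimpleStreamOperatorWrapper {
    private final SimpleStreamOperator operator;

    SimpleStreamOperatorWrapper(SimpleStreamOperator operator) {
        this.operator = operator;
    }

    void notifyCheckpointSubsumed(long checkpointId) {
        if (operator instanceof SimpleAbstractStreamOperator) {
            SimpleKeyedStateBackend backend =
                    ((SimpleAbstractStreamOperator) operator).getKeyedStateBackend();
            // instanceof is false for null, so non-keyed operators are skipped.
            if (backend instanceof SimpleInternalCheckpointListener) {
                ((SimpleInternalCheckpointListener) backend).notifyCheckpointSubsumed(checkpointId);
            }
        }
    }
}
```

Option 3 would move the same `instanceof` check from the wrapper into the state handler that already owns the backends, keeping the wrapper unaware of backend types.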
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/state/InternalCheckpointListener.java
##########
@@ -0,0 +1,52 @@
+/*
+
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+
+*/
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The internal checkpoint listener adds another {@link #notifyCheckpointSubsumed(long)} interface
+ * for internal usage.
+ */
+@Internal
+public interface InternalCheckpointListener extends CheckpointListener {
+
+ /**
+ * An invalid checkpoint ID, which can be used in {@link #notifyCheckpointSubsumed(long)} to
+ * indicate that no checkpoint needs to be subsumed.
+ */
+ long INVALID_CHECKPOINT_ID = -1;
+
+ /**
+ * This method is called as a notification once a distributed checkpoint has been subsumed.
+ *
+ * <p>These notifications are "best effort", meaning they can sometimes be skipped.
+ *
+ * <p>This method is very rarely necessary to implement. The "best effort" guarantee, together
+ * with the fact that this method should not result in discarding any data (per the "Checkpoint
+ * Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources.
+ *
+ * @param checkpointId The ID of the checkpoint that has been subsumed.
+ * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+ * the task or job.
+ */
+ default void notifyCheckpointSubsumed(long checkpointId) throws Exception {}
Review comment:
I think having a default no-op implementation is error-prone (it makes it
easy to forget to implement the method). Do we really need it?
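A minimal sketch of the alternative the comment implies: declare the method abstract so the compiler forces every implementer to decide, and make "do nothing" an explicit, reviewable choice. `SubsumptionListener`, `CleaningBackend` and `StatelessBackend` are hypothetical names, and checked exceptions are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical variant of the proposed interface without a default body. */
interface SubsumptionListener {
    // Abstract: an implementer that forgets this method fails to compile.
    void notifyCheckpointSubsumed(long checkpointId);
}

/** An implementer that needs the notification, e.g. to release auxiliary resources early. */
class CleaningBackend implements SubsumptionListener {
    final List<Long> released = new ArrayList<>();

    @Override
    public void notifyCheckpointSubsumed(long checkpointId) {
        released.add(checkpointId);
    }
}

/** An implementer that deliberately ignores it: the no-op is visible in code review. */
class StatelessBackend implements SubsumptionListener {
    @Override
    public void notifyCheckpointSubsumed(long checkpointId) {
        // Intentionally empty: nothing to clean up on subsumption.
    }
}
```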
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
##########
@@ -42,7 +42,7 @@
* <p>This interface also offers the complementary method that provides access to previously saved
* state of operator instances in the task for restore purposes.
*/
-public interface TaskStateManager extends CheckpointListener, AutoCloseable {
+public interface TaskStateManager extends InternalCheckpointListener, AutoCloseable {
Review comment:
What is the intention here? Is it to notify the `LocalStateStore`?
But as of now, `LocalStateStore` doesn't accept these notifications.
Other than that, I think `TaskStateManager` doesn't need to know about the
subsumption.
I also couldn't find any call sites.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -382,6 +382,11 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
stateHandler.notifyCheckpointAborted(checkpointId);
}
+ @Override
+ public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
+ stateHandler.notifyCheckpointSubsumed(checkpointId);
+ }
Review comment:
There is also `AbstractStreamOperatorV2`, which would need the same override.
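A minimal sketch of why the second base class matters, assuming (as the comment implies) that `AbstractStreamOperatorV2` does not inherit from `AbstractStreamOperator`, so forwarding added to one is not picked up by the other. `SimpleStateHandler`, `SimpleOperatorV1` and `SimpleOperatorV2` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for StreamOperatorStateHandler. */
class SimpleStateHandler {
    final List<Long> subsumed = new ArrayList<>();

    void notifyCheckpointSubsumed(long checkpointId) {
        subsumed.add(checkpointId);
    }
}

/** Stand-in for AbstractStreamOperator: forwards to its state handler. */
abstract class SimpleOperatorV1 {
    protected final SimpleStateHandler stateHandler = new SimpleStateHandler();

    public void notifyCheckpointSubsumed(long checkpointId) {
        stateHandler.notifyCheckpointSubsumed(checkpointId);
    }
}

/** Stand-in for AbstractStreamOperatorV2: a separate hierarchy, so it needs its own forwarding. */
abstract class SimpleOperatorV2 {
    protected final SimpleStateHandler stateHandler = new SimpleStateHandler();

    public void notifyCheckpointSubsumed(long checkpointId) {
        stateHandler.notifyCheckpointSubsumed(checkpointId);
    }
}
```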