pnowojski commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r676851147
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
##########
@@ -43,7 +43,7 @@
* @param <OUT> The output type of the operator
*/
@PublicEvolving
-public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
+public interface StreamOperator<OUT> extends InternalCheckpointListener, KeyContext, Serializable {
Review comment:
> It seems wrong to me that a @PublicEvolving interface extends @Internal interface.
+1
@rkhachatryan, what do you mean by the following?
> 1. Modify CheckpointListener interface instead of introducing InternalCheckpointListener as you proposed initially
Regardless, I also think we shouldn't go through the operators if we don't
want to make `notifyCheckpointSubsumed()` public.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/state/InternalCheckpointListener.java
##########
@@ -0,0 +1,52 @@
+/*
+
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+
+*/
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The internal checkpoint listener add another {@link #notifyCheckpointSubsumed(long)} interface
+ * for internal usage.
+ */
+@Internal
+public interface InternalCheckpointListener extends CheckpointListener {
+
+ /**
+ * A invalid checkpoint id, which could be used in {@link #notifyCheckpointSubsumed(long)} to
+ * represent no checkpoint needs to be subsumed.
+ */
+ long INVALID_CHECKPOINT_ID = -1;
+
+ /**
+ * This method is called as a notification once a distributed checkpoint has been subsumed.
+ *
+ * <p>These notifications are "best effort", meaning they can sometimes be skipped.
+ *
+ * <p>This method is very rarely necessary to implement. The "best effort" guarantee, together
+ * with the fact that this method should not result in discarding any data (per the "Checkpoint
+ * Subsuming Contract") means it is mainly useful for earlier cleanups of auxiliary resources.
+ *
+ * @param checkpointId The ID of the checkpoint that has been subsumed.
+ * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+ * the task or job.
+ */
+ default void notifyCheckpointSubsumed(long checkpointId) throws Exception {}
Review comment:
Yes, I second this comment.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1399,6 +1399,34 @@ public void notifyCheckpointAborted(final long checkpointID) {
}
}
+ @Override
+ public void notifyCheckpointSubsumed(long checkpointId) {
+ AbstractInvokable invokable = this.invokable;
+
+ if (executionState == ExecutionState.RUNNING && invokable != null) {
+
+ try {
+ invokable.notifyCheckpointSubsumedAsync(checkpointId);
+ } catch (RejectedExecutionException ex) {
+ // This may happen if the mailbox is closed. It means that the task is shutting
+ // down, so we just ignore it.
+ LOG.debug(
+ "Notify checkpoint subsume {} for {} ({}) was rejected by the mailbox",
+ checkpointId,
+ taskNameWithSubtask,
+ executionId);
+ } catch (Exception e) {
+ if (getExecutionState() == ExecutionState.RUNNING) {
+ LOG.warn("Error while subsuming checkpoint {}.", checkpointId, e);
+ }
+ }
+ } else {
+ LOG.info(
+ "Ignoring checkpoint subsume notification for non-running task {}.",
+ taskNameWithSubtask);
+ }
+ }
+
Review comment:
Maybe try to de-duplicate the code? The existing `notifyCheckpointXXX` methods
look the same.
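One possible shape for that de-duplication is sketched below. It is not the actual `Task` code: `TaskSketch`, `CheckpointNotification`, the `running` flag, and the string log are hypothetical stand-ins, and the real method additionally re-checks the execution state before logging a warning. The point is only that the state check, the dispatch, and the two `catch` branches can live in a single private helper that each `notifyCheckpointXXX` method delegates to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical callback type; stands in for the invokable's async notify methods. */
@FunctionalInterface
interface CheckpointNotification {
    void send(long checkpointId) throws Exception;
}

/** Hypothetical, heavily simplified stand-in for Task; not the real Flink class. */
class TaskSketch {
    /** Collected messages, used here in place of a logger. */
    final List<String> log = new ArrayList<>();

    /** Assumption: a plain flag replaces the ExecutionState.RUNNING check. */
    volatile boolean running = true;

    /** Shared path: state check, dispatch, and exception handling in one place. */
    private void notifyCheckpoint(
            String action, long checkpointId, CheckpointNotification target) {
        if (!running) {
            log.add("ignored " + action + " " + checkpointId);
            return;
        }
        try {
            target.send(checkpointId);
        } catch (RejectedExecutionException ex) {
            // Mailbox closed: the task is shutting down, so the notification is dropped.
            log.add("rejected " + action + " " + checkpointId);
        } catch (Exception e) {
            log.add("failed " + action + " " + checkpointId);
        }
    }

    void notifyCheckpointAborted(long checkpointId, CheckpointNotification target) {
        notifyCheckpoint("abort", checkpointId, target);
    }

    void notifyCheckpointSubsumed(long checkpointId, CheckpointNotification target) {
        notifyCheckpoint("subsume", checkpointId, target);
    }
}
```

With a helper like this, adding another notification type would cost one delegating method rather than another copy of the try/catch block.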
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/state/InternalCheckpointListener.java
##########
@@ -0,0 +1,52 @@
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The internal checkpoint listener add another {@link #notifyCheckpointSubsumed(long)} interface
+ * for internal usage.
+ */
+@Internal
+public interface InternalCheckpointListener extends CheckpointListener {
Review comment:
Do we need this interface in the first place? The only value I can see
in it is a common signature for `notifyCheckpointSubsumed()`, which would
make refactoring/renaming easier. That may be a good enough reason on its own,
but I'm not entirely sure...
I'm also not sure whether `InternalCheckpointListener` should extend
`CheckpointListener`, or whether we should instead have a smaller interface,
`CheckpointSubsumedListener`, that just adds this single method.
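To make the second alternative concrete, here is a rough sketch of a standalone `CheckpointSubsumedListener`. Everything except that interface name and the `notifyCheckpointSubsumed(long)` signature is hypothetical: `PublicCheckpointListenerSketch` stands in for the public `CheckpointListener` (reduced to one method), and the two implementing classes are illustrative only.

```java
/** Hypothetical stand-in for the public CheckpointListener, reduced to one method. */
interface PublicCheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/** The smaller interface floated above: it adds only the subsumed notification. */
interface CheckpointSubsumedListener {
    default void notifyCheckpointSubsumed(long checkpointId) throws Exception {}
}

/** Hypothetical runtime-side class that opts in to both notifications. */
class RuntimeTaskSketch implements PublicCheckpointListenerSketch, CheckpointSubsumedListener {
    long lastCompleted = -1;
    long lastSubsumed = -1;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompleted = checkpointId;
    }

    @Override
    public void notifyCheckpointSubsumed(long checkpointId) {
        lastSubsumed = checkpointId;
    }
}

/** Hypothetical user-facing operator: the subsumed method is not part of its type. */
class UserOperatorSketch implements PublicCheckpointListenerSketch {
    long lastCompleted = -1;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompleted = checkpointId;
    }
}
```

In this arrangement a public interface would not need to extend an internal one; only the classes that want the subsumed notification implement the extra interface.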
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -138,7 +138,7 @@
TaskSlotPayload,
TaskActions,
PartitionProducerStateProvider,
- CheckpointListener {
+ InternalCheckpointListener {
Review comment:
+1