This is an automated email from the ASF dual-hosted git repository.

rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ffe4e1e1b [FLINK-39738] Use pekko ask() instead of tell() to ack the checkpoint
f3ffe4e1e1b is described below

commit f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Apr 23 18:10:08 2025 +0200

    [FLINK-39738] Use pekko ask() instead of tell() to ack the checkpoint
    
    This allows checkpoints to fail faster when the pekko framesize is exceeded,
    instead of waiting for the checkpoint timeout.
---
 .../checkpoint/CheckpointCoordinatorGateway.java   |   5 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   3 +-
 .../taskexecutor/rpc/RpcCheckpointResponder.java   |  14 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  43 ++++-
 .../jobmaster/utils/TestingJobMasterGateway.java   |   3 +-
 .../runtime/tasks/AsyncCheckpointRunnableTest.java |  79 +++++++++
 .../CheckpointAcknowledgeFailureITCase.java        | 176 +++++++++++++++++++++
 7 files changed, 310 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 2e057289143..42947fb655a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+
 /** RPC Gateway interface for messages to the CheckpointCoordinator. */
 public interface CheckpointCoordinatorGateway extends RpcGateway {
 
-    void acknowledgeCheckpoint(
+    CompletableFuture<Acknowledge> acknowledgeCheckpoint(
             final JobID jobID,
             final ExecutionAttemptID executionAttemptID,
             final long checkpointId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6f67ecc43c3..39a29191b97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -618,7 +618,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
 
     // TODO: This method needs a leader session ID
     @Override
-    public void acknowledgeCheckpoint(
+    public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
             final JobID jobID,
             final ExecutionAttemptID executionAttemptID,
             final long checkpointId,
@@ -632,6 +632,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
                     checkpointMetrics,
                     deserializeTaskStateSnapshot(checkpointState, 
getClass().getClassLoader()));
         }
+        return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 95121929e68..ce57e9aef92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -47,12 +47,14 @@ public class RpcCheckpointResponder implements 
CheckpointResponder {
             long checkpointId,
             CheckpointMetrics checkpointMetrics,
             TaskStateSnapshot subtaskState) {
-        checkpointCoordinatorGateway.acknowledgeCheckpoint(
-                jobID,
-                executionAttemptID,
-                checkpointId,
-                checkpointMetrics,
-                serializeTaskStateSnapshot(subtaskState));
+        checkpointCoordinatorGateway
+                .acknowledgeCheckpoint(
+                        jobID,
+                        executionAttemptID,
+                        checkpointId,
+                        checkpointMetrics,
+                        serializeTaskStateSnapshot(subtaskState))
+                .join();
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index ee674fc1c2a..0908eff3d74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -133,10 +133,20 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
             if (asyncCheckpointState.compareAndSet(
                     AsyncCheckpointState.RUNNING, 
AsyncCheckpointState.COMPLETED)) {
 
-                reportCompletedSnapshotStates(
-                        
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
-                        snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
-                        asyncDurationMillis);
+                try {
+                    reportCompletedSnapshotStates(
+                            
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
+                            
snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
+                            asyncDurationMillis);
+                } catch (Exception reportFailure) {
+                    // Upload already succeeded; JM is authoritative for state 
cleanup.
+                    // Running cleanup() here could delete files referenced by 
a checkpoint
+                    // the JM has already completed (ACK applied but response 
RPC failed).
+                    // Decline so JM aborts the still-pending checkpoint 
quickly; if the ACK
+                    // was actually applied, the decline arrives after the 
checkpoint moved
+                    // to COMPLETED and is silently dropped by the coordinator.
+                    declineWithoutDiscard(reportFailure);
+                }
 
             } else {
                 LOG.debug(
@@ -275,6 +285,31 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                 .reportIncompleteTaskStateSnapshots(checkpointMetaData, 
metrics);
     }
 
+    private void declineWithoutDiscard(Exception reportFailure) {
+        long checkpointId = checkpointMetaData.getCheckpointId();
+        LOG.warn(
+                "{} - failed to report completed checkpoint {} to JobManager. "
+                        + "Declining without discarding uploaded state.",
+                taskName,
+                checkpointId,
+                reportFailure);
+        if (!isTaskRunning.get()) {
+            return;
+        }
+        try {
+            taskEnvironment.declineCheckpoint(
+                    checkpointId,
+                    new CheckpointException(
+                            
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, reportFailure));
+        } catch (Exception declineFailure) {
+            LOG.warn(
+                    "{} - failed to decline checkpoint {} after report 
failure.",
+                    taskName,
+                    checkpointId,
+                    declineFailure);
+        }
+    }
+
     private void handleExecutionException(Exception e) {
 
         boolean didCleanup = false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index ad311f3e7ea..25f64f2fd86 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -479,7 +479,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
     }
 
     @Override
-    public void acknowledgeCheckpoint(
+    public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
             JobID jobID,
             ExecutionAttemptID executionAttemptID,
             long checkpointId,
@@ -492,6 +492,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                         checkpointId,
                         checkpointMetrics,
                         deserializeTaskStateSnapshot(subtaskState, 
getClass().getClassLoader())));
+        return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 8086a291ecc..3763fdd2c36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -125,6 +127,73 @@ class AsyncCheckpointRunnableTest {
         
assertThat(environment.getCause().getCheckpointFailureReason()).isSameAs(originalReason);
     }
 
+    @Test
+    void testReportFailureDeclinesWithoutDiscardingState() {
+        testCheckpointAckFailureHandling(true);
+    }
+
+    @Test
+    void testReportFailureDoesNotDeclineWhenTaskNotRunning() {
+        testCheckpointAckFailureHandling(false);
+    }
+
+    /**
+     * Simulates the upload succeeding but the ACK RPC failing (e.g. 
AskTimeoutException either
+     * because the JM never received the ACK, or because it applied the ACK 
and the response timed
+     * out). The TM must:
+     *
+     * <ol>
+     *   <li>NOT run cleanup() — files may be referenced by a completed 
checkpoint.
+     *   <li>Send declineCheckpoint (if the task is running) — fails the 
checkpoint fast in the
+     *       still-pending case; silently dropped by the coordinator if the 
ACK was applied.
+     * </ol>
+     */
+    private void testCheckpointAckFailureHandling(boolean isTaskRunning) {
+        RuntimeException reportFailure = new RuntimeException("simulated ACK 
response timeout");
+
+        CancelTrackingOperatorSnapshotFutures snapshot =
+                new CancelTrackingOperatorSnapshotFutures();
+        Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new 
HashMap<>();
+        snapshotsInProgress.put(new OperatorID(), snapshot);
+
+        TestTaskStateManager throwingStateManager =
+                new TestTaskStateManager() {
+                    @Override
+                    public void reportTaskStateSnapshots(
+                            CheckpointMetaData checkpointMetaData,
+                            CheckpointMetrics checkpointMetrics,
+                            TaskStateSnapshot acknowledgedState,
+                            TaskStateSnapshot localState) {
+                        throw reportFailure;
+                    }
+                };
+
+        TestEnvironment env =
+                new TestEnvironment(
+                        new Configuration(),
+                        new Configuration(),
+                        new ExecutionConfig(),
+                        1L,
+                        new MockInputSplitProvider(),
+                        1,
+                        throwingStateManager);
+
+        AsyncCheckpointRunnable runnable =
+                createAsyncRunnable(snapshotsInProgress, env, false, 
isTaskRunning);
+        runnable.run();
+
+        assertThat(snapshot.cancelCount).isZero();
+        assertThat(runnable.getFinishedFuture()).isCompleted();
+        if (isTaskRunning) {
+            assertThat(env.getCause()).isNotNull();
+            assertThat(env.getCause().getCheckpointFailureReason())
+                    
.isSameAs(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
+            assertThat(env.getCause()).hasRootCause(reportFailure);
+        } else {
+            assertThat(env.getCause()).isNull();
+        }
+    }
+
     @Test
     void testReportFinishedOnRestoreTaskSnapshots() {
         TestEnvironment environment = new TestEnvironment();
@@ -206,4 +275,14 @@ class AsyncCheckpointRunnableTest {
             return cause;
         }
     }
+
+    private static class CancelTrackingOperatorSnapshotFutures extends 
OperatorSnapshotFutures {
+        int cancelCount = 0;
+
+        @Override
+        public Tuple2<Long, Long> cancel() throws Exception {
+            cancelCount++;
+            return super.cancel();
+        }
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
new file mode 100644
index 00000000000..e55337177f9
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static 
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.configuration.RpcOptions.ASK_TIMEOUT_DURATION;
+import static org.apache.flink.configuration.RpcOptions.FRAMESIZE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests that a checkpoint fails fast when a task cannot acknowledge it to the JobManager. */
+public class CheckpointAcknowledgeFailureITCase extends TestLogger {
+
+    // the minimum allowed by pekko
+    private static final int MIN_PEKKO_FRAME_SIZE = 32000;
+
+    // set frame size high enough to allow checkpoint decline RPC after 
failing to ACK
+    private static final int PEKKO_FRAME_SIZE = MIN_PEKKO_FRAME_SIZE * 2;
+    // set state size higher than frame size so that checkpoint can not be 
acked
+    private static final int STATE_SIZE = PEKKO_FRAME_SIZE * 2;
+    // let all the state go via checkpoint ACK RPC to exceed frame size limit
+    private static final MemorySize IN_MEM_STATE_THRESHOLD = new 
MemorySize(STATE_SIZE * 2);
+
+    // let pekko time out checkpoint ACK
+    private static final Duration ASK_TIMEOUT = Duration.ofMillis(250);
+    // do NOT let flink time out the checkpoint
+    private static final Duration CHECKPOINT_TIMEOUT = 
ASK_TIMEOUT.multipliedBy(1000);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(getConfiguration())
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(1)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .build());
+
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
+
+    private static Configuration getConfiguration() {
+        Configuration cfg = new Configuration();
+        cfg.set(FRAMESIZE, new MemorySize(PEKKO_FRAME_SIZE).toString());
+        cfg.set(ASK_TIMEOUT_DURATION, ASK_TIMEOUT);
+        cfg.set(FS_SMALL_FILE_THRESHOLD, IN_MEM_STATE_THRESHOLD);
+        cfg.set(CHECKPOINT_STORAGE, "jobmanager");
+
+        return cfg;
+    }
+
+    /**
+     * Tests that if a task is unable to acknowledge a checkpoint, the checkpoint fails
+     * (immediately) with org.apache.pekko.pattern.AskTimeoutException as the cause. This requires
+     * pekko ask, not pekko tell, under the hood for the acknowledgment. A typical failure
+     * reason is exceeding the pekko framesize.
+     */
+    @Test
+    void testCheckpointAckFailure(@InjectMiniCluster MiniCluster cluster) 
throws Exception {
+        SharedReference<CompletableFuture<Object>> stateUpdatedFuture =
+                sharedObjects.add(new CompletableFuture<>());
+        Configuration cfg = new Configuration();
+        cfg.set(CHECKPOINTING_TIMEOUT, CHECKPOINT_TIMEOUT);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(cfg);
+        JobID jobID = executeJobAsync(env, stateUpdatedFuture);
+        stateUpdatedFuture.get().join();
+
+        assertThatThrownBy(() -> cluster.triggerCheckpoint(jobID).get())
+                .hasCauseInstanceOf(CheckpointException.class)
+                
.matches(CheckpointAcknowledgeFailureITCase::hasAskTimeoutException);
+    }
+
+    private static JobID executeJobAsync(
+            StreamExecutionEnvironment env, 
SharedReference<CompletableFuture<Object>> stateUpdated)
+            throws Exception {
+        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
+                .keyBy(ign -> 0L)
+                .process(
+                        new KeyedProcessFunction<Long, Long, Long>() {
+
+                            @Override
+                            public void processElement(
+                                    Long value,
+                                    KeyedProcessFunction<Long, Long, 
Long>.Context ctx,
+                                    Collector<Long> out)
+                                    throws Exception {
+                                if (value == Long.MIN_VALUE) {
+                                    getRuntimeContext()
+                                            .getState(
+                                                    new ValueStateDescriptor<>(
+                                                            "test", 
byte[].class))
+                                            .update(buildState());
+                                    stateUpdated.get().complete(null);
+                                }
+                            }
+                        })
+                .sinkTo(new DiscardingSink<>());
+        return env.executeAsync().getJobID();
+    }
+
+    private static byte[] buildState() {
+        // Random (incompressible) bytes — keeps the ACK above the pekko frame 
size even
+        // when snapshot compression is enabled (e.g. via test-config 
randomization).
+        byte[] state = new byte[STATE_SIZE];
+        new Random(0).nextBytes(state);
+        return state;
+    }
+
+    /**
+     * We can't use the exception class to check for the AskTimeoutException 
because it's been
+     * loaded by a ClassLoader we don't have access to.
+     *
+     * @param t the exception to check
+     * @return true if the exception is an AskTimeoutException
+     */
+    private static boolean hasAskTimeoutException(Throwable t) {
+        final String clazz = "org.apache.pekko.pattern.AskTimeoutException";
+        return ExceptionUtils.findThrowable(
+                        t,
+                        cause -> {
+                            if (cause instanceof SerializedThrowable) {
+                                final SerializedThrowable serializedThrowable =
+                                        (SerializedThrowable) cause;
+                                return clazz.equals(
+                                        
serializedThrowable.getOriginalErrorClassName());
+                            }
+                            return clazz.equals(cause.getClass().getName());
+                        })
+                .isPresent();
+    }
+}

Reply via email to