This is an automated email from the ASF dual-hosted git repository.
rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f3ffe4e1e1b [FLINK-39738] Use pekko ask() instead of tell() to ack the
checkpoint
f3ffe4e1e1b is described below
commit f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Apr 23 18:10:08 2025 +0200
[FLINK-39738] Use pekko ask() instead of tell() to ack the checkpoint
This allows checkpoints to fail faster when the pekko framesize is exceeded,
instead of waiting for the checkpoint timeout.
---
.../checkpoint/CheckpointCoordinatorGateway.java | 5 +-
.../apache/flink/runtime/jobmaster/JobMaster.java | 3 +-
.../taskexecutor/rpc/RpcCheckpointResponder.java | 14 +-
.../runtime/tasks/AsyncCheckpointRunnable.java | 43 ++++-
.../jobmaster/utils/TestingJobMasterGateway.java | 3 +-
.../runtime/tasks/AsyncCheckpointRunnableTest.java | 79 +++++++++
.../CheckpointAcknowledgeFailureITCase.java | 176 +++++++++++++++++++++
7 files changed, 310 insertions(+), 13 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 2e057289143..42947fb655a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
+import java.util.concurrent.CompletableFuture;
+
/** RPC Gateway interface for messages to the CheckpointCoordinator. */
public interface CheckpointCoordinatorGateway extends RpcGateway {
- void acknowledgeCheckpoint(
+ CompletableFuture<Acknowledge> acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6f67ecc43c3..39a29191b97 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -618,7 +618,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
// TODO: This method needs a leader session ID
@Override
- public void acknowledgeCheckpoint(
+ public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
@@ -632,6 +632,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
checkpointMetrics,
deserializeTaskStateSnapshot(checkpointState,
getClass().getClassLoader()));
}
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 95121929e68..ce57e9aef92 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -47,12 +47,14 @@ public class RpcCheckpointResponder implements
CheckpointResponder {
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
- checkpointCoordinatorGateway.acknowledgeCheckpoint(
- jobID,
- executionAttemptID,
- checkpointId,
- checkpointMetrics,
- serializeTaskStateSnapshot(subtaskState));
+ checkpointCoordinatorGateway
+ .acknowledgeCheckpoint(
+ jobID,
+ executionAttemptID,
+ checkpointId,
+ checkpointMetrics,
+ serializeTaskStateSnapshot(subtaskState))
+ .join();
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index ee674fc1c2a..0908eff3d74 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -133,10 +133,20 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
if (asyncCheckpointState.compareAndSet(
AsyncCheckpointState.RUNNING,
AsyncCheckpointState.COMPLETED)) {
- reportCompletedSnapshotStates(
-
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
- snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
- asyncDurationMillis);
+ try {
+ reportCompletedSnapshotStates(
+
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
+
snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
+ asyncDurationMillis);
+ } catch (Exception reportFailure) {
+ // Upload already succeeded; JM is authoritative for state
cleanup.
+ // Running cleanup() here could delete files referenced by
a checkpoint
+ // the JM has already completed (ACK applied but response
RPC failed).
+ // Decline so JM aborts the still-pending checkpoint
quickly; if the ACK
+ // was actually applied, the decline arrives after the
checkpoint moved
+ // to COMPLETED and is silently dropped by the coordinator.
+ declineWithoutDiscard(reportFailure);
+ }
} else {
LOG.debug(
@@ -275,6 +285,31 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
.reportIncompleteTaskStateSnapshots(checkpointMetaData,
metrics);
}
+ private void declineWithoutDiscard(Exception reportFailure) {
+ long checkpointId = checkpointMetaData.getCheckpointId();
+ LOG.warn(
+ "{} - failed to report completed checkpoint {} to JobManager. "
+ + "Declining without discarding uploaded state.",
+ taskName,
+ checkpointId,
+ reportFailure);
+ if (!isTaskRunning.get()) {
+ return;
+ }
+ try {
+ taskEnvironment.declineCheckpoint(
+ checkpointId,
+ new CheckpointException(
+
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, reportFailure));
+ } catch (Exception declineFailure) {
+ LOG.warn(
+ "{} - failed to decline checkpoint {} after report
failure.",
+ taskName,
+ checkpointId,
+ declineFailure);
+ }
+ }
+
private void handleExecutionException(Exception e) {
boolean didCleanup = false;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index ad311f3e7ea..25f64f2fd86 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -479,7 +479,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
}
@Override
- public void acknowledgeCheckpoint(
+ public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
@@ -492,6 +492,7 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
checkpointId,
checkpointMetrics,
deserializeTaskStateSnapshot(subtaskState,
getClass().getClassLoader())));
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 8086a291ecc..3763fdd2c36 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -19,10 +19,12 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -125,6 +127,73 @@ class AsyncCheckpointRunnableTest {
assertThat(environment.getCause().getCheckpointFailureReason()).isSameAs(originalReason);
}
+ @Test
+ void testReportFailureDeclinesWithoutDiscardingState() {
+ testCheckpointAckFailureHandling(true);
+ }
+
+ @Test
+ void testReportFailureDoesNotDeclineWhenTaskNotRunning() {
+ testCheckpointAckFailureHandling(false);
+ }
+
+ /**
+ * Simulates the upload succeeding but the ACK RPC failing (e.g.
AskTimeoutException either
+ * because the JM never received the ACK, or because it applied the ACK
and the response timed
+ * out). The TM must:
+ *
+ * <ol>
+ * <li>NOT run cleanup() — files may be referenced by a completed
checkpoint.
+ * <li>Send declineCheckpoint (if the task is running) — fails the
checkpoint fast in the
+ * still-pending case; silently dropped by the coordinator if the
ACK was applied.
+ * </ol>
+ */
+ private void testCheckpointAckFailureHandling(boolean isTaskRunning) {
+ RuntimeException reportFailure = new RuntimeException("simulated ACK
response timeout");
+
+ CancelTrackingOperatorSnapshotFutures snapshot =
+ new CancelTrackingOperatorSnapshotFutures();
+ Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new
HashMap<>();
+ snapshotsInProgress.put(new OperatorID(), snapshot);
+
+ TestTaskStateManager throwingStateManager =
+ new TestTaskStateManager() {
+ @Override
+ public void reportTaskStateSnapshots(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointMetrics checkpointMetrics,
+ TaskStateSnapshot acknowledgedState,
+ TaskStateSnapshot localState) {
+ throw reportFailure;
+ }
+ };
+
+ TestEnvironment env =
+ new TestEnvironment(
+ new Configuration(),
+ new Configuration(),
+ new ExecutionConfig(),
+ 1L,
+ new MockInputSplitProvider(),
+ 1,
+ throwingStateManager);
+
+ AsyncCheckpointRunnable runnable =
+ createAsyncRunnable(snapshotsInProgress, env, false,
isTaskRunning);
+ runnable.run();
+
+ assertThat(snapshot.cancelCount).isZero();
+ assertThat(runnable.getFinishedFuture()).isCompleted();
+ if (isTaskRunning) {
+ assertThat(env.getCause()).isNotNull();
+ assertThat(env.getCause().getCheckpointFailureReason())
+
.isSameAs(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
+ assertThat(env.getCause()).hasRootCause(reportFailure);
+ } else {
+ assertThat(env.getCause()).isNull();
+ }
+ }
+
@Test
void testReportFinishedOnRestoreTaskSnapshots() {
TestEnvironment environment = new TestEnvironment();
@@ -206,4 +275,14 @@ class AsyncCheckpointRunnableTest {
return cause;
}
}
+
+ private static class CancelTrackingOperatorSnapshotFutures extends
OperatorSnapshotFutures {
+ int cancelCount = 0;
+
+ @Override
+ public Tuple2<Long, Long> cancel() throws Exception {
+ cancelCount++;
+ return super.cancel();
+ }
+ }
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
new file mode 100644
index 00000000000..e55337177f9
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_TIMEOUT;
+import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static
org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.configuration.RpcOptions.ASK_TIMEOUT_DURATION;
+import static org.apache.flink.configuration.RpcOptions.FRAMESIZE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Verifies that a checkpoint fails fast when a task is unable to acknowledge it. */
+public class CheckpointAcknowledgeFailureITCase extends TestLogger {
+
+ // the minimum allowed by pekko
+ private static final int MIN_PEKKO_FRAME_SIZE = 32000;
+
+ // set frame size high enough to allow checkpoint decline RPC after
failing to ACK
+ private static final int PEKKO_FRAME_SIZE = MIN_PEKKO_FRAME_SIZE * 2;
+ // set state size higher than frame size so that checkpoint can not be
acked
+ private static final int STATE_SIZE = PEKKO_FRAME_SIZE * 2;
+ // let all the state go via checkpoint ACK RPC to exceed frame size limit
+ private static final MemorySize IN_MEM_STATE_THRESHOLD = new
MemorySize(STATE_SIZE * 2);
+
+ // let pekko time out checkpoint ACK
+ private static final Duration ASK_TIMEOUT = Duration.ofMillis(250);
+ // do NOT let flink time out the checkpoint
+ private static final Duration CHECKPOINT_TIMEOUT =
ASK_TIMEOUT.multipliedBy(1000);
+
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfiguration())
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .build());
+
+ @RegisterExtension
+ private final SharedObjectsExtension sharedObjects =
SharedObjectsExtension.create();
+
+ private static Configuration getConfiguration() {
+ Configuration cfg = new Configuration();
+ cfg.set(FRAMESIZE, new MemorySize(PEKKO_FRAME_SIZE).toString());
+ cfg.set(ASK_TIMEOUT_DURATION, ASK_TIMEOUT);
+ cfg.set(FS_SMALL_FILE_THRESHOLD, IN_MEM_STATE_THRESHOLD);
+ cfg.set(CHECKPOINT_STORAGE, "jobmanager");
+
+ return cfg;
+ }
+
+ /**
+ * Test that if a task is unable to acknowledge a checkpoint then the
checkpoint fails
+ * (immediately) with org.apache.pekko.pattern.AskTimeoutException as the cause.
This requires the use
+ * of pekko ask, and not pekko tell under the hood for the acknowledgment.
Typical failure
+ * reason is exceeding pekko framesize.
+ */
+ @Test
+ void testCheckpointAckFailure(@InjectMiniCluster MiniCluster cluster)
throws Exception {
+ SharedReference<CompletableFuture<Object>> stateUpdatedFuture =
+ sharedObjects.add(new CompletableFuture<>());
+ Configuration cfg = new Configuration();
+ cfg.set(CHECKPOINTING_TIMEOUT, CHECKPOINT_TIMEOUT);
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(cfg);
+ JobID jobID = executeJobAsync(env, stateUpdatedFuture);
+ stateUpdatedFuture.get().join();
+
+ assertThatThrownBy(() -> cluster.triggerCheckpoint(jobID).get())
+ .hasCauseInstanceOf(CheckpointException.class)
+
.matches(CheckpointAcknowledgeFailureITCase::hasAskTimeoutException);
+ }
+
+ private static JobID executeJobAsync(
+ StreamExecutionEnvironment env,
SharedReference<CompletableFuture<Object>> stateUpdated)
+ throws Exception {
+ env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
+ .keyBy(ign -> 0L)
+ .process(
+ new KeyedProcessFunction<Long, Long, Long>() {
+
+ @Override
+ public void processElement(
+ Long value,
+ KeyedProcessFunction<Long, Long,
Long>.Context ctx,
+ Collector<Long> out)
+ throws Exception {
+ if (value == Long.MIN_VALUE) {
+ getRuntimeContext()
+ .getState(
+ new ValueStateDescriptor<>(
+ "test",
byte[].class))
+ .update(buildState());
+ stateUpdated.get().complete(null);
+ }
+ }
+ })
+ .sinkTo(new DiscardingSink<>());
+ return env.executeAsync().getJobID();
+ }
+
+ private static byte[] buildState() {
+ // Random (incompressible) bytes — keeps the ACK above the pekko frame
size even
+ // when snapshot compression is enabled (e.g. via test-config
randomization).
+ byte[] state = new byte[STATE_SIZE];
+ new Random(0).nextBytes(state);
+ return state;
+ }
+
+ /**
+ * We can't use the exception class to check for the AskTimeoutException
because it's been
+ * loaded by a ClassLoader we don't have access to.
+ *
+ * @param t the exception to check
+ * @return true if the exception is an AskTimeoutException
+ */
+ private static boolean hasAskTimeoutException(Throwable t) {
+ final String clazz = "org.apache.pekko.pattern.AskTimeoutException";
+ return ExceptionUtils.findThrowable(
+ t,
+ cause -> {
+ if (cause instanceof SerializedThrowable) {
+ final SerializedThrowable serializedThrowable =
+ (SerializedThrowable) cause;
+ return clazz.equals(
+
serializedThrowable.getOriginalErrorClassName());
+ }
+ return clazz.equals(cause.getClass().getName());
+ })
+ .isPresent();
+ }
+}