This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new 8b289000f65 [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally
8b289000f65 is described below
commit 8b289000f65aef39f48d6cc6ddb817208a62fdee
Author: Rui Fan <[email protected]>
AuthorDate: Thu Feb 1 21:07:40 2024 +0800
    [FLINK-34200][test] Fix the bug that AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally
---
.../flink/runtime/testutils/CommonTestUtils.java | 67 +++++++++++-----------
.../test/checkpointing/AutoRescalingITCase.java | 14 +++--
2 files changed, 44 insertions(+), 37 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index da016150500..0720197e911 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -343,49 +343,52 @@ public class CommonTestUtils {
     /** Wait for (at least) the given number of successful checkpoints. */
     public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
-            throws Exception, FlinkJobNotFoundException {
-        waitUntilCondition(
-                () -> {
-                    AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
-                    if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
-                            .filter(
-                                    st ->
-                                            st.getCounts().getNumberOfCompletedCheckpoints()
-                                                    >= numCheckpoints)
-                            .isPresent()) {
-                        return true;
-                    } else if (graph.getState().isGloballyTerminalState()) {
-                        checkState(
-                                graph.getFailureInfo() != null,
-                                "Job terminated before taking required %s checkpoints: %s",
-                                numCheckpoints,
-                                graph.getState());
-                        throw graph.getFailureInfo().getException();
-                    } else {
+            throws Exception {
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot ->
+                        checkpointStatsSnapshot != null
+                                && checkpointStatsSnapshot
+                                                .getCounts()
+                                                .getNumberOfCompletedCheckpoints()
+                                        >= numCheckpoints);
+    }
+
+    /**
+     * Wait for a new completed checkpoint, the new checkpoint must be triggered after
+     * waitForNewCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
+        final long startTime = System.currentTimeMillis();
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot -> {
+                    if (checkpointStatsSnapshot == null) {
                         return false;
                     }
+                    final CompletedCheckpointStats latestCompletedCheckpoint =
+                            checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
+                    return latestCompletedCheckpoint != null
+                            && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
+    // Wait for CheckpointStatsSnapshot to meet the condition.
+    private static void waitForCheckpoints(
+            JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
             throws Exception {
-        final long[] currentCheckpoint = new long[] {-1L};
         waitUntilCondition(
                 () -> {
-                    AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
-                    CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-                    if (snapshot != null) {
-                        long currentCount = snapshot.getCounts().getNumberOfCompletedCheckpoints();
-                        if (currentCheckpoint[0] < 0L) {
-                            currentCheckpoint[0] = currentCount;
-                        } else {
-                            return currentCount > currentCheckpoint[0];
-                        }
+                    final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
+                    final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+                    if (condition.test(snapshot)) {
+                        return true;
                     } else if (graph.getState().isGloballyTerminalState()) {
                         checkState(
                                 graph.getFailureInfo() != null,
-                                "Job terminated before taking required checkpoint.",
+                                "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                 graph.getState());
                         throw graph.getFailureInfo().getException();
                     }
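
For illustration, a minimal usage sketch of the new helper (hypothetical test code, not part of this commit; apart from Flink's JobID/MiniCluster and the two CommonTestUtils helpers changed above, all names are assumptions):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.minicluster.MiniCluster;

import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;

class RescalingTestSketch {
    // Waits for a checkpoint that is guaranteed to cover everything the test has produced so
    // far, after which it is safe to rescale and restore without reprocessing in-flight data.
    static void stabilizeBeforeRescale(MiniCluster miniCluster, JobID jobID) throws Exception {
        // All subtasks must be running before checkpoints can complete.
        waitForAllTaskRunning(miniCluster, jobID, false);
        // Unlike the removed waitForOneMoreCheckpoint, this only returns once a checkpoint
        // whose trigger timestamp is later than this call has completed, so a checkpoint that
        // was already in flight cannot satisfy the wait.
        waitForNewCheckpoint(jobID, miniCluster);
    }
}
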
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 691688ae602..eab82b1c727 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -86,7 +86,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;
 import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
 import static org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
 import static org.junit.Assert.assertEquals;
@@ -261,7 +261,11 @@ public class AutoRescalingITCase extends TestLogger {
         waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        // We need to wait for a checkpoint to be completed that was triggered after all the
+        // data was processed. That ensures the entire data being flushed out of the Operator's
+        // network buffers to avoid reprocessing test data twice after the restore (see
+        // FLINK-34200).
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         SubtaskIndexSource.SOURCE_LATCH.reset();
@@ -328,7 +332,7 @@ public class AutoRescalingITCase extends TestLogger {
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
         for (JobVertex vertex : jobGraph.getVertices()) {
@@ -411,7 +415,7 @@ public class AutoRescalingITCase extends TestLogger {
         // clear the CollectionSink set for the restarted job
         CollectionSink.clearElementsSet();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         SubtaskIndexSource.SOURCE_LATCH.reset();
@@ -513,7 +517,7 @@ public class AutoRescalingITCase extends TestLogger {
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
         for (JobVertex vertex : jobGraph.getVertices()) {
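
Why the count-based wait was flaky, in sketch form (simplified and hypothetical; the real implementations are the CommonTestUtils methods in the diff above): the old helper accepted any additional completed checkpoint, including one that had been triggered before the wait started and therefore possibly before all test data had been processed, while the new helper keys off the trigger timestamp.

import java.util.function.LongSupplier;

final class CheckpointWaitSketch {

    // Old behaviour: any newly completed checkpoint unblocks the wait, even one that was
    // already in flight when the wait began and may not cover the last records emitted.
    static void waitForOneMore(LongSupplier completedCount, long pollMillis)
            throws InterruptedException {
        final long initial = completedCount.getAsLong();
        while (completedCount.getAsLong() <= initial) {
            Thread.sleep(pollMillis);
        }
    }

    // New behaviour: only a checkpoint triggered after this call unblocks the wait, so it is
    // guaranteed to include everything the test produced before calling it.
    static void waitForNew(LongSupplier latestCompletedTriggerTimestamp, long pollMillis)
            throws InterruptedException {
        final long startTime = System.currentTimeMillis();
        while (latestCompletedTriggerTimestamp.getAsLong() <= startTime) {
            Thread.sleep(pollMillis);
        }
    }
}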