XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1478141469
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
I created a [PR](https://github.com/1996fanrui/flink/pull/9) to show what I had in mind for the code redundancy reduction.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
Review Comment:
Good idea. Checking the trigger time is a better solution. I like that. :+1:
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
                 });
     }
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
-            throws Exception {
+    /**
+     * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+     * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
Review Comment:
About the redundant code:
```java
    /** Wait for (at least) the given number of successful checkpoints. */
    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int numCheckpoints)
            throws Exception {
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot ->
                        checkpointStatsSnapshot != null
                                && checkpointStatsSnapshot
                                                .getCounts()
                                                .getNumberOfCompletedCheckpoints()
                                        >= numCheckpoints);
    }

    /**
     * Wait for a new completed checkpoint, the new checkpoint must be triggered after
     * waitForNewCheckpoint is called.
     */
    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
        final long startTime = System.currentTimeMillis();
        waitForCheckpoints(
                jobID,
                miniCluster,
                checkpointStatsSnapshot -> {
                    if (checkpointStatsSnapshot != null) {
                        final CompletedCheckpointStats latestCompletedCheckpoint =
                                checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
                        return latestCompletedCheckpoint != null
                                && latestCompletedCheckpoint.getTriggerTimestamp() > startTime;
                    }
                    return false;
                });
    }

    private static void waitForCheckpoints(
            JobID jobId, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> condition)
            throws Exception {
        waitUntilCondition(
                () -> {
                    final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobId).get();
                    final CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
                    if (condition.test(snapshot)) {
                        return true;
                    } else if (graph.getState().isGloballyTerminalState()) {
                        checkState(
                                graph.getFailureInfo() != null,
                                "Job terminated (state=%s) before completing the requested checkpoint(s).",
                                graph.getState());
                        throw graph.getFailureInfo().getException();
                    }
                    return false;
                });
    }
```
...just to clarify what I meant. Feel free to ignore that one.
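To make the trigger-time idea concrete outside of Flink: here is a minimal, self-contained sketch of the same pattern (record a start time, then accept only events triggered strictly after it). The class and method names (`TriggerTimeWait`, `waitForNewEvent`) are hypothetical and not part of the Flink test utilities; it simply illustrates why comparing the latest trigger timestamp against a recorded start time ignores stale checkpoints.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical, Flink-free sketch of the trigger-time pattern used in
 * waitForNewCheckpoint: only events triggered after startTime count.
 */
public class TriggerTimeWait {

    /** Polls the supplier until it reports a trigger time after startTime. */
    static long waitForNewEvent(LongSupplier latestTriggerTime, long startTime)
            throws InterruptedException {
        while (true) {
            long t = latestTriggerTime.getAsLong();
            if (t > startTime) {
                return t; // this event was triggered after we started waiting
            }
            Thread.sleep(1); // back off briefly before re-polling
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        // Simulates a stale "checkpoint" completed before we started waiting.
        AtomicLong latest = new AtomicLong(startTime - 1_000);

        // Simulate a new checkpoint completing ~50ms after waiting began.
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
            }
            latest.set(System.currentTimeMillis());
        }).start();

        long seen = waitForNewEvent(latest::get, startTime);
        System.out.println(seen > startTime); // prints true
    }
}
```

Note that the stale value (`startTime - 1_000`) never satisfies the predicate, which is exactly what distinguishes this from the old "one more completed checkpoint" counting approach.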
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,13 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
         waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        // We must wait for a checkpoint that is triggered after calling waitForNewCheckpoint.
+        // This test will fail if the job recovers from a checkpoint triggered before
+        // `SubtaskIndexFlatMapper.workCompletedLatch.await` and after calling
+        // `waitForNewCheckpoint`. Because `SubtaskIndexFlatMapper` expects
+        // `ValueState<Integer> counter` and `ValueState<Integer> sum` after recovery from
+        // the checkpoint to be the count and sum of all data.
Review Comment:
```suggestion
        // We need to wait for a checkpoint to be completed that was triggered after all the data
        // was processed. That ensures the entire data being flushed out of the Operator's network
        // buffers to avoid reprocessing test data twice after the restore (see FLINK-34200).
```
Just a proposal to keep it shorter. Referring back to the Jira issue for more context should be good enough. Additionally, markdown features might not add much value in JavaDoc; if you want to go that route, you might want to use the `{@code }` syntax instead.
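For illustration, a Javadoc comment using `{@code }` inline tags instead of markdown backticks might look like this (a sketch only, reusing the names from the diff above):

```java
/**
 * Waits for a completed checkpoint that was triggered after
 * {@code waitForNewCheckpoint} was called, so that checkpoints completed
 * before the call are ignored.
 */
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
    // ...
}
```

Unlike backticks, `{@code }` is rendered as code by the javadoc tool itself, not just by markdown viewers.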
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
Sounds good :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]