XComp commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1474488421
##########
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java:
##########
@@ -367,9 +367,11 @@ public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int n
});
}
- /** Wait for on more completed checkpoint. */
- public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster miniCluster)
- throws Exception {
+ /**
+ * Wait for a new completed checkpoint. Note: we wait for 2 or more checkpoint to ensure the
+ * latest checkpoint start after waitForTwoOrMoreCheckpoint is called.
+ */
+ public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
Review Comment:
```suggestion
public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster, int checkpointCount) throws Exception {
```
Can't we make the number of checkpoints to wait for configurable? That way,
we can pass in `2` in the test implementation analogously to
`waitForCheckpoint`. I also feel like we can remove some redundant code within
the two methods. :thinking:
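A rough sketch of the configurable variant, to illustrate the idea rather than prescribe the implementation. It assumes a hypothetical `completedCheckpointCount` supplier in place of the actual `MiniCluster` lookup; the class name, the polling parameters, and the timeout handling are likewise illustrative and not part of `CommonTestUtils`:

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only; not part of CommonTestUtils. */
public final class CheckpointWaitSketch {

    private CheckpointWaitSketch() {}

    /**
     * Waits until {@code checkpointCount} additional checkpoints have completed.
     *
     * @param completedCheckpointCount assumed to return the number of checkpoints completed so far
     * @param checkpointCount how many new completed checkpoints to wait for (at least 1)
     * @return the completed-checkpoint count observed when the wait ended
     */
    public static long waitForNewCheckpoint(
            LongSupplier completedCheckpointCount,
            int checkpointCount,
            Duration pollInterval,
            Duration timeout)
            throws InterruptedException, TimeoutException {
        if (checkpointCount < 1) {
            throw new IllegalArgumentException("checkpointCount must be >= 1");
        }
        // Snapshot the count at call time; only checkpoints completed afterwards count.
        final long target = completedCheckpointCount.getAsLong() + checkpointCount;
        final long deadline = System.nanoTime() + timeout.toNanos();

        long current;
        while ((current = completedCheckpointCount.getAsLong()) < target) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "Only reached " + current + " of " + target + " completed checkpoints.");
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return current;
    }
}
```

With something like this, a test that needs a checkpoint triggered after the call would pass `2`, since the first checkpoint to complete may already have been in flight when the method was called. Tests that only need any new checkpoint would pass `1`.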
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
So far, we've only seen the issue in `#testCheckpointRescalingInKeyedState`.
We don't actually need the two checkpoints here because we're not relying on
elements in this test. We could keep the test's functionality if we make
`waitForOneMoreCheckpoint` configurable, as suggested in one of my previous
comments.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
waitForAllTaskRunning(cluster.getMiniCluster(), jobGraph.getJobID(), false);
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
We should add a comment here explaining why we need to wait for two
checkpoints instead of one.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
AFAIU, we don't need to change it here.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
I guess the scenario can happen in this test as well because it's almost the
same test implementation as in `#testCheckpointRescalingKeyedState` :thinking:
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
I'm wondering whether the redundant code could be removed here. But that's
probably a bit out-of-scope for this issue.