1996fanrui commented on code in PR #24246:
URL: https://github.com/apache/flink/pull/24246#discussion_r1475680928
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -328,7 +330,7 @@ public void testCheckpointRescalingNonPartitionedStateCausesException() throws E
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
IIUC, we can call the new `waitForNewCheckpoint` here.
Semantically, the old `waitForOneMoreCheckpoint` waits for one or more
checkpoints. It polls the checkpoint count every 100 ms by default, so if the job
completes 10 checkpoints within 100 ms, `waitForOneMoreCheckpoint` effectively
waits for 10 checkpoints.
So I don't think `waitForNewCheckpoint` breaks the semantics of
`waitForOneMoreCheckpoint`, and it's clearer than before.
Also, once we introduce `waitForNewCheckpoint`, we might not need
`waitForOneMoreCheckpoint`, which checks the checkpoint count. It would only be
needed if we had a requirement to wait for a specific number of new checkpoints,
and I don't see a strong requirement for that right now.
WDYT?
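To make the polling argument concrete, here is a minimal sketch that models a count-based wait on a simulated clock. It does not call Flink and is not the actual implementation of `waitForOneMoreCheckpoint` or `waitForNewCheckpoint`. The class and method names are hypothetical, checkpoint completions are assumed to be given as timestamps, and the 100 ms interval is the default mentioned above.

```java
import java.util.List;

/** Hypothetical model of a count-based checkpoint wait; no Flink APIs are used. */
public class CheckpointPollingSketch {

    /** Number of checkpoints whose completion time is at or before timeMs. */
    public static int completedBy(List<Long> completionTimesMs, long timeMs) {
        int count = 0;
        for (long t : completionTimesMs) {
            if (t <= timeMs) {
                count++;
            }
        }
        return count;
    }

    /**
     * Records the completed-checkpoint count at startMs, then polls every pollIntervalMs
     * until the count grows. Returns how many new checkpoints had completed when a poll
     * first saw a change, or 0 if no checkpoint completes after startMs.
     */
    public static int newCheckpointsWhenWaitReturns(
            List<Long> completionTimesMs, long startMs, long pollIntervalMs) {
        int baseline = completedBy(completionTimesMs, startMs);
        long lastCompletion =
                completionTimesMs.stream().mapToLong(Long::longValue).max().orElse(startMs);
        for (long now = startMs + pollIntervalMs; ; now += pollIntervalMs) {
            int current = completedBy(completionTimesMs, now);
            if (current > baseline) {
                return current - baseline; // "one or more" new checkpoints observed
            }
            if (now >= lastCompletion) {
                return 0; // nothing left to observe in this simulation
            }
        }
    }

    public static void main(String[] args) {
        // 10 checkpoints complete within the first 100 ms polling interval.
        List<Long> burst = List.of(10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L, 90L, 100L);
        System.out.println(newCheckpointsWhenWaitReturns(burst, 0L, 100L)); // prints 10
        // One checkpoint every 250 ms.
        List<Long> sparse = List.of(250L, 500L, 750L);
        System.out.println(newCheckpointsWhenWaitReturns(sparse, 0L, 100L)); // prints 1
    }
}
```

With a burst of checkpoints inside one polling interval, the wait only returns after all of them have completed; with sparse checkpoints it returns after the first. Either way the only guarantee is "at least one new checkpoint", which is the point of the argument above.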
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -411,7 +413,7 @@ public void testCheckpointRescalingWithKeyedAndNonPartitionedState() throws Exce
// clear the CollectionSink set for the restarted job
CollectionSink.clearElementsSet();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
Do you mean `createJobGraphWithKeyedState` and
`createJobGraphWithKeyedAndNonPartitionedOperatorState` have redundant code?
Or `testCheckpointRescalingWithKeyedAndNonPartitionedState` and
`testCheckpointRescalingKeyedState`?
I checked them, and they differ in many details, such as:
- The source is different
- For `NonPartitionedOperator`, both the parallelism and the max parallelism
are fixed
I will check later whether some common code can be extracted. If so, I can
submit a hotfix PR and cc you.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -513,7 +515,7 @@ public void testCheckpointRescalingPartitionedOperatorState(
// wait until the operator handles some data
StateSourceBase.workStartedLatch.await();
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
Same as this comment:
https://github.com/apache/flink/pull/24246/files#r1474917357
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java:
##########
@@ -261,7 +263,7 @@ public void testCheckpointRescalingKeyedState(boolean scaleOut) throws Exception
waitForAllTaskRunning(cluster.getMiniCluster(),
jobGraph.getJobID(), false);
- waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+ waitForNewCheckpoint(jobID, cluster.getMiniCluster());
Review Comment:
Added. Please help double-check, thanks~
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.