This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9ba3b1a5863 [FLINK-31527][tests] Stabilize ChangelogRescalingITCase
9ba3b1a5863 is described below
commit 9ba3b1a5863c1aeeca0be25b4bb375abfe02b940
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Mar 21 21:08:27 2023 +0000
[FLINK-31527][tests] Stabilize ChangelogRescalingITCase
---
.../org/apache/flink/test/state/ChangelogRescalingITCase.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 6891eafadae..10ba869f6a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -329,7 +329,7 @@ public class ChangelogRescalingITCase extends TestLogger {
private String checkpointAndCancel(JobID jobID) throws Exception {
waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
cluster.getClusterClient().cancel(jobID).get();
- checkStatus(jobID);
+ waitForSuccessfulTermination(jobID);
return CommonTestUtils.getLatestCompletedCheckpointPath(jobID,
cluster.getMiniCluster())
.<NoSuchElementException>orElseThrow(
() -> {
@@ -337,7 +337,13 @@ public class ChangelogRescalingITCase extends TestLogger {
});
}
- private void checkStatus(JobID jobID) throws InterruptedException,
ExecutionException {
+ private void waitForSuccessfulTermination(JobID jobID) throws Exception {
+ CommonTestUtils.waitUntilCondition(
+ () ->
+ cluster.getClusterClient()
+ .getJobStatus(jobID)
+ .get()
+ .isGloballyTerminalState());
if
(cluster.getClusterClient().getJobStatus(jobID).get().isGloballyTerminalState())
{
cluster.getClusterClient()
.requestJobResult(jobID)