This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 856c927 [FLINK-23797][tests] Wait for all task running before
savepoint for all tests in SavepointITCase
856c927 is described below
commit 856c927ba937beb188c258490c1696555cc53f6c
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Aug 31 17:14:47 2021 +0200
[FLINK-23797][tests] Wait for all task running before savepoint for all
tests in SavepointITCase
---
.../java/org/apache/flink/test/checkpointing/SavepointITCase.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index cb6426a..75d1161 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -206,6 +206,7 @@ public class SavepointITCase extends TestLogger {
client.submitJob(jobGraph).get();
BoundedPassThroughOperator.getProgressLatch().await();
+ waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
client.stopWithSavepoint(jobId, drain, null).get();
@@ -608,6 +609,7 @@ public class SavepointITCase extends TestLogger {
client.submitJob(jobGraph).get();
BoundedPassThroughOperator.getProgressLatch().await();
+ waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
client.stopWithSavepoint(jobId, false, null).get();
@@ -873,12 +875,13 @@ public class SavepointITCase extends TestLogger {
cluster.before();
try {
ClusterClient<?> client = cluster.getClusterClient();
- client.submitJob(jobGraph).get();
+ JobID jobID = client.submitJob(jobGraph).get();
// we need to wait for both pipelines to be in state RUNNING
because that's the only
// state which allows creating a savepoint
failingPipelineLatch.await();
succeedingPipelineLatch.await();
+ waitForAllTaskRunning(cluster.getMiniCluster(), jobID, false);
try {
client.stopWithSavepoint(jobGraph.getJobID(), false,
savepointDir.getAbsolutePath())