This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 375a5a3  [FLINK-9900][tests] Fix unstable ZooKeeperHighAvailabilityITCase
375a5a3 is described below

commit 375a5a3fa299fc7abd7a84667eba5c16cc6220af
Author: ifndef-SleePy <[email protected]>
AuthorDate: Tue Jul 30 12:43:36 2019 +0800

    [FLINK-9900][tests] Fix unstable ZooKeeperHighAvailabilityITCase
    
    This closes #9900.
---
 .../test/checkpointing/ZooKeeperHighAvailabilityITCase.java  | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 9bec331..7389613 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -106,8 +106,9 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 
        private static MiniClusterWithClientResource miniClusterResource;
 
-       private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
-       private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
+       private static OneShotLatch waitForCheckpointLatch;
+       private static OneShotLatch failInCheckpointLatch;
+       private static OneShotLatch blockSnapshotLatch;
 
        @BeforeClass
        public static void setup() throws Exception {
@@ -179,6 +180,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
 
                waitForCheckpointLatch = new OneShotLatch();
                failInCheckpointLatch = new OneShotLatch();
+               blockSnapshotLatch = new OneShotLatch();
 
                ClusterClient<?> clusterClient = 
miniClusterResource.getClusterClient();
                final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
@@ -255,6 +257,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
                                return FileVisitResult.CONTINUE;
                        }
                });
+               blockSnapshotLatch.trigger();
 
                // now the job should be able to go to RUNNING again and then eventually to FINISHED,
                // which it only does if it could successfully restore
@@ -366,6 +369,11 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger {
                                failInCheckpointLatch.await();
                                if (!failedAlready.getAndSet(true)) {
                                        throw new RuntimeException("Failing on 
purpose.");
+                               } else {
+                                       // make sure there would be no more successful checkpoint before job failing
+                                       // otherwise there might be a successful checkpoint 7 which is unexpected
+                                       // we expect checkpoint 5 is the last successful one before ha storage recovering
+                                       blockSnapshotLatch.await();
                                }
                        }
                }

Reply via email to