[hotfix] [tests] Reset state to allow retry on failure

This closes #1611


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b74546
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b74546
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b74546

Branch: refs/heads/master
Commit: 48b745466202ebbb68608930e13cb6ed4a35e6e7
Parents: 756cbaf
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Feb 9 12:45:41 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 10 15:27:41 2016 +0100

----------------------------------------------------------------------
 .../JobManagerCheckpointRecoveryITCase.java      | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48b74546/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index 59a05ff..ea30c58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -116,15 +116,15 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
 
        private static final int Parallelism = 8;
 
-       private static final CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
+       private static CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
 
-       private static final AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
+       private static AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
 
-       private static final CountDownLatch FinalCountLatch = new 
CountDownLatch(1);
+       private static CountDownLatch FinalCountLatch = new CountDownLatch(1);
 
-       private static final AtomicReference<Long> FinalCount = new 
AtomicReference<>();
+       private static AtomicReference<Long> FinalCount = new 
AtomicReference<>();
 
-       private static final long LastElement = -1;
+       private static long LastElement = -1;
 
        /**
         * Simple checkpointed streaming sum.
@@ -156,7 +156,6 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
                Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
                                .getConnectString(), 
FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
                ActorSystem testSystem = null;
                JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
@@ -248,6 +247,13 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
                        }
                }
                catch (Throwable t) {
+                       // Reset all static state for test retries
+                       CompletedCheckpointsLatch = new CountDownLatch(2);
+                       RecoveredStates = new AtomicLongArray(Parallelism);
+                       FinalCountLatch = new CountDownLatch(1);
+                       FinalCount = new AtomicReference<>();
+                       LastElement = -1;
+
                        // Print early (in some situations the process logs get 
too big
                        // for Travis and the root problem is not shown)
                        t.printStackTrace();
@@ -303,7 +309,6 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
                                fileStateBackendPath);
 
                config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
                JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
                LeaderRetrievalService leaderRetrievalService = null;

Reply via email to