[hotfix] [tests] Fix race condition in RescalingITCase that could leave the test stuck in a blocking call until timeout

This closes #2513


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f67b54b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f67b54b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f67b54b

Branch: refs/heads/master
Commit: 5f67b54b2ca4f7ea79d184e65a99ef230dbdc660
Parents: 4d4eb64
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Sep 19 17:54:23 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 .../test/checkpointing/RescalingITCase.java     | 51 ++++++++++++--------
 1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f67b54b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 7f1d7f3..263bf79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -69,9 +69,9 @@ import static org.junit.Assert.fail;
 
 public class RescalingITCase extends TestLogger {
 
-       private static int numTaskManagers = 2;
-       private static int slotsPerTaskManager = 2;
-       private static int numSlots = numTaskManagers * slotsPerTaskManager;
+       private static final int numTaskManagers = 2;
+       private static final int slotsPerTaskManager = 2;
+       private static final int numSlots = numTaskManagers * 
slotsPerTaskManager;
 
        private static TestingCluster cluster;
 
@@ -109,12 +109,12 @@ public class RescalingITCase extends TestLogger {
         */
        @Test
        public void testSavepointRescalingWithPartitionedState() throws 
Exception {
-               int numberKeys = 42;
-               int numberElements = 1000;
-               int numberElements2 = 500;
-               int parallelism = numSlots / 2;
-               int parallelism2 = numSlots;
-               int maxParallelism = 13;
+               final int numberKeys = 42;
+               final int numberElements = 1000;
+               final int numberElements2 = 500;
+               final int parallelism = numSlots / 2;
+               final int parallelism2 = numSlots;
+               final int maxParallelism = 13;
 
                FiniteDuration timeout = new FiniteDuration(3, 
TimeUnit.MINUTES);
                Deadline deadline = timeout.fromNow();
@@ -214,9 +214,9 @@ public class RescalingITCase extends TestLogger {
         */
        @Test
        public void testSavepointRescalingFailureWithNonPartitionedState() 
throws Exception {
-               int parallelism = numSlots / 2;
-               int parallelism2 = numSlots;
-               int maxParallelism = 13;
+               final int parallelism = numSlots / 2;
+               final int parallelism2 = numSlots;
+               final int maxParallelism = 13;
 
                FiniteDuration timeout = new FiniteDuration(3, 
TimeUnit.MINUTES);
                Deadline deadline = timeout.fromNow();
@@ -235,12 +235,14 @@ public class RescalingITCase extends TestLogger {
 
                        Object savepointResponse = null;
 
-                       // we might be too early for taking a savepoint if the 
operators have not been started yet
+                       // wait until the operator is started
+                       NonPartitionedStateSource.workStartedLatch.await();
+
                        while (deadline.hasTimeLeft()) {
 
                                Future<Object> savepointPathFuture = 
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), 
deadline.timeLeft());
-
-                               savepointResponse = 
Await.result(savepointPathFuture, deadline.timeLeft());
+                               FiniteDuration waitingTime = new 
FiniteDuration(10, TimeUnit.SECONDS);
+                               savepointResponse = 
Await.result(savepointPathFuture, waitingTime);
 
                                if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
                                        break;
@@ -428,6 +430,8 @@ public class RescalingITCase extends TestLogger {
                env.enableCheckpointing(checkpointInterval);
                env.setRestartStrategy(RestartStrategies.noRestart());
 
+               NonPartitionedStateSource.workStartedLatch = new 
CountDownLatch(1);
+
                DataStream<Integer> input = env.addSource(new 
NonPartitionedStateSource());
 
                input.addSink(new DiscardingSink<Integer>());
@@ -466,7 +470,7 @@ public class RescalingITCase extends TestLogger {
 
                DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new 
SubtaskIndexFlatMapper(numberElements));
 
-               result.addSink(new CollectionSink());
+               result.addSink(new CollectionSink<Tuple2<Integer, Integer>>());
 
                return env.getStreamGraph().getJobGraph();
        }
@@ -504,7 +508,7 @@ public class RescalingITCase extends TestLogger {
 
                DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new 
SubtaskIndexFlatMapper(numberElements));
 
-               result.addSink(new CollectionSink());
+               result.addSink(new CollectionSink<Tuple2<Integer, Integer>>());
 
                return env.getStreamGraph().getJobGraph();
        }
@@ -645,8 +649,10 @@ public class RescalingITCase extends TestLogger {
 
                private static final long serialVersionUID = 
-8108185918123186841L;
 
-               private int counter = 0;
-               private boolean running = true;
+               private static volatile CountDownLatch workStartedLatch = new 
CountDownLatch(1);
+
+               private volatile int counter = 0;
+               private volatile boolean running = true;
 
                @Override
                public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
@@ -669,13 +675,16 @@ public class RescalingITCase extends TestLogger {
                                        ctx.collect(counter * 
getRuntimeContext().getIndexOfThisSubtask());
                                }
 
-                               Thread.sleep(100);
+                               Thread.sleep(2);
+                               if(counter == 10) {
+                                       workStartedLatch.countDown();
+                               }
                        }
                }
 
                @Override
                public void cancel() {
-                       running = true;
+                       running = false;
                }
        }
 }

Reply via email to