[FLINK-7352] [tests] Stabilize ExecutionGraphRestartTest

Introduce an explicit waiting for the deployment of tasks. This replaces the 
loose
ordering induced by Thread.sleep and fixes the race conditions caused by it.

This closes #4501.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f59de67d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f59de67d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f59de67d

Branch: refs/heads/master
Commit: f59de67d9bd440b40352fdea5ede7c709f991a9e
Parents: f9db6fe
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 9 09:57:56 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Aug 10 11:17:53 2017 +0200

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 112 ++++++++++++++++---
 .../executiongraph/ExecutionGraphTestUtils.java |  31 ++---
 .../utils/SimpleAckingTaskManagerGateway.java   |  13 +++
 3 files changed, 118 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 7275e0f..acf854f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -70,18 +70,20 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -585,11 +587,24 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
 
        @Test
        public void testConcurrentLocalFailAndRestart() throws Exception {
-               final ExecutionGraph eg = createSimpleTestGraph(new 
FixedDelayRestartStrategy(10, 0L));
+               final int parallelism = 10;
+               SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+
+               final ExecutionGraph eg = createSimpleTestGraph(
+                       new JobID(),
+                       taskManagerGateway,
+                       new FixedDelayRestartStrategy(10, 0L),
+                       createNoOpVertex(parallelism));
+
+               WaitForTasks waitForTasks = new WaitForTasks(parallelism);
+               taskManagerGateway.setCondition(waitForTasks);
+
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.scheduleForExecution();
 
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+               waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+               switchToRunning(eg);
 
                final ExecutionJobVertex vertex = 
eg.getVerticesTopologically().iterator().next();
                final Execution first = 
vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
@@ -629,10 +644,17 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                failTrigger.trigger();
 
                waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
+
+               WaitForTasks waitForTasksAfterRestart = new 
WaitForTasks(parallelism);
+               taskManagerGateway.setCondition(waitForTasksAfterRestart);
+
                completeCancellingForAllVertices(eg);
 
                waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+               waitForTasksAfterRestart.getFuture().get(1000, 
TimeUnit.MILLISECONDS);
+
+               switchToRunning(eg);
                finishAllVertices(eg);
 
                eg.waitUntilTerminal();
@@ -646,18 +668,29 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                final int parallelism = 10;
                final JobID jid = new JobID();
                final JobVertex vertex = createNoOpVertex(parallelism);
-               final SlotProvider slots = new SimpleSlotProvider(jid, 
parallelism, new NotCancelAckingTaskGateway());
+               final NotCancelAckingTaskGateway taskManagerGateway = new 
NotCancelAckingTaskGateway();
+               final SlotProvider slots = new SimpleSlotProvider(jid, 
parallelism, taskManagerGateway);
                final TriggeredRestartStrategy restartStrategy = new 
TriggeredRestartStrategy(restartTrigger);
 
                final ExecutionGraph eg = createSimpleTestGraph(jid, slots, 
restartStrategy, vertex);
+
+               WaitForTasks waitForTasks = new WaitForTasks(parallelism);
+               taskManagerGateway.setCondition(waitForTasks);
+
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.scheduleForExecution();
 
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+               waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+               switchToRunning(eg);
 
                // fail into 'RESTARTING'
                eg.failGlobal(new Exception("intended test failure 1"));
                assertEquals(JobStatus.FAILING, eg.getState());
+
+               WaitForTasks waitForTasksRestart = new 
WaitForTasks(parallelism);
+               taskManagerGateway.setCondition(waitForTasksRestart);
+
                completeCancellingForAllVertices(eg);
                waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
 
@@ -668,7 +701,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
                restartTrigger.trigger();
 
                waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+               waitForTasksRestart.getFuture().get(1000, 
TimeUnit.MILLISECONDS);
+               switchToRunning(eg);
                finishAllVertices(eg);
 
                eg.waitUntilTerminal();
@@ -684,8 +719,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
                // this test is inconclusive if not used with a proper 
multi-threaded executor
                assertTrue("test assumptions violated", ((ThreadPoolExecutor) 
executor).getCorePoolSize() > 1);
 
+               SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
                final int parallelism = 20;
-               final Scheduler scheduler = 
createSchedulerWithInstances(parallelism);
+               final Scheduler scheduler = 
createSchedulerWithInstances(parallelism, taskManagerGateway);
 
                final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
@@ -701,23 +737,34 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                sink.connectNewDataSetAsInput(source, 
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
                final ExecutionGraph eg = 
ExecutionGraphTestUtils.createExecutionGraph(
-                               new JobID(), scheduler, new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
+                       new JobID(), scheduler, new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
+
+               WaitForTasks waitForTasks = new WaitForTasks(parallelism * 2);
+               taskManagerGateway.setCondition(waitForTasks);
 
                eg.setScheduleMode(ScheduleMode.EAGER);
                eg.scheduleForExecution();
 
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+               waitForTasks.getFuture().get(1000, TimeUnit.MILLISECONDS);
+
+               switchToRunning(eg);
 
                // fail into 'RESTARTING'
                
eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
-                               new Exception("intended test failure"));
+                       new Exception("intended test failure"));
 
                assertEquals(JobStatus.FAILING, eg.getState());
+
+               WaitForTasks waitForTasksAfterRestart = new 
WaitForTasks(parallelism * 2);
+               taskManagerGateway.setCondition(waitForTasksAfterRestart);
+
                completeCancellingForAllVertices(eg);
 
                // clean termination
                waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
-               waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+               waitForTasksAfterRestart.getFuture().get(1000, 
TimeUnit.MILLISECONDS);
+               switchToRunning(eg);
                finishAllVertices(eg);
                waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
        }
@@ -730,7 +777,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                final int numRestarts = 10;
                final int parallelism = 20;
 
-               final Scheduler scheduler = 
createSchedulerWithInstances(parallelism - 1);
+               TaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+               final Scheduler scheduler = 
createSchedulerWithInstances(parallelism - 1, taskManagerGateway);
 
                final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
@@ -768,24 +816,23 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private Scheduler createSchedulerWithInstances(int num) {
+       private Scheduler createSchedulerWithInstances(int num, 
TaskManagerGateway taskManagerGateway) {
                final Scheduler scheduler = new Scheduler(executor);
                final Instance[] instances = new Instance[num];
 
                for (int i = 0; i < instances.length; i++) {
-                       instances[i] = createInstance(55443 + i);
+                       instances[i] = createInstance(taskManagerGateway, 55443 
+ i);
                        scheduler.newInstanceAvailable(instances[i]);
                }
 
                return scheduler;
        }
 
-       private static Instance createInstance(int port) {
+       private static Instance createInstance(TaskManagerGateway 
taskManagerGateway, int port) {
                final HardwareDescription resources = new 
HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
-               final TaskManagerGateway taskManager = new 
SimpleAckingTaskManagerGateway();
                final TaskManagerLocation location = new TaskManagerLocation(
                                ResourceID.generate(), 
InetAddress.getLoopbackAddress(), port);
-               return new Instance(taskManager, location, new InstanceID(), 
resources, 1);
+               return new Instance(taskManagerGateway, location, new 
InstanceID(), resources, 1);
        }
 
        // 
------------------------------------------------------------------------
@@ -992,4 +1039,33 @@ public class ExecutionGraphRestartTest extends TestLogger 
{
                        });
                }
        }
+
+       /**
+        * A consumer which counts the number of tasks for which it has been 
called and completes a future
+        * upon reaching the number of tasks to wait for.
+        */
+       public static class WaitForTasks implements 
Consumer<ExecutionAttemptID> {
+
+               private final int tasksToWaitFor;
+               private final CompletableFuture<Boolean> allTasksReceived;
+               private int counter;
+
+               public WaitForTasks(int tasksToWaitFor) {
+                       this.tasksToWaitFor = tasksToWaitFor;
+                       this.allTasksReceived = new CompletableFuture<>();
+               }
+
+               public CompletableFuture<Boolean> getFuture() {
+                       return allTasksReceived;
+               }
+
+               @Override
+               public void accept(ExecutionAttemptID executionAttemptID) {
+                       counter++;
+
+                       if (counter >= tasksToWaitFor) {
+                               allTasksReceived.complete(true);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index a6b5a4b..2daf28f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
@@ -190,28 +191,17 @@ public class ExecutionGraphTestUtils {
        }
 
        /**
-        * Turns a newly scheduled execution graph into a state where all 
vertices run.
-        * This waits until all executions have reached state 'DEPLOYING' and 
then switches them to running.
+        * Checks that all execution are in state DEPLOYING and then switches 
them
+        * to state RUNNING
         */
-       public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph 
eg, long timeout) throws TimeoutException {
-               // wait until everything is running
+       public static void switchToRunning(ExecutionGraph eg) {
+               // check that all execution are in state DEPLOYING
                for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
                        final Execution exec = ev.getCurrentExecutionAttempt();
-                       waitUntilExecutionState(exec, ExecutionState.DEPLOYING, 
timeout);
-               }
-
-               // Note: As ugly as it is, we need this minor sleep, because 
between switching
-               // to 'DEPLOYED' and when the 'switchToRunning()' may be called 
lies a race check
-               // against concurrent modifications (cancel / fail). We can 
only switch this to running
-               // once that check is passed. For the actual runtime, this 
switch is triggered by a callback
-               // from the TaskManager, which comes strictly after that. For 
tests, we use mock TaskManagers
-               // which cannot easily tell us when that condition has 
happened, unfortunately.
-               try {
-                       Thread.sleep(2);
-               } catch (InterruptedException e) {
-                       Thread.currentThread().interrupt();
+                       assert(exec.getState() == ExecutionState.DEPLOYING);
                }
 
+               // switch executions to RUNNING
                for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
                        final Execution exec = ev.getCurrentExecutionAttempt();
                        exec.switchToRunning();
@@ -285,7 +275,7 @@ public class ExecutionGraphTestUtils {
        public static ExecutionGraph createSimpleTestGraph(RestartStrategy 
restartStrategy) throws Exception {
                JobVertex vertex = createNoOpVertex(10);
 
-               return createSimpleTestGraph(new JobID(), restartStrategy, 
vertex);
+               return createSimpleTestGraph(new JobID(), new 
SimpleAckingTaskManagerGateway(), restartStrategy, vertex);
        }
 
        /**
@@ -294,7 +284,7 @@ public class ExecutionGraphTestUtils {
         * <p>The execution graph uses {@link NoRestartStrategy} as the restart 
strategy.
         */
        public static ExecutionGraph createSimpleTestGraph(JobID jid, 
JobVertex... vertices) throws Exception {
-               return createSimpleTestGraph(jid, new NoRestartStrategy(), 
vertices);
+               return createSimpleTestGraph(jid, new 
SimpleAckingTaskManagerGateway(), new NoRestartStrategy(), vertices);
        }
 
        /**
@@ -302,6 +292,7 @@ public class ExecutionGraphTestUtils {
         */
        public static ExecutionGraph createSimpleTestGraph(
                        JobID jid,
+                       TaskManagerGateway taskManagerGateway,
                        RestartStrategy restartStrategy,
                        JobVertex... vertices) throws Exception {
 
@@ -310,7 +301,7 @@ public class ExecutionGraphTestUtils {
                        numSlotsNeeded += vertex.getParallelism();
                }
 
-               SlotProvider slotProvider = new SimpleSlotProvider(jid, 
numSlotsNeeded);
+               SlotProvider slotProvider = new SimpleSlotProvider(jid, 
numSlotsNeeded, taskManagerGateway);
 
                return createSimpleTestGraph(jid, slotProvider, 
restartStrategy, vertices);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f59de67d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index b968d39..a38f674 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -33,8 +33,10 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, 
update) and does not
@@ -44,6 +46,16 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
 
        private final String address = UUID.randomUUID().toString();
 
+       private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
+
+       public SimpleAckingTaskManagerGateway() {
+               optSubmitCondition = Optional.empty();
+       }
+
+       public void setCondition(Consumer<ExecutionAttemptID> predicate) {
+               optSubmitCondition = Optional.of(predicate);
+       }
+
        @Override
        public String getAddress() {
                return address;
@@ -73,6 +85,7 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
 
        @Override
        public CompletableFuture<Acknowledge> 
submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+               optSubmitCondition.ifPresent(condition -> 
condition.accept(tdd.getExecutionAttemptId()));
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 

Reply via email to