http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index eb0aab4..9db330b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -24,18 +24,11 @@ import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 
-import akka.actor.Actor;
-import akka.testkit.TestActorRef;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import akka.japi.Creator;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.BaseTestingInstanceGateway;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
+import org.apache.flink.runtime.instance.InstanceGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
@@ -46,26 +39,12 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import scala.concurrent.ExecutionContext;
 
 @SuppressWarnings("serial")
 public class ExecutionVertexCancelTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown(){
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Canceling in different states
        // 
--------------------------------------------------------------------------------------------
@@ -127,388 +106,350 @@ public class ExecutionVertexCancelTest {
 
        @Test
        public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
-               new JavaTestKit(system){{
-                       try {
-                               final JobVertexID jid = new JobVertexID();
-                               final ActionQueue actions = new ActionQueue();
+               try {
+                       final JobVertexID jid = new JobVertexID();
 
-                               TestingUtils.setExecutionContext(new 
TestingUtils.QueuedActionExecutionContext(
-                                               actions));
+                       final TestingUtils.QueuedActionExecutionContext 
executionContext = TestingUtils.queuedActionExecutionContext();
+                       final TestingUtils.ActionQueue actions = 
executionContext.actionQueue();
 
-                               final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
 
-                               final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                               AkkaUtils.getDefaultTimeout());
-                               final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionJobVertex ejv = getExecutionVertex(
+                               jid,
+                               executionContext
+                       );
 
-                               setVertexState(vertex, 
ExecutionState.SCHEDULED);
-                               assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                               ActorRef taskManager = 
TestActorRef.create(system, Props.create(new
-                                               
CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
-                                               new TaskOperationResult(execId, 
false))));
+                       setVertexState(vertex, ExecutionState.SCHEDULED);
+                       assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
 
-                               Instance instance = getInstance(taskManager);
-                               SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                                       executionContext,
+                                       new TaskOperationResult(execId, true),
+                                       new TaskOperationResult(execId, false));
 
-                               vertex.deployToSlot(slot);
+                       Instance instance = getInstance(instanceGateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
+                       vertex.deployToSlot(slot);
 
-                               assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
 
-                               vertex.cancel();
+                       vertex.cancel();
 
-                               assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                               // first action happens (deploy)
-                               actions.triggerNextAction();
-                               assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       // first action happens (deploy)
+                       actions.triggerNextAction();
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                               // the deploy call found itself in canceling 
after it returned and needs to send a cancel call
-                               // the call did not yet execute, so it is still 
in canceling
-                               assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       // the deploy call found itself in canceling after it 
returned and needs to send a cancel call
+                       // the call did not yet execute, so it is still in 
canceling
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                               // second action happens (cancel call from 
cancel function)
-                               actions.triggerNextAction();
+                       // second action happens (cancel call from cancel 
function)
+                       actions.triggerNextAction();
 
-                               // TaskManager reports back (canceling done)
-                               
vertex.getCurrentExecutionAttempt().cancelingComplete();
+                       // TaskManager reports back (canceling done)
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-                               // should properly set state to cancelled
-                               assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
+                       // should properly set state to cancelled
+                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
-                               // trigger the correction canceling call
-                               actions.triggerNextAction();
-                               assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
+                       // trigger the correction canceling call
+                       actions.triggerNextAction();
+                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
-                               assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                               assertNull(vertex.getFailureCause());
+                       assertNull(vertex.getFailureCause());
 
-                               
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                               
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                               
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-                       }
-                       catch(Exception e) {
-                               e.printStackTrace();
-                               fail(e.getMessage());
-                       }
-                       finally {
-                               TestingUtils.setGlobalExecutionContext();
-                       }
-               }};
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
-               new JavaTestKit(system){
-                       {
-                               try {
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ActionQueue actions = new 
ActionQueue();
+               try {
+                       final JobVertexID jid = new JobVertexID();
 
-                                       TestingUtils.setExecutionContext(new 
TestingUtils
-                                                       
.QueuedActionExecutionContext(actions));
+                       final TestingUtils.QueuedActionExecutionContext 
executionContext = TestingUtils.queuedActionExecutionContext();
+                       final TestingUtils.ActionQueue actions = 
executionContext.actionQueue();
 
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
executionContext);
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
-                                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                                       setVertexState(vertex, 
ExecutionState.SCHEDULED);
-                                       assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
+                       setVertexState(vertex, ExecutionState.SCHEDULED);
+                       assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
 
-                                       // task manager cancel sequence mock 
actor
-                                       // first return NOT SUCCESS (task not 
found, cancel call overtook deploy call), then success (cancel call after 
deploy call)
-                                       TestActorRef<? extends Actor> 
taskManager = TestActorRef.create(system, Props.create(new
-                                                       
CancelSequenceTaskManagerCreator(new
-                                                       
TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
+                       // task manager cancel sequence mock actor
+                       // first return NOT SUCCESS (task not found, cancel 
call overtook deploy call), then success (cancel call after deploy call)
+                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                                       executionContext,
+                                       new     TaskOperationResult(execId, 
false),
+                                       new TaskOperationResult(execId, true)
+                       );
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       Instance instance = getInstance(instanceGateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       vertex.deployToSlot(slot);
+                       vertex.deployToSlot(slot);
 
-                                       assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
+                       vertex.cancel();
 
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       // first action happens (deploy)
-                                       Runnable deployAction = 
actions.popNextAction();
-                                       Runnable cancelAction = 
actions.popNextAction();
+                       // first action happens (deploy)
+                       Runnable deployAction = actions.popNextAction();
+                       Runnable cancelAction = actions.popNextAction();
 
-                                       // cancel call first
-                                       cancelAction.run();
-                                       // process onComplete callback
-                                       actions.triggerNextAction();
+                       // cancel call first
+                       cancelAction.run();
+                       // process onComplete callback
+                       actions.triggerNextAction();
 
-                                       // did not find the task, not properly 
cancelled, stay in canceling
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       // did not find the task, not properly cancelled, stay 
in canceling
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       // deploy action next
-                                       deployAction.run();
+                       // deploy action next
+                       deployAction.run();
 
-                                       // the deploy call found itself in 
canceling after it returned and needs to send a cancel call
-                                       // the call did not yet execute, so it 
is still in canceling
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       // the deploy call found itself in canceling after it 
returned and needs to send a cancel call
+                       // the call did not yet execute, so it is still in 
canceling
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       
vertex.getCurrentExecutionAttempt().cancelingComplete();
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-                                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
-                                       assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                                       assertNull(vertex.getFailureCause());
+                       assertNull(vertex.getFailureCause());
 
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }finally{
-                                       
TestingUtils.setGlobalExecutionContext();
-                               }
-                       }
-               };
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testCancelFromRunning() {
-               new JavaTestKit(system) {
-                       {
-                               try {
-                                       
TestingUtils.setCallingThreadDispatcher(system);
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+               try {
+                       final JobVertexID jid = new JobVertexID();
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
-                                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                                       final TestActorRef<? extends Actor> 
taskManager = TestActorRef.create(system,
-                                                       Props.create(new 
CancelSequenceTaskManagerCreator(new
-                                                                       
TaskOperationResult(execId, true))));
+                       InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                                       TestingUtils.directExecutionContext(),
+                                       new TaskOperationResult(execId, true));
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       Instance instance = getInstance(instanceGateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       setVertexState(vertex, 
ExecutionState.RUNNING);
-                                       setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
+                       setVertexResource(vertex, slot);
 
-                                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
-                                       
vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task 
manager once actially canceled
+                       vertex.cancel();
+                       
vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task 
manager once actially canceled
 
-                                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
-                                       assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                                       assertNull(vertex.getFailureCause());
+                       assertNull(vertex.getFailureCause());
 
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }finally{
-                                       
TestingUtils.setGlobalExecutionContext();
-                               }
-                       }
-               };
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testRepeatedCancelFromRunning() {
-               new JavaTestKit(system) {
-                       {
-                               try {
-                                       
TestingUtils.setCallingThreadDispatcher(system);
+               try {
 
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+                       final JobVertexID jid = new JobVertexID();
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
-                                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                                       final ActorRef taskManager = 
TestActorRef.create(system, Props.create(new
-                                                       
CancelSequenceTaskManagerCreator(new
-                                                       
TaskOperationResult(execId, true))));
+                       final InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                                       TestingUtils.directExecutionContext(),
+                                       new TaskOperationResult(execId, true)
+                       );
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       Instance instance = getInstance(instanceGateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       setVertexState(vertex, 
ExecutionState.RUNNING);
-                                       setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
+                       setVertexResource(vertex, slot);
 
-                                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
+                       vertex.cancel();
 
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
+                       vertex.cancel();
 
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       // callback by TaskManager after 
canceling completes
-                                       
vertex.getCurrentExecutionAttempt().cancelingComplete();
+                       // callback by TaskManager after canceling completes
+                       vertex.getCurrentExecutionAttempt().cancelingComplete();
 
-                                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.CANCELED, 
vertex.getExecutionState());
 
-                                       assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                                       assertNull(vertex.getFailureCause());
+                       assertNull(vertex.getFailureCause());
 
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }finally{
-                                       
TestingUtils.setGlobalExecutionContext();
-                               }
-                       }
-               };
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testCancelFromRunningDidNotFindTask() {
                // this may happen when the task finished or failed while the 
call was in progress
-               new JavaTestKit(system) {
-                       {
-                               try {
-                                       
TestingUtils.setCallingThreadDispatcher(system);
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+               try {
+                       final JobVertexID jid = new JobVertexID();
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
-                                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                                       final ActorRef taskManager = 
TestActorRef.create(system,Props.create(new
-                                                       
CancelSequenceTaskManagerCreator(new
-                                                       
TaskOperationResult(execId, false))));
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       final InstanceGateway instanceGateway = new 
CancelSequenceInstanceGateway(
+                                       TestingUtils.directExecutionContext(),
+                                       new TaskOperationResult(execId, false)
+                       );
 
-                                       setVertexState(vertex, 
ExecutionState.RUNNING);
-                                       setVertexResource(vertex, slot);
+                       Instance instance = getInstance(instanceGateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
+                       setVertexState(vertex, ExecutionState.RUNNING);
+                       setVertexResource(vertex, slot);
 
-                                       vertex.cancel();
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
-                                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
+                       vertex.cancel();
 
-                                       assertNull(vertex.getFailureCause());
+                       assertEquals(ExecutionState.CANCELING, 
vertex.getExecutionState());
 
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }finally{
-                                       
TestingUtils.setGlobalExecutionContext();
-                               }
-                       }
-               };
+                       assertNull(vertex.getFailureCause());
+
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testCancelCallFails() {
-               new JavaTestKit(system) {
-                       {
-                               try {
-                                       
TestingUtils.setCallingThreadDispatcher(system);
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+               try {
+                       final JobVertexID jid = new JobVertexID();
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
 
-                                       final ActorRef taskManager = 
TestActorRef.create(system, Props.create(new
-                                                       
CancelSequenceTaskManagerCreator()));
+                       final InstanceGateway gateway = new 
CancelSequenceInstanceGateway(TestingUtils.directExecutionContext());
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       Instance instance = getInstance(gateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       setVertexState(vertex, 
ExecutionState.RUNNING);
-                                       setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
+                       setVertexResource(vertex, slot);
 
-                                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
+                       vertex.cancel();
 
-                                       assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
 
-                                       assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                                       assertNotNull(vertex.getFailureCause());
+                       assertNotNull(vertex.getFailureCause());
 
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-                                       
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }finally{
-                                       
TestingUtils.setGlobalExecutionContext();
-                               }
-                       }
-               };
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
+                       
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        @Test
        public void testSendCancelAndReceiveFail() {
-               new JavaTestKit(system) {
-                       {
-                               try {
-                                       final JobVertexID jid = new 
JobVertexID();
-                                       final ExecutionJobVertex ejv = 
getExecutionVertex(jid);
+               try {
+                       final JobVertexID jid = new JobVertexID();
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-                                       final ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
-                                                       
AkkaUtils.getDefaultTimeout());
-                                       final ExecutionAttemptID execID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
+                       final ExecutionAttemptID execID = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                                       final ActorRef taskManager = 
system.actorOf(
-                                                       Props.create(new 
CancelSequenceTaskManagerCreator(
-                                                                       new 
TaskOperationResult(execID, true)
-                                                       )));
+                       final InstanceGateway gateway = new 
CancelSequenceInstanceGateway(
+                                       TestingUtils.defaultExecutionContext(),
+                                       new TaskOperationResult(execID, true));
 
-                                       Instance instance = 
getInstance(taskManager);
-                                       SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
+                       Instance instance = getInstance(gateway);
+                       SimpleSlot slot = instance.allocateSimpleSlot(new 
JobID());
 
-                                       setVertexState(vertex, 
ExecutionState.RUNNING);
-                                       setVertexResource(vertex, slot);
+                       setVertexState(vertex, ExecutionState.RUNNING);
+                       setVertexResource(vertex, slot);
 
-                                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
+                       assertEquals(ExecutionState.RUNNING, 
vertex.getExecutionState());
 
-                                       vertex.cancel();
+                       vertex.cancel();
 
-                                       assertTrue(vertex.getExecutionState() 
== ExecutionState.CANCELING || vertex.getExecutionState() == 
ExecutionState.FAILED);
+                       assertTrue(vertex.getExecutionState() == 
ExecutionState.CANCELING || vertex.getExecutionState() == 
ExecutionState.FAILED);
 
-                                       
vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
+                       vertex.getCurrentExecutionAttempt().markFailed(new 
Throwable("test"));
 
-                                       assertTrue(vertex.getExecutionState() 
== ExecutionState.CANCELED || vertex.getExecutionState() == 
ExecutionState.FAILED);
+                       assertTrue(vertex.getExecutionState() == 
ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED);
 
-                                       assertTrue(slot.isReleased());
+                       assertTrue(slot.isReleased());
 
-                                       assertEquals(0, 
vertex.getExecutionGraph().getRegisteredExecutions().size());
-                               } catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }
-                       }
-               };
+                       assertEquals(0, 
vertex.getExecutionGraph().getRegisteredExecutions().size());
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -541,7 +482,7 @@ public class ExecutionVertexCancelTest {
                        // deploying after canceling from CREATED needs to 
raise an exception, because
                        // the scheduler (or any caller) needs to know that the 
slot should be released
                        try {
-                               Instance instance = 
getInstance(ActorRef.noSender());
+                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                vertex.deployToSlot(slot);
@@ -584,7 +525,7 @@ public class ExecutionVertexCancelTest {
                                                AkkaUtils.getDefaultTimeout());
                                setVertexState(vertex, 
ExecutionState.CANCELING);
 
-                               Instance instance = 
getInstance(ActorRef.noSender());
+                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                vertex.deployToSlot(slot);
@@ -600,7 +541,7 @@ public class ExecutionVertexCancelTest {
                                ExecutionVertex vertex = new 
ExecutionVertex(ejv, 0, new IntermediateResult[0],
                                                AkkaUtils.getDefaultTimeout());
 
-                               Instance instance = 
getInstance(ActorRef.noSender());
+                               Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
                                SimpleSlot slot = 
instance.allocateSimpleSlot(new JobID());
 
                                setVertexResource(vertex, slot);
@@ -621,39 +562,33 @@ public class ExecutionVertexCancelTest {
                }
        }
 
-       public static class CancelSequenceTaskManagerCreator implements 
Creator<CancelSequenceTaskManager> {
-               private final TaskOperationResult[] results;
-               public CancelSequenceTaskManagerCreator(TaskOperationResult ... 
results){
-                       this.results = results;
-               }
-
-               @Override
-               public CancelSequenceTaskManager create() throws Exception {
-                       return new CancelSequenceTaskManager(results);
-               }
-       }
-
-       public static class CancelSequenceTaskManager extends UntypedActor{
+       public static class CancelSequenceInstanceGateway extends 
BaseTestingInstanceGateway {
                private final TaskOperationResult[] results;
-               private int index;
+               private int index = -1;
 
-               public CancelSequenceTaskManager(TaskOperationResult[] results){
-                       this.results = results;
-                       index = -1;
+               public CancelSequenceInstanceGateway(ExecutionContext 
executionContext, TaskOperationResult ... result) {
+                       super(executionContext);
+                       this.results = result;
                }
 
                @Override
-               public void onReceive(Object message) throws Exception {
-                       if(message instanceof TaskMessages.SubmitTask){
-                               getSender().tell(Messages.getAcknowledge(), 
getSelf());
-                       }else if(message instanceof TaskMessages.CancelTask){
+               public Object handleMessage(Object message) throws Exception {
+                       Object result;
+                       if(message instanceof TaskMessages.SubmitTask) {
+                               result = Messages.getAcknowledge();
+                       } else if(message instanceof TaskMessages.CancelTask) {
                                index++;
+
                                if(index >= results.length){
-                                       getSender().tell(new Status.Failure(new 
IOException("RPC call failed.")), getSelf());
-                               }else {
-                                       getSender().tell(results[index], 
getSelf());
+                                       throw new IOException("RPC call 
failed.");
+                               } else {
+                                       result = results[index];
                                }
+                       } else {
+                               result = null;
                        }
+
+                       return result;
                }
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index eadf328..431c3a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -22,13 +22,6 @@ import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 
 import static org.junit.Assert.*;
 
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
@@ -37,37 +30,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ExecutionVertexDeploymentTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown(){
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
        @Test
        public void testDeployCall() {
                try {
                        final JobVertexID jid = new JobVertexID();
 
-                       TestingUtils.setCallingThreadDispatcher(system);
-                       ActorRef tm = TestActorRef.create(system, 
Props.create(SimpleAcknowledgingTaskManager
-                                       .class));
-
                        final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
                        // mock taskmanager to simply accept the call
-                       Instance instance = getInstance(tm);
+                       Instance instance = getInstance(
+                                       new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
@@ -95,24 +71,17 @@ public class ExecutionVertexDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 
        @Test
        public void testDeployWithSynchronousAnswer() {
                try {
-                       TestingUtils.setCallingThreadDispatcher(system);
-
                        final JobVertexID jid = new JobVertexID();
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       
Props.create(SimpleAcknowledgingTaskManager.class));
-
-                       final ExecutionJobVertex ejv = getExecutionVertex(jid);
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(
+                                       new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
@@ -143,9 +112,6 @@ public class ExecutionVertexDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 
        @Test
@@ -157,10 +123,8 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       
Props.create(SimpleAcknowledgingTaskManager.class));
-
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(
+                                       new 
SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -200,18 +164,14 @@ public class ExecutionVertexDeploymentTest {
        @Test
        public void testDeployFailedSynchronous() {
                try {
-                       TestingUtils.setCallingThreadDispatcher(system);
-
                        final JobVertexID jid = new JobVertexID();
-                       final ExecutionJobVertex ejv = getExecutionVertex(jid);
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
TestingUtils.directExecutionContext());
 
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       
Props.create(SimpleFailingTaskManager.class));
-
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(
+                                       new 
SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -230,9 +190,6 @@ public class ExecutionVertexDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 
        @Test
@@ -243,10 +200,8 @@ public class ExecutionVertexDeploymentTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       
Props.create(SimpleFailingTaskManager.class));
-
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(
+                                       new 
SimpleFailingInstanceGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -280,20 +235,16 @@ public class ExecutionVertexDeploymentTest {
        public void testFailExternallyDuringDeploy() {
                try {
                        final JobVertexID jid = new JobVertexID();
-                       final ExecutionJobVertex ejv = getExecutionVertex(jid);
 
-                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
-                                       AkkaUtils.getDefaultTimeout());
+                       final TestingUtils.QueuedActionExecutionContext ec = 
TestingUtils.queuedActionExecutionContext();
+                       final TestingUtils.ActionQueue queue = ec.actionQueue();
 
-                       final ActionQueue queue = new ActionQueue();
-                       final TestingUtils.QueuedActionExecutionContext ec = 
new TestingUtils
-                                       .QueuedActionExecutionContext(queue);
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
ec);
 
-                       TestingUtils.setExecutionContext(ec);
+                       final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
+                                       AkkaUtils.getDefaultTimeout());
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system,
-                                       
Props.create(SimpleAcknowledgingTaskManager.class));
-                       final Instance instance = 
getInstance(simpleTaskManager);
+                       final Instance instance = getInstance(new 
SimpleInstanceGateway(TestingUtils.directExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -307,6 +258,7 @@ public class ExecutionVertexDeploymentTest {
                        assertEquals(testError, vertex.getFailureCause());
 
                        queue.triggerNextAction();
+                       queue.triggerNextAction();
 
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                        
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
@@ -316,34 +268,29 @@ public class ExecutionVertexDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 
        @Test
        public void testFailCallOvertakesDeploymentAnswer() {
 
                try {
-                       ActionQueue queue = new ActionQueue();
-                       TestingUtils.QueuedActionExecutionContext context = new 
TestingUtils
-                                       .QueuedActionExecutionContext(queue);
-
-                       TestingUtils.setExecutionContext(context);
+                       final TestingUtils.QueuedActionExecutionContext context 
= TestingUtils.queuedActionExecutionContext();
+                       final TestingUtils.ActionQueue queue = 
context.actionQueue();
 
                        final JobVertexID jid = new JobVertexID();
 
-                       final ExecutionJobVertex ejv = getExecutionVertex(jid);
+                       final ExecutionJobVertex ejv = getExecutionVertex(jid, 
context);
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
                        final ExecutionAttemptID eid = 
vertex.getCurrentExecutionAttempt().getAttemptId();
 
-                       final TestActorRef<? extends Actor> simpleTaskManager = 
TestActorRef.create(system, Props.create(new
-                                       
ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
-                                       TaskOperationResult(eid, false), new 
TaskOperationResult(eid, true))));
+                       final Instance instance = getInstance(
+                                       new 
ExecutionVertexCancelTest.CancelSequenceInstanceGateway(
+                                                       context,
+                                                       new 
TaskOperationResult(eid, false),
+                                                       new 
TaskOperationResult(eid, true)));
 
-                       final Instance instance = 
getInstance(simpleTaskManager);
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
@@ -361,11 +308,14 @@ public class ExecutionVertexDeploymentTest {
                        Runnable cancel1 = queue.popNextAction();
 
                        cancel1.run();
-                       // execute onComplete callback
+                       // execute onComplete callback of cancel
                        queue.triggerNextAction();
 
                        deploy.run();
 
+                       // execute onComplete callback of deploy
+                       queue.triggerNextAction();
+
                        assertEquals(ExecutionState.FAILED, 
vertex.getExecutionState());
 
                        assertEquals(testError, vertex.getFailureCause());
@@ -380,8 +330,5 @@ public class ExecutionVertexDeploymentTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       TestingUtils.setGlobalExecutionContext();
-               }
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 1e9c30b..8ea7017 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -23,14 +23,9 @@ import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,28 +33,12 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.mockito.Matchers;
 
 public class ExecutionVertexSchedulingTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-       }
-
-       @AfterClass
-       public static void teardown(){
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-
        @Test
        public void testSlotReleasedWhenScheduledImmediately() {
                try {
@@ -68,7 +47,7 @@ public class ExecutionVertexSchedulingTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final Instance instance = 
getInstance(ActorRef.noSender());
+                       final Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
                        
                        slot.releaseSlot();
@@ -98,7 +77,7 @@ public class ExecutionVertexSchedulingTest {
                                        AkkaUtils.getDefaultTimeout());
 
                        // a slot than cannot be deployed to
-                       final Instance instance = 
getInstance(ActorRef.noSender());
+                       final Instance instance = 
getInstance(DummyInstanceGateway.INSTANCE);
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        slot.releaseSlot();
@@ -134,11 +113,7 @@ public class ExecutionVertexSchedulingTest {
                        final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
                                        AkkaUtils.getDefaultTimeout());
 
-                       TestingUtils.setCallingThreadDispatcher(system);
-                       ActorRef tm = TestActorRef.create(system, 
Props.create(ExecutionGraphTestUtils
-                                       .SimpleAcknowledgingTaskManager.class));
-
-                       final Instance instance = getInstance(tm);
+                       final Instance instance = getInstance(new 
ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext()));
                        final SimpleSlot slot = 
instance.allocateSimpleSlot(ejv.getJobId());
 
                        Scheduler scheduler = mock(Scheduler.class);
@@ -153,8 +128,6 @@ public class ExecutionVertexSchedulingTest {
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
-               }finally{
-                       TestingUtils.setGlobalExecutionContext();
                }
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index 0d2ffeb..b4a7e63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -39,39 +39,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 
 public class LocalInputSplitsTest {
        
        private static final FiniteDuration TIMEOUT = new FiniteDuration(100, 
TimeUnit.SECONDS);
        
-       private static ActorSystem system;
-       
-       private static TestActorRef<? extends Actor> taskManager;
-       
-       
-       @BeforeClass
-       public static void setup() {
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-               taskManager = TestActorRef.create(system,
-                               
Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-       
        // 
--------------------------------------------------------------------------------------------
        
        @Test
@@ -290,14 +265,18 @@ public class LocalInputSplitsTest {
                        
                        JobGraph jobGraph = new JobGraph("test job", vertex);
                        
-                       ExecutionGraph eg = new 
ExecutionGraph(jobGraph.getJobID(),
-                                       jobGraph.getName(), 
jobGraph.getJobConfiguration(), TIMEOUT);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobGraph.getJobID(),
+                                       jobGraph.getName(),
+                                       jobGraph.getJobConfiguration(),
+                                       TIMEOUT);
                        
                        
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
                        eg.setQueuedSchedulingAllowed(false);
                        
                        // create a scheduler with 6 instances where always two 
are on the same host
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, 
"host1", 1);
                        Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, 
"host1", 1);
                        Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, 
"host2", 1);
@@ -349,8 +328,12 @@ public class LocalInputSplitsTest {
                
                JobGraph jobGraph = new JobGraph("test job", vertex);
                
-               ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(),
-                               jobGraph.getName(), 
jobGraph.getJobConfiguration(), TIMEOUT);
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobGraph.getJobID(),
+                               jobGraph.getName(),
+                               jobGraph.getJobConfiguration(),
+                               TIMEOUT);
                eg.setQueuedSchedulingAllowed(false);
                
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -370,7 +353,7 @@ public class LocalInputSplitsTest {
        }
        
        private static Scheduler getScheduler(int numInstances, int 
numSlotsPerInstance) throws Exception {
-               Scheduler scheduler = new Scheduler();
+               Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                
                for (int i = 0; i < numInstances; i++) {
                        byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + 
i) };
@@ -393,7 +376,13 @@ public class LocalInputSplitsTest {
                when(connection.getHostname()).thenReturn(hostname);
                when(connection.getFQDNHostname()).thenReturn(hostname);
                
-               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, slots);
+               return new Instance(
+                               new 
ExecutionGraphTestUtils.SimpleInstanceGateway(
+                                               
TestingUtils.defaultExecutionContext()),
+                               connection,
+                               new InstanceID(),
+                               hardwareDescription,
+                               slots);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 4677bf8..3cedb63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -56,7 +57,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -91,7 +97,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -127,7 +138,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -164,7 +180,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -199,7 +220,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -254,7 +280,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -300,7 +331,12 @@ public class PointwisePatternTest {
        
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
-               ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout());
+               ExecutionGraph eg = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(),
+                               jobId,
+                               jobName,
+                               cfg,
+                               AkkaUtils.getDefaultTimeout());
                try {
                        eg.attachJobGraph(ordered);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 376ff14..b779d79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest {
                        InstanceConnectionInfo ci = new 
InstanceConnectionInfo(address, 12345);
                                
                        HardwareDescription resources = new 
HardwareDescription(4, 4000000, 3000000, 2000000);
-                       Instance instance = new Instance(ActorRef.noSender(), 
ci, new InstanceID(), resources, 4);
+                       Instance instance = new 
Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
                        this.resource = instance.allocateSimpleSlot(new 
JobID());
                }
@@ -116,7 +116,7 @@ public class TerminalStateDeadlockTest {
                                vertices = Arrays.asList(v1, v2);
                        }
                        
-                       final Scheduler scheduler = new Scheduler();
+                       final Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        
                        final Executor executor = 
Executors.newFixedThreadPool(4);
                        
@@ -181,7 +181,7 @@ public class TerminalStateDeadlockTest {
                private volatile boolean done;
 
                TestExecGraph(JobID jobId) {
-                       super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
+                       super(TestingUtils.defaultExecutionContext(), jobId, 
"test graph", EMPTY_CONFIG, TIMEOUT);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 756b9a4..3305254 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.runtime.instance.DummyInstanceGateway;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -39,42 +40,14 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.Actor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
 
 public class VertexLocationConstraintTest {
 
        private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
        
-       private static ActorSystem system;
-       
-       private static TestActorRef<? extends Actor> taskManager;
-       
-       
-       @BeforeClass
-       public static void setup() {
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-               
-               taskManager = TestActorRef.create(system,
-                               
Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
-       
-       
        @Test
        public void testScheduleWithConstraint1() {
                try {
@@ -91,7 +64,7 @@ public class VertexLocationConstraintTest {
                        Instance instance2 = getInstance(address2, 6789, 
hostname2);
                        Instance instance3 = getInstance(address3, 6789, 
hostname3);
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        scheduler.newInstanceAvailable(instance1);
                        scheduler.newInstanceAvailable(instance2);
                        scheduler.newInstanceAvailable(instance3);
@@ -102,7 +75,12 @@ public class VertexLocationConstraintTest {
                        jobVertex.setParallelism(2);
                        JobGraph jg = new JobGraph("test job", jobVertex);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -157,7 +135,7 @@ public class VertexLocationConstraintTest {
                        Instance instance2 = getInstance(address2, 6789, 
hostname2);
                        Instance instance3 = getInstance(address3, 6789, 
hostname3);
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        scheduler.newInstanceAvailable(instance1);
                        scheduler.newInstanceAvailable(instance2);
                        scheduler.newInstanceAvailable(instance3);
@@ -168,7 +146,12 @@ public class VertexLocationConstraintTest {
                        jobVertex.setParallelism(2);
                        JobGraph jg = new JobGraph("test job", jobVertex);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -219,7 +202,7 @@ public class VertexLocationConstraintTest {
                        Instance instance2 = getInstance(address2, 6789, 
hostname2);
                        Instance instance3 = getInstance(address3, 6789, 
hostname3);
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        scheduler.newInstanceAvailable(instance1);
                        scheduler.newInstanceAvailable(instance2);
                        scheduler.newInstanceAvailable(instance3);
@@ -238,7 +221,12 @@ public class VertexLocationConstraintTest {
                        
                        JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
@@ -290,7 +278,7 @@ public class VertexLocationConstraintTest {
                        Instance instance1 = getInstance(address1, 6789, 
hostname1);
                        Instance instance2 = getInstance(address2, 6789, 
hostname2);
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        scheduler.newInstanceAvailable(instance1);
                        
                        // prepare the execution graph
@@ -299,7 +287,12 @@ public class VertexLocationConstraintTest {
                        jobVertex.setParallelism(1);
                        JobGraph jg = new JobGraph("test job", jobVertex);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
@@ -343,7 +336,7 @@ public class VertexLocationConstraintTest {
                        Instance instance1 = getInstance(address1, 6789, 
hostname1);
                        Instance instance2 = getInstance(address2, 6789, 
hostname2);
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        scheduler.newInstanceAvailable(instance1);
                        
                        // prepare the execution graph
@@ -362,7 +355,12 @@ public class VertexLocationConstraintTest {
                        jobVertex1.setSlotSharingGroup(sharingGroup);
                        jobVertex2.setSlotSharingGroup(sharingGroup);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
                        
                        ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
@@ -395,12 +393,17 @@ public class VertexLocationConstraintTest {
                        JobVertex vertex = new JobVertex("test vertex", new 
JobVertexID());
                        JobGraph jg = new JobGraph("test job", vertex);
                        
-                       ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), 
jg.getName(), jg.getJobConfiguration(), timeout);
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jg.getJobID(),
+                                       jg.getName(),
+                                       jg.getJobConfiguration(),
+                                       timeout);
                        eg.attachJobGraph(Collections.singletonList(vertex));
                        
                        ExecutionVertex ev = 
eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
                        
-                       Instance instance = 
ExecutionGraphTestUtils.getInstance(ActorRef.noSender());
+                       Instance instance = 
ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE);
                        
ev.setLocationConstraintHosts(Collections.singletonList(instance));
                        
                        assertNotNull(ev.getPreferredLocations());
@@ -431,6 +434,12 @@ public class VertexLocationConstraintTest {
                when(connection.getHostname()).thenReturn(hostname);
                when(connection.getFQDNHostname()).thenReturn(hostname);
                
-               return new Instance(taskManager, connection, new InstanceID(), 
hardwareDescription, 1);
+               return new Instance(
+                               new 
ExecutionGraphTestUtils.SimpleInstanceGateway(
+                                               
TestingUtils.defaultExecutionContext()),
+                               connection,
+                               new InstanceID(),
+                               hardwareDescription,
+                               1);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index a1d6d03..d9e422c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 public class VertexSlotSharingTest {
@@ -68,7 +69,11 @@ public class VertexSlotSharingTest {
                        
                        List<JobVertex> vertices = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
                        
-                       ExecutionGraph eg = new ExecutionGraph(new JobID(), 
"test job", new Configuration(),
+                       ExecutionGraph eg = new ExecutionGraph(
+                                       TestingUtils.defaultExecutionContext(),
+                                       new JobID(),
+                                       "test job",
+                                       new Configuration(),
                                        AkkaUtils.getDefaultTimeout());
                        eg.attachJobGraph(vertices);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
new file mode 100644
index 0000000..e9f8259
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for testing {@link InstanceGateway} instances. The 
implementing subclass
+ * only has to provide an implementation for handleMessage which contains the 
logic to treat
+ * different messages.
+ */
+abstract public class BaseTestingInstanceGateway implements InstanceGateway {
+       /**
+        * {@link ExecutionContext} which is used to execute the futures.
+        */
+       private final ExecutionContext executionContext;
+
+       public BaseTestingInstanceGateway(ExecutionContext executionContext) {
+               this.executionContext = executionContext;
+       }
+
+       @Override
+       public Future<Object> ask(Object message, FiniteDuration timeout) {
+               try {
+                       final Object result = handleMessage(message);
+
+                       return Futures.future(new Callable<Object>() {
+                               @Override
+                               public Object call() throws Exception {
+                                       return result;
+                               }
+                       }, executionContext);
+
+               } catch (final Exception e) {
+                       // if an exception occurred in the handleMessage method 
then return it as part of the future
+                       return Futures.future(new Callable<Object>() {
+                               @Override
+                               public Object call() throws Exception {
+                                       throw e;
+                               }
+                       }, executionContext);
+               }
+       }
+
+       /**
+        * Handles the supported messages by this InstanceGateway
+        *
+        * @param message Message to handle
+        * @return Result
+        * @throws Exception
+        */
+       abstract public Object handleMessage(Object message) throws Exception;
+
+       @Override
+       public void tell(Object message) {}
+
+       @Override
+       public void forward(Object message, ActorRef sender) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
+               return ask(message, timeout);
+       }
+
+       @Override
+       public String path() {
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
new file mode 100644
index 0000000..5941201
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Dummy {@link InstanceGateway} implementation used for testing.
+ */
+public class DummyInstanceGateway implements InstanceGateway {
+       public static final DummyInstanceGateway INSTANCE = new 
DummyInstanceGateway();
+
+       @Override
+       public Future<Object> ask(Object message, FiniteDuration timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void tell(Object message) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void forward(Object message, ActorRef sender) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String path() {
+               return "DummyInstanceGateway";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 595ac7e..c075a17 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.*;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
@@ -39,7 +38,7 @@ public class InstanceTest {
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
 
-                       Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 4);
+                       Instance instance = new 
Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), 
hardwareDescription, 4);
 
                        assertEquals(4, instance.getTotalNumberOfSlots());
                        assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -100,7 +99,7 @@ public class InstanceTest {
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
 
-                       Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 3);
+                       Instance instance = new 
Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), 
hardwareDescription, 3);
 
                        assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -130,7 +129,7 @@ public class InstanceTest {
                        InetAddress address = 
InetAddress.getByName("127.0.0.1");
                        InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
 
-                       Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 3);
+                       Instance instance = new 
Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), 
hardwareDescription, 3);
 
                        assertEquals(3, instance.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index e075ab6..29ec7b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.*;
 
 import java.net.InetAddress;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 
@@ -148,7 +146,7 @@ public class SimpleSlotTest {
                InetAddress address = InetAddress.getByName("127.0.0.1");
                InstanceConnectionInfo connection = new 
InstanceConnectionInfo(address, 10001);
 
-               Instance instance = new Instance(ActorRef.noSender(), 
connection, new InstanceID(), hardwareDescription, 1);
+               Instance instance = new Instance(DummyInstanceGateway.INSTANCE, 
connection, new InstanceID(), hardwareDescription, 1);
                return instance.allocateSimpleSlot(new JobID());
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 7739dea..6affdcc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
 import scala.Some;
@@ -58,7 +59,10 @@ public class NetworkEnvironmentTest {
                                        NUM_BUFFERS, BUFFER_SIZE, 
IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
                                        new Tuple2<Integer, Integer>(0, 0));
 
-                       NetworkEnvironment env = new NetworkEnvironment(new 
FiniteDuration(30, TimeUnit.SECONDS), config);
+                       NetworkEnvironment env = new NetworkEnvironment(
+                               TestingUtils.defaultExecutionContext(),
+                               new FiniteDuration(30, TimeUnit.SECONDS),
+                               config);
 
                        assertFalse(env.isShutdown());
                        assertFalse(env.isAssociated());

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index a15e477..676b2a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -27,41 +27,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ScheduleWithCoLocationHintTest {
 
-       private static ActorSystem system;
-
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-               TestingUtils.setCallingThreadDispatcher(system);
-       }
-
-       @AfterClass
-       public static void teardown(){
-               TestingUtils.setGlobalExecutionContext();
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
        @Test
        public void scheduleAllSharedAndCoLocated() {
                try {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        scheduler.newInstanceAvailable(getRandomInstance(2));
                        scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -187,7 +168,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid3 = new JobVertexID();
                        JobVertexID jid4 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -231,7 +212,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jid3 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -276,7 +257,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid3 = new JobVertexID();
                        JobVertexID jid4 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        scheduler.newInstanceAvailable(getRandomInstance(1));
                        scheduler.newInstanceAvailable(getRandomInstance(1));
                        scheduler.newInstanceAvailable(getRandomInstance(1));
@@ -338,7 +319,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jid3 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -407,7 +388,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -461,7 +442,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid2 = new JobVertexID();
                        JobVertexID jidx = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -519,7 +500,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
@@ -581,7 +562,7 @@ public class ScheduleWithCoLocationHintTest {
                        JobVertexID jid1 = new JobVertexID();
                        JobVertexID jid2 = new JobVertexID();
                        
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index d19299b..2ee53d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -25,11 +25,7 @@ import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
 import static org.junit.Assert.*;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -47,24 +43,11 @@ import org.apache.flink.runtime.instance.Instance;
  * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
 public class SchedulerIsolatedTasksTest {
-       private static ActorSystem system;
 
-       @BeforeClass
-       public static void setup(){
-               system = ActorSystem.create("TestingActorSystem", 
TestingUtils.testConfig());
-               TestingUtils.setCallingThreadDispatcher(system);
-       }
-
-       @AfterClass
-       public static void teardown(){
-               TestingUtils.setGlobalExecutionContext();
-               JavaTestKit.shutdownActorSystem(system);
-       }
-       
        @Test
        public void testAddAndRemoveInstance() {
                try {
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);
@@ -128,7 +111,7 @@ public class SchedulerIsolatedTasksTest {
        @Test
        public void testScheduleImmediately() {
                try {
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        assertEquals(0, scheduler.getNumberOfAvailableSlots());
                        
                        scheduler.newInstanceAvailable(getRandomInstance(2));
@@ -197,12 +180,10 @@ public class SchedulerIsolatedTasksTest {
                final int NUM_SLOTS_PER_INSTANCE = 3;
                final int NUM_TASKS_TO_SCHEDULE = 2000;
 
-               TestingUtils.setGlobalExecutionContext();
-
                try {
                        // note: since this test asynchronously releases slots, 
the executor needs release workers.
                        // doing the release call synchronous can lead to a 
deadlock
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
 
                        for (int i = 0; i < NUM_INSTANCES; i++) {
                                
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * 
NUM_SLOTS_PER_INSTANCE) + 1));
@@ -287,15 +268,13 @@ public class SchedulerIsolatedTasksTest {
                } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
-               } finally {
-                       TestingUtils.setCallingThreadDispatcher(system);
                }
        }
        
        @Test
        public void testScheduleWithDyingInstances() {
                try {
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);
@@ -355,7 +334,7 @@ public class SchedulerIsolatedTasksTest {
        @Test
        public void testSchedulingLocation() {
                try {
-                       Scheduler scheduler = new Scheduler();
+                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                        
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);

Reply via email to