http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index eb0aab4..9db330b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -24,18 +24,11 @@ import static org.mockito.Mockito.mock; import java.io.IOException; -import akka.actor.Actor; -import akka.testkit.TestActorRef; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Status; -import akka.actor.UntypedActor; -import akka.japi.Creator; -import akka.testkit.JavaTestKit; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.BaseTestingInstanceGateway; +import org.apache.flink.runtime.instance.DummyInstanceGateway; +import org.apache.flink.runtime.instance.InstanceGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.api.common.JobID; @@ -46,26 +39,12 @@ import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import scala.concurrent.ExecutionContext; @SuppressWarnings("serial") public class ExecutionVertexCancelTest { - private static ActorSystem system; - - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown(){ - JavaTestKit.shutdownActorSystem(system); - system = null; - } - // -------------------------------------------------------------------------------------------- // Canceling in different states // -------------------------------------------------------------------------------------------- @@ -127,388 +106,350 @@ public class ExecutionVertexCancelTest { @Test public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() { - new JavaTestKit(system){{ - try { - final JobVertexID jid = new JobVertexID(); - final ActionQueue actions = new ActionQueue(); + try { + final JobVertexID jid = new JobVertexID(); - TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext( - actions)); + final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext(); + final TestingUtils.ActionQueue actions = executionContext.actionQueue(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionJobVertex ejv = getExecutionVertex( + jid, + executionContext + ); - setVertexState(vertex, ExecutionState.SCHEDULED); - assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - ActorRef taskManager = TestActorRef.create(system, Props.create(new - CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true), - new TaskOperationResult(execId, false)))); + setVertexState(vertex, ExecutionState.SCHEDULED); + assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + executionContext, + new TaskOperationResult(execId, true), + new TaskOperationResult(execId, false)); - vertex.deployToSlot(slot); + Instance instance = getInstance(instanceGateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + vertex.deployToSlot(slot); - assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // first action happens (deploy) - actions.triggerNextAction(); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + // first action happens (deploy) + actions.triggerNextAction(); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // the deploy call found itself in canceling after it returned and needs to send a cancel call - // the call did not yet execute, so it is still in canceling - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + // the deploy call found itself in canceling after it returned and needs to send a cancel call + // the call did not yet execute, so it is still in canceling + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // second action happens (cancel call from cancel function) - actions.triggerNextAction(); + // second action happens (cancel call from cancel function) + actions.triggerNextAction(); - // TaskManager reports back (canceling done) - vertex.getCurrentExecutionAttempt().cancelingComplete(); + // TaskManager reports back (canceling done) + vertex.getCurrentExecutionAttempt().cancelingComplete(); - // should properly set state to cancelled - assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); + // should properly set state to cancelled + assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); - // trigger the correction canceling call - actions.triggerNextAction(); - assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); + // trigger the correction canceling call + actions.triggerNextAction(); + assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertNull(vertex.getFailureCause()); + assertNull(vertex.getFailureCause()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); - } - catch(Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - TestingUtils.setGlobalExecutionContext(); - } - }}; + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testCancelConcurrentlyToDeploying_CallsOvertaking() { - new JavaTestKit(system){ - { - try { - final JobVertexID jid = new JobVertexID(); - final ActionQueue actions = new ActionQueue(); + try { + final JobVertexID jid = new JobVertexID(); - TestingUtils.setExecutionContext(new TestingUtils - .QueuedActionExecutionContext(actions)); + final TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext(); + final TestingUtils.ActionQueue actions = executionContext.actionQueue(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + final ExecutionJobVertex ejv = getExecutionVertex(jid, executionContext); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - setVertexState(vertex, ExecutionState.SCHEDULED); - assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); + setVertexState(vertex, ExecutionState.SCHEDULED); + assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); - // task manager cancel sequence mock actor - // first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call) - TestActorRef<? extends Actor> taskManager = TestActorRef.create(system, Props.create(new - CancelSequenceTaskManagerCreator(new - TaskOperationResult(execId, false), new TaskOperationResult(execId, true)))); + // task manager cancel sequence mock actor + // first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call) + InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + executionContext, + new TaskOperationResult(execId, false), + new TaskOperationResult(execId, true) + ); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + Instance instance = getInstance(instanceGateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - vertex.deployToSlot(slot); + vertex.deployToSlot(slot); - assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // first action happens (deploy) - Runnable deployAction = actions.popNextAction(); - Runnable cancelAction = actions.popNextAction(); + // first action happens (deploy) + Runnable deployAction = actions.popNextAction(); + Runnable cancelAction = actions.popNextAction(); - // cancel call first - cancelAction.run(); - // process onComplete callback - actions.triggerNextAction(); + // cancel call first + cancelAction.run(); + // process onComplete callback + actions.triggerNextAction(); - // did not find the task, not properly cancelled, stay in canceling - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + // did not find the task, not properly cancelled, stay in canceling + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // deploy action next - deployAction.run(); + // deploy action next + deployAction.run(); - // the deploy call found itself in canceling after it returned and needs to send a cancel call - // the call did not yet execute, so it is still in canceling - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + // the deploy call found itself in canceling after it returned and needs to send a cancel call + // the call did not yet execute, so it is still in canceling + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - vertex.getCurrentExecutionAttempt().cancelingComplete(); + vertex.getCurrentExecutionAttempt().cancelingComplete(); - assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertNull(vertex.getFailureCause()); + assertNull(vertex.getFailureCause()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); - } - } - }; + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testCancelFromRunning() { - new JavaTestKit(system) { - { - try { - TestingUtils.setCallingThreadDispatcher(system); - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + try { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - final TestActorRef<? extends Actor> taskManager = TestActorRef.create(system, - Props.create(new CancelSequenceTaskManagerCreator(new - TaskOperationResult(execId, true)))); + InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + TestingUtils.directExecutionContext(), + new TaskOperationResult(execId, true)); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + Instance instance = getInstance(instanceGateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); - setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); + setVertexResource(vertex, slot); - assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); - vertex.cancel(); - vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled + vertex.cancel(); + vertex.getCurrentExecutionAttempt().cancelingComplete(); // responce by task manager once actially canceled - assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertNull(vertex.getFailureCause()); + assertNull(vertex.getFailureCause()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); - } - } - }; + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testRepeatedCancelFromRunning() { - new JavaTestKit(system) { - { - try { - TestingUtils.setCallingThreadDispatcher(system); + try { - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - final ActorRef taskManager = TestActorRef.create(system, Props.create(new - CancelSequenceTaskManagerCreator(new - TaskOperationResult(execId, true)))); + final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + TestingUtils.directExecutionContext(), + new TaskOperationResult(execId, true) + ); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + Instance instance = getInstance(instanceGateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); - setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); + setVertexResource(vertex, slot); - assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - // callback by TaskManager after canceling completes - vertex.getCurrentExecutionAttempt().cancelingComplete(); + // callback by TaskManager after canceling completes + vertex.getCurrentExecutionAttempt().cancelingComplete(); - assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); + assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertNull(vertex.getFailureCause()); + assertNull(vertex.getFailureCause()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); - } - } - }; + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testCancelFromRunningDidNotFindTask() { // this may happen when the task finished or failed while the call was in progress - new JavaTestKit(system) { - { - try { - TestingUtils.setCallingThreadDispatcher(system); - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + try { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - final ActorRef taskManager = TestActorRef.create(system,Props.create(new - CancelSequenceTaskManagerCreator(new - TaskOperationResult(execId, false)))); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + TestingUtils.directExecutionContext(), + new TaskOperationResult(execId, false) + ); - setVertexState(vertex, ExecutionState.RUNNING); - setVertexResource(vertex, slot); + Instance instance = getInstance(instanceGateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); + setVertexState(vertex, ExecutionState.RUNNING); + setVertexResource(vertex, slot); - vertex.cancel(); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); - assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); + vertex.cancel(); - assertNull(vertex.getFailureCause()); + assertEquals(ExecutionState.CANCELING, vertex.getExecutionState()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); - } - } - }; + assertNull(vertex.getFailureCause()); + + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testCancelCallFails() { - new JavaTestKit(system) { - { - try { - TestingUtils.setCallingThreadDispatcher(system); - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + try { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); - final ActorRef taskManager = TestActorRef.create(system, Props.create(new - CancelSequenceTaskManagerCreator())); + final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext()); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + Instance instance = getInstance(gateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); - setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); + setVertexResource(vertex, slot); - assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); + assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertNotNull(vertex.getFailureCause()); + assertNotNull(vertex.getFailureCause()); - assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); - assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); - } - } - }; + assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0); + assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } @Test public void testSendCancelAndReceiveFail() { - new JavaTestKit(system) { - { - try { - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + try { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); - final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId(); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId(); - final ActorRef taskManager = system.actorOf( - Props.create(new CancelSequenceTaskManagerCreator( - new TaskOperationResult(execID, true) - ))); + final InstanceGateway gateway = new CancelSequenceInstanceGateway( + TestingUtils.defaultExecutionContext(), + new TaskOperationResult(execID, true)); - Instance instance = getInstance(taskManager); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + Instance instance = getInstance(gateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); - setVertexState(vertex, ExecutionState.RUNNING); - setVertexResource(vertex, slot); + setVertexState(vertex, ExecutionState.RUNNING); + setVertexResource(vertex, slot); - assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); + assertEquals(ExecutionState.RUNNING, vertex.getExecutionState()); - vertex.cancel(); + vertex.cancel(); - assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED); + assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING || vertex.getExecutionState() == ExecutionState.FAILED); - vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test")); + vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test")); - assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED); + assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED || vertex.getExecutionState() == ExecutionState.FAILED); - assertTrue(slot.isReleased()); + assertTrue(slot.isReleased()); - assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - }; + assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size()); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } } // -------------------------------------------------------------------------------------------- @@ -541,7 +482,7 @@ public class ExecutionVertexCancelTest { // deploying after canceling from CREATED needs to raise an exception, because // the scheduler (or any caller) needs to know that the slot should be released try { - Instance instance = getInstance(ActorRef.noSender()); + Instance instance = getInstance(DummyInstanceGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -584,7 +525,7 @@ public class ExecutionVertexCancelTest { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); - Instance instance = getInstance(ActorRef.noSender()); + Instance instance = getInstance(DummyInstanceGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -600,7 +541,7 @@ public class ExecutionVertexCancelTest { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - Instance instance = getInstance(ActorRef.noSender()); + Instance instance = getInstance(DummyInstanceGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexResource(vertex, slot); @@ -621,39 +562,33 @@ public class ExecutionVertexCancelTest { } } - public static class CancelSequenceTaskManagerCreator implements Creator<CancelSequenceTaskManager> { - private final TaskOperationResult[] results; - public CancelSequenceTaskManagerCreator(TaskOperationResult ... results){ - this.results = results; - } - - @Override - public CancelSequenceTaskManager create() throws Exception { - return new CancelSequenceTaskManager(results); - } - } - - public static class CancelSequenceTaskManager extends UntypedActor{ + public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway { private final TaskOperationResult[] results; - private int index; + private int index = -1; - public CancelSequenceTaskManager(TaskOperationResult[] results){ - this.results = results; - index = -1; + public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) { + super(executionContext); + this.results = result; } @Override - public void onReceive(Object message) throws Exception { - if(message instanceof TaskMessages.SubmitTask){ - getSender().tell(Messages.getAcknowledge(), getSelf()); - }else if(message instanceof TaskMessages.CancelTask){ + public Object handleMessage(Object message) throws Exception { + Object result; + if(message instanceof TaskMessages.SubmitTask) { + result = Messages.getAcknowledge(); + } else if(message instanceof TaskMessages.CancelTask) { index++; + if(index >= results.length){ - getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf()); - }else { - getSender().tell(results[index], getSelf()); + throw new IOException("RPC call failed."); + } else { + result = results[index]; } + } else { + result = null; } + + return result; } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index eadf328..431c3a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -22,13 +22,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; import static org.junit.Assert.*; -import akka.actor.Actor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; @@ -37,37 +30,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public class ExecutionVertexDeploymentTest { - private static ActorSystem system; - - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown(){ - JavaTestKit.shutdownActorSystem(system); - } - @Test public void testDeployCall() { try { final JobVertexID jid = new JobVertexID(); - TestingUtils.setCallingThreadDispatcher(system); - ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager - .class)); - final ExecutionJobVertex ejv = getExecutionVertex(jid); // mock taskmanager to simply accept the call - Instance instance = getInstance(tm); + Instance instance = getInstance( + new SimpleInstanceGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], @@ -95,24 +71,17 @@ public class ExecutionVertexDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } @Test public void testDeployWithSynchronousAnswer() { try { - TestingUtils.setCallingThreadDispatcher(system); - final JobVertexID jid = new JobVertexID(); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, - Props.create(SimpleAcknowledgingTaskManager.class)); - - final ExecutionJobVertex ejv = getExecutionVertex(jid); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance( + new SimpleInstanceGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], @@ -143,9 +112,6 @@ public class ExecutionVertexDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } @Test @@ -157,10 +123,8 @@ public class ExecutionVertexDeploymentTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, - Props.create(SimpleAcknowledgingTaskManager.class)); - - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance( + new SimpleInstanceGateway(TestingUtils.defaultExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -200,18 +164,14 @@ public class ExecutionVertexDeploymentTest { @Test public void testDeployFailedSynchronous() { try { - TestingUtils.setCallingThreadDispatcher(system); - final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, - Props.create(SimpleFailingTaskManager.class)); - - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance( + new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -230,9 +190,6 @@ public class ExecutionVertexDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } @Test @@ -243,10 +200,8 @@ public class ExecutionVertexDeploymentTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, - Props.create(SimpleFailingTaskManager.class)); - - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance( + new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -280,20 +235,16 @@ public class ExecutionVertexDeploymentTest { public void testFailExternallyDuringDeploy() { try { final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); - final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], - AkkaUtils.getDefaultTimeout()); + final TestingUtils.QueuedActionExecutionContext ec = TestingUtils.queuedActionExecutionContext(); + final TestingUtils.ActionQueue queue = ec.actionQueue(); - final ActionQueue queue = new ActionQueue(); - final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils - .QueuedActionExecutionContext(queue); + final ExecutionJobVertex ejv = getExecutionVertex(jid, ec); - TestingUtils.setExecutionContext(ec); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, - Props.create(SimpleAcknowledgingTaskManager.class)); - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -307,6 +258,7 @@ public class ExecutionVertexDeploymentTest { assertEquals(testError, vertex.getFailureCause()); queue.triggerNextAction(); + queue.triggerNextAction(); assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0); assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0); @@ -316,34 +268,29 @@ public class ExecutionVertexDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } @Test public void testFailCallOvertakesDeploymentAnswer() { try { - ActionQueue queue = new ActionQueue(); - TestingUtils.QueuedActionExecutionContext context = new TestingUtils - .QueuedActionExecutionContext(queue); - - TestingUtils.setExecutionContext(context); + final TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext(); + final TestingUtils.ActionQueue queue = context.actionQueue(); final JobVertexID jid = new JobVertexID(); - final ExecutionJobVertex ejv = getExecutionVertex(jid); + final ExecutionJobVertex ejv = getExecutionVertex(jid, context); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); - final TestActorRef<? extends Actor> simpleTaskManager = TestActorRef.create(system, Props.create(new - ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new - TaskOperationResult(eid, false), new TaskOperationResult(eid, true)))); + final Instance instance = getInstance( + new ExecutionVertexCancelTest.CancelSequenceInstanceGateway( + context, + new TaskOperationResult(eid, false), + new TaskOperationResult(eid, true))); - final Instance instance = getInstance(simpleTaskManager); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -361,11 +308,14 @@ public class ExecutionVertexDeploymentTest { Runnable cancel1 = queue.popNextAction(); cancel1.run(); - // execute onComplete callback + // execute onComplete callback of cancel queue.triggerNextAction(); deploy.run(); + // execute onComplete callback of deploy + queue.triggerNextAction(); + assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); assertEquals(testError, vertex.getFailureCause()); @@ -380,8 +330,5 @@ public class ExecutionVertexDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 1e9c30b..8ea7017 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -23,14 +23,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.DummyInstanceGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -38,28 +33,12 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture; import org.apache.flink.runtime.testingUtils.TestingUtils; - -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Matchers; public class ExecutionVertexSchedulingTest { - private static ActorSystem system; - - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown(){ - JavaTestKit.shutdownActorSystem(system); - system = null; - } - @Test public void testSlotReleasedWhenScheduledImmediately() { try { @@ -68,7 +47,7 @@ public class ExecutionVertexSchedulingTest { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final Instance instance = getInstance(ActorRef.noSender()); + final Instance instance = getInstance(DummyInstanceGateway.INSTANCE); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); slot.releaseSlot(); @@ -98,7 +77,7 @@ public class ExecutionVertexSchedulingTest { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final Instance instance = getInstance(ActorRef.noSender()); + final Instance instance = getInstance(DummyInstanceGateway.INSTANCE); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); slot.releaseSlot(); @@ -134,11 +113,7 @@ public class ExecutionVertexSchedulingTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - TestingUtils.setCallingThreadDispatcher(system); - ActorRef tm = TestActorRef.create(system, Props.create(ExecutionGraphTestUtils - .SimpleAcknowledgingTaskManager.class)); - - final Instance instance = getInstance(tm); + final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); Scheduler scheduler = mock(Scheduler.class); @@ -153,8 +128,6 @@ public class ExecutionVertexSchedulingTest { catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - }finally{ - TestingUtils.setGlobalExecutionContext(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index 0d2ffeb..b4a7e63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -39,39 +39,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import akka.actor.Actor; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; public class LocalInputSplitsTest { private static final FiniteDuration TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); - private static ActorSystem system; - - private static TestActorRef<? extends Actor> taskManager; - - - @BeforeClass - public static void setup() { - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - taskManager = TestActorRef.create(system, - Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class)); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - // -------------------------------------------------------------------------------------------- @Test @@ -290,14 +265,18 @@ public class LocalInputSplitsTest { JobGraph jobGraph = new JobGraph("test job", vertex); - ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(), - jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + TIMEOUT); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); eg.setQueuedSchedulingAllowed(false); // create a scheduler with 6 instances where always two are on the same host - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance i1 = getInstance(new byte[] {10,0,1,1}, 12345, "host1", 1); Instance i2 = getInstance(new byte[] {10,0,1,1}, 12346, "host1", 1); Instance i3 = getInstance(new byte[] {10,0,1,2}, 12345, "host2", 1); @@ -349,8 +328,12 @@ public class LocalInputSplitsTest { JobGraph jobGraph = new JobGraph("test job", vertex); - ExecutionGraph eg = new ExecutionGraph(jobGraph.getJobID(), - jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + TIMEOUT); eg.setQueuedSchedulingAllowed(false); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -370,7 +353,7 @@ public class LocalInputSplitsTest { } private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception { - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); for (int i = 0; i < numInstances; i++) { byte[] ipAddress = new byte[] { 10, 0, 1, (byte) (1 + i) }; @@ -393,7 +376,13 @@ public class LocalInputSplitsTest { when(connection.getHostname()).thenReturn(hostname); when(connection.getFQDNHostname()).thenReturn(hostname); - return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, slots); + return new Instance( + new ExecutionGraphTestUtils.SimpleInstanceGateway( + TestingUtils.defaultExecutionContext()), + connection, + new InstanceID(), + hardwareDescription, + slots); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 4677bf8..3cedb63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import java.util.ArrayList; @@ -56,7 +57,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -91,7 +97,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -127,7 +138,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -164,7 +180,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -199,7 +220,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -254,7 +280,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -300,7 +331,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 376ff14..b779d79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -18,11 +18,10 @@ package org.apache.flink.runtime.executiongraph; -import akka.actor.ActorRef; - import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.DummyInstanceGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; @@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest { InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345); HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000); - Instance instance = new Instance(ActorRef.noSender(), ci, new InstanceID(), resources, 4); + Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4); this.resource = instance.allocateSimpleSlot(new JobID()); } @@ -116,7 +116,7 @@ public class TerminalStateDeadlockTest { vertices = Arrays.asList(v1, v2); } - final Scheduler scheduler = new Scheduler(); + final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); final Executor executor = Executors.newFixedThreadPool(4); @@ -181,7 +181,7 @@ public class TerminalStateDeadlockTest { private volatile boolean done; TestExecGraph(JobID jobId) { - super(jobId, "test graph", EMPTY_CONFIG, TIMEOUT); + super(TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, TIMEOUT); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index 756b9a4..3305254 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; +import org.apache.flink.runtime.instance.DummyInstanceGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -39,42 +40,14 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import akka.actor.Actor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; public class VertexLocationConstraintTest { private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS); - private static ActorSystem system; - - private static TestActorRef<? extends Actor> taskManager; - - - @BeforeClass - public static void setup() { - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - - taskManager = TestActorRef.create(system, - Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class)); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - - @Test public void testScheduleWithConstraint1() { try { @@ -91,7 +64,7 @@ public class VertexLocationConstraintTest { Instance instance2 = getInstance(address2, 6789, hostname2); Instance instance3 = getInstance(address3, 6789, hostname3); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance1); scheduler.newInstanceAvailable(instance2); scheduler.newInstanceAvailable(instance3); @@ -102,7 +75,12 @@ public class VertexLocationConstraintTest { jobVertex.setParallelism(2); JobGraph jg = new JobGraph("test job", jobVertex); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -157,7 +135,7 @@ public class VertexLocationConstraintTest { Instance instance2 = getInstance(address2, 6789, hostname2); Instance instance3 = getInstance(address3, 6789, hostname3); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance1); scheduler.newInstanceAvailable(instance2); scheduler.newInstanceAvailable(instance3); @@ -168,7 +146,12 @@ public class VertexLocationConstraintTest { jobVertex.setParallelism(2); JobGraph jg = new JobGraph("test job", jobVertex); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -219,7 +202,7 @@ public class VertexLocationConstraintTest { Instance instance2 = getInstance(address2, 6789, hostname2); Instance instance3 = getInstance(address3, 6789, hostname3); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance1); scheduler.newInstanceAvailable(instance2); scheduler.newInstanceAvailable(instance3); @@ -238,7 +221,12 @@ public class VertexLocationConstraintTest { JobGraph jg = new JobGraph("test job", jobVertex1, jobVertex2); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); @@ -290,7 +278,7 @@ public class VertexLocationConstraintTest { Instance instance1 = getInstance(address1, 6789, hostname1); Instance instance2 = getInstance(address2, 6789, hostname2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance1); // prepare the execution graph @@ -299,7 +287,12 @@ public class VertexLocationConstraintTest { jobVertex.setParallelism(1); JobGraph jg = new JobGraph("test job", jobVertex); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Collections.singletonList(jobVertex)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID()); @@ -343,7 +336,7 @@ public class VertexLocationConstraintTest { Instance instance1 = getInstance(address1, 6789, hostname1); Instance instance2 = getInstance(address2, 6789, hostname2); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance1); // prepare the execution graph @@ -362,7 +355,12 @@ public class VertexLocationConstraintTest { jobVertex1.setSlotSharingGroup(sharingGroup); jobVertex2.setSlotSharingGroup(sharingGroup); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID()); @@ -395,12 +393,17 @@ public class VertexLocationConstraintTest { JobVertex vertex = new JobVertex("test vertex", new JobVertexID()); JobGraph jg = new JobGraph("test job", vertex); - ExecutionGraph eg = new ExecutionGraph(jg.getJobID(), jg.getName(), jg.getJobConfiguration(), timeout); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jg.getJobID(), + jg.getName(), + jg.getJobConfiguration(), + timeout); eg.attachJobGraph(Collections.singletonList(vertex)); ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0]; - Instance instance = ExecutionGraphTestUtils.getInstance(ActorRef.noSender()); + Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE); ev.setLocationConstraintHosts(Collections.singletonList(instance)); assertNotNull(ev.getPreferredLocations()); @@ -431,6 +434,12 @@ public class VertexLocationConstraintTest { when(connection.getHostname()).thenReturn(hostname); when(connection.getFQDNHostname()).thenReturn(hostname); - return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, 1); + return new Instance( + new ExecutionGraphTestUtils.SimpleInstanceGateway( + TestingUtils.defaultExecutionContext()), + connection, + new InstanceID(), + hardwareDescription, + 1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index a1d6d03..d9e422c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; public class VertexSlotSharingTest { @@ -68,7 +69,11 @@ public class VertexSlotSharingTest { List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration(), + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "test job", + new Configuration(), AkkaUtils.getDefaultTimeout()); eg.attachJobGraph(vertices); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java new file mode 100644 index 0000000..e9f8259 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.Callable; + +/** + * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass + * only has to provide an implementation for handleMessage which contains the logic to treat + * different messages. + */ +abstract public class BaseTestingInstanceGateway implements InstanceGateway { + /** + * {@link ExecutionContext} which is used to execute the futures. + */ + private final ExecutionContext executionContext; + + public BaseTestingInstanceGateway(ExecutionContext executionContext) { + this.executionContext = executionContext; + } + + @Override + public Future<Object> ask(Object message, FiniteDuration timeout) { + try { + final Object result = handleMessage(message); + + return Futures.future(new Callable<Object>() { + @Override + public Object call() throws Exception { + return result; + } + }, executionContext); + + } catch (final Exception e) { + // if an exception occurred in the handleMessage method then return it as part of the future + return Futures.future(new Callable<Object>() { + @Override + public Object call() throws Exception { + throw e; + } + }, executionContext); + } + } + + /** + * Handles the supported messages by this InstanceGateway + * + * @param message Message to handle + * @return Result + * @throws Exception + */ + abstract public Object handleMessage(Object message) throws Exception; + + @Override + public void tell(Object message) {} + + @Override + public void forward(Object message, ActorRef sender) { + throw new UnsupportedOperationException(); + } + + @Override + public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { + return ask(message, timeout); + } + + @Override + public String path() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java new file mode 100644 index 0000000..5941201 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorPath; +import akka.actor.ActorRef; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Dummy {@link InstanceGateway} implementation used for testing. + */ +public class DummyInstanceGateway implements InstanceGateway { + public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway(); + + @Override + public Future<Object> ask(Object message, FiniteDuration timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public void tell(Object message) { + throw new UnsupportedOperationException(); + } + + @Override + public void forward(Object message, ActorRef sender) { + throw new UnsupportedOperationException(); + } + + @Override + public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { + throw new UnsupportedOperationException(); + } + + @Override + public String path() { + return "DummyInstanceGateway"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 595ac7e..c075a17 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.*; import java.lang.reflect.Method; import java.net.InetAddress; -import akka.actor.ActorRef; import org.apache.flink.api.common.JobID; import org.junit.Test; @@ -39,7 +38,7 @@ public class InstanceTest { InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 4); + Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 4); assertEquals(4, instance.getTotalNumberOfSlots()); assertEquals(4, instance.getNumberOfAvailableSlots()); @@ -100,7 +99,7 @@ public class InstanceTest { InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3); + Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3); assertEquals(3, instance.getNumberOfAvailableSlots()); @@ -130,7 +129,7 @@ public class InstanceTest { InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 3); + Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 3); assertEquals(3, instance.getNumberOfAvailableSlots()); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java index e075ab6..29ec7b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -23,8 +23,6 @@ import static org.junit.Assert.*; import java.net.InetAddress; -import akka.actor.ActorRef; - import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.api.common.JobID; @@ -148,7 +146,7 @@ public class SimpleSlotTest { InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - Instance instance = new Instance(ActorRef.noSender(), connection, new InstanceID(), hardwareDescription, 1); + Instance instance = new Instance(DummyInstanceGateway.INSTANCE, connection, new InstanceID(), hardwareDescription, 1); return instance.allocateSimpleSlot(new JobID()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 7739dea..6affdcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import org.mockito.Mockito; import scala.Some; @@ -58,7 +59,10 @@ public class NetworkEnvironmentTest { NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf), new Tuple2<Integer, Integer>(0, 0)); - NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config); + NetworkEnvironment env = new NetworkEnvironment( + TestingUtils.defaultExecutionContext(), + new FiniteDuration(30, TimeUnit.SECONDS), + config); assertFalse(env.isShutdown()); assertFalse(env.isAssociated()); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index a15e477..676b2a3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -27,41 +27,22 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; - import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public class ScheduleWithCoLocationHintTest { - private static ActorSystem system; - - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - TestingUtils.setCallingThreadDispatcher(system); - } - - @AfterClass - public static void teardown(){ - TestingUtils.setGlobalExecutionContext(); - JavaTestKit.shutdownActorSystem(system); - } - @Test public void scheduleAllSharedAndCoLocated() { try { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(2)); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -187,7 +168,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid3 = new JobVertexID(); JobVertexID jid4 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -231,7 +212,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid2 = new JobVertexID(); JobVertexID jid3 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -276,7 +257,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid3 = new JobVertexID(); JobVertexID jid4 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(getRandomInstance(1)); scheduler.newInstanceAvailable(getRandomInstance(1)); scheduler.newInstanceAvailable(getRandomInstance(1)); @@ -338,7 +319,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid2 = new JobVertexID(); JobVertexID jid3 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -407,7 +388,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -461,7 +442,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid2 = new JobVertexID(); JobVertexID jidx = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -519,7 +500,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); @@ -581,7 +562,7 @@ public class ScheduleWithCoLocationHintTest { JobVertexID jid1 = new JobVertexID(); JobVertexID jid2 = new JobVertexID(); - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index d19299b..2ee53d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -25,11 +25,7 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g import static org.junit.Assert.*; import org.apache.flink.runtime.instance.SimpleSlot; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; @@ -47,24 +43,11 @@ import org.apache.flink.runtime.instance.Instance; * Tests for the {@link Scheduler} when scheduling individual tasks. */ public class SchedulerIsolatedTasksTest { - private static ActorSystem system; - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - TestingUtils.setCallingThreadDispatcher(system); - } - - @AfterClass - public static void teardown(){ - TestingUtils.setGlobalExecutionContext(); - JavaTestKit.shutdownActorSystem(system); - } - @Test public void testAddAndRemoveInstance() { try { - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); @@ -128,7 +111,7 @@ public class SchedulerIsolatedTasksTest { @Test public void testScheduleImmediately() { try { - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); assertEquals(0, scheduler.getNumberOfAvailableSlots()); scheduler.newInstanceAvailable(getRandomInstance(2)); @@ -197,12 +180,10 @@ public class SchedulerIsolatedTasksTest { final int NUM_SLOTS_PER_INSTANCE = 3; final int NUM_TASKS_TO_SCHEDULE = 2000; - TestingUtils.setGlobalExecutionContext(); - try { // note: since this test asynchronously releases slots, the executor needs release workers. // doing the release call synchronous can lead to a deadlock - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); for (int i = 0; i < NUM_INSTANCES; i++) { scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1)); @@ -287,15 +268,13 @@ public class SchedulerIsolatedTasksTest { } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - } finally { - TestingUtils.setCallingThreadDispatcher(system); } } @Test public void testScheduleWithDyingInstances() { try { - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); @@ -355,7 +334,7 @@ public class SchedulerIsolatedTasksTest { @Test public void testSchedulingLocation() { try { - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2);
