[
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713557#comment-14713557
]
ASF GitHub Bot commented on FLINK-2111:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/750#discussion_r37990019
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
---
@@ -363,7 +363,135 @@ protected void run() {
}
}};
}
-
+
+ @Test
+ public void testJobSubmissionAndStop() {
+
+ LOG.info(
"--------------------------------------------------------------------\n" +
+ " Starting
testJobSubmissionAndStop() \n" +
+
"--------------------------------------------------------------------");
+
+ new JavaTestKit(system){{
+
+ ActorRef jobManager = null;
+ ActorRef taskManager = null;
+ try {
+ jobManager =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID.get()));
+ taskManager = createTaskManager(jobManager,
true);
+
+ final JobID jid1 = new JobID();
+ final JobID jid2 = new JobID();
+
+ JobVertexID vid1 = new JobVertexID();
+ JobVertexID vid2 = new JobVertexID();
+
+ final ExecutionAttemptID eid1 = new
ExecutionAttemptID();
+ final ExecutionAttemptID eid2 = new
ExecutionAttemptID();
+
+ final TaskDeploymentDescriptor tdd1 = new
TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
+ new Configuration(), new
Configuration(), StopableInvokable.class.getName(),
+
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+
Collections.<InputGateDeploymentDescriptor>emptyList(),
+ new ArrayList<BlobKey>(), 0);
+
+ final TaskDeploymentDescriptor tdd2 = new
TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
+ new Configuration(), new
Configuration(), TestInvokableBlockingCancelable.class.getName(),
+
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+
Collections.<InputGateDeploymentDescriptor>emptyList(),
+ new ArrayList<BlobKey>(), 0);
+
+ final ActorRef tm = taskManager;
+
+ new Within(d) {
+
+ @Override
+ protected void run() {
+ try {
+ Future<Object>
t1Running = Patterns.ask(
+ tm,
+ new
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
+
timeout);
+ Future<Object>
t2Running = Patterns.ask(
+ tm,
+ new
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
+
timeout);
+
+
tm.tell(decorator.decorate(new SubmitTask(tdd1)), getRef());
+
tm.tell(decorator.decorate(new SubmitTask(tdd2)), getRef());
+
+
expectMsgEquals(Messages.getAcknowledge());
+
expectMsgEquals(Messages.getAcknowledge());
+
+ Await.ready(t1Running,
d);
+ Await.ready(t2Running,
d);
+
+
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+
+ Map<ExecutionAttemptID,
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
+
.ResponseRunningTasks.class).asJava();
+
+ assertEquals(2,
runningTasks.size());
+ Task t1 =
runningTasks.get(eid1);
+ Task t2 =
runningTasks.get(eid2);
+ assertNotNull(t1);
+ assertNotNull(t2);
+
+
assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
+
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
+
+
tm.tell(decorator.decorate(new StopTask(eid1)), getRef());
+
+ expectMsgEquals(new
TaskOperationResult(eid1, true));
+
+ Future<Object> response
= Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+
timeout);
+ Await.ready(response,
d);
+
+
assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
+
+
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ runningTasks =
expectMsgClass(TestingTaskManagerMessages
+
.ResponseRunningTasks.class).asJava();
+
+ assertEquals(1,
runningTasks.size());
+
+
tm.tell(decorator.decorate(new StopTask(eid1)), getRef());
+ expectMsgEquals(new
TaskOperationResult(eid1, false, "No task with that execution ID was " +
+
"found."));
+
+
tm.tell(decorator.decorate(new StopTask(eid2)), getRef());
+ expectMsgEquals(new
TaskOperationResult(eid2, false, "Stopping not supported by this task."));
+
+
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
+
+
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
+ runningTasks =
expectMsgClass(TestingTaskManagerMessages
+
.ResponseRunningTasks.class).asJava();
+
+ assertEquals(1,
runningTasks.size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ };
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
--- End diff --
stack trace
> Add "stop" signal to cleanly shutdown streaming jobs
> ----------------------------------------------------
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Runtime, JobManager, Local Runtime,
> Streaming, TaskManager, Webfrontend
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> Currently, streaming jobs can only be stopped using "cancel" command, what is
> a "hard" stop with no clean shutdown.
> The new introduced "stop" signal, will only affect streaming source tasks
> such that the sources can stop emitting data and shutdown cleanly, resulting
> in a clean shutdown of the whole streaming job.
> This feature is a pre-requirment for
> https://issues.apache.org/jira/browse/FLINK-1929
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)