[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14713557#comment-14713557
 ] 

ASF GitHub Bot commented on FLINK-2111:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/750#discussion_r37990019
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 ---
    @@ -363,7 +363,135 @@ protected void run() {
                        }
                }};
        }
    -   
    +
    +   @Test
    +   public void testJobSubmissionAndStop() {
    +
    +           LOG.info(       
"--------------------------------------------------------------------\n" +
    +                                   "     Starting 
testJobSubmissionAndStop() \n" +
    +                                   
"--------------------------------------------------------------------");
    +
    +           new JavaTestKit(system){{
    +
    +                   ActorRef jobManager = null;
    +                   ActorRef taskManager = null;
    +                   try {
    +                           jobManager = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID.get()));
    +                           taskManager = createTaskManager(jobManager, 
true);
    +
    +                           final JobID jid1 = new JobID();
    +                           final JobID jid2 = new JobID();
    +
    +                           JobVertexID vid1 = new JobVertexID();
    +                           JobVertexID vid2 = new JobVertexID();
    +
    +                           final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
    +                           final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
    +
    +                           final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
    +                                           new Configuration(), new 
Configuration(), StopableInvokable.class.getName(),
    +                                           
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
    +                                           
Collections.<InputGateDeploymentDescriptor>emptyList(),
    +                                   new ArrayList<BlobKey>(), 0);
    +
    +                           final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
    +                                           new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
    +                                           
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
    +                                           
Collections.<InputGateDeploymentDescriptor>emptyList(),
    +                                   new ArrayList<BlobKey>(), 0);
    +
    +                           final ActorRef tm = taskManager;
    +
    +                           new Within(d) {
    +
    +                                   @Override
    +                                   protected void run() {
    +                                           try {
    +                                                   Future<Object> 
t1Running = Patterns.ask(
    +                                                                   tm,
    +                                                                   new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
    +                                                                   
timeout);
    +                                                   Future<Object> 
t2Running = Patterns.ask(
    +                                                                   tm,
    +                                                                   new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
    +                                                                   
timeout);
    +
    +                                                   
tm.tell(decorator.decorate(new SubmitTask(tdd1)), getRef());
    +                                                   
tm.tell(decorator.decorate(new SubmitTask(tdd2)), getRef());
    +
    +                                                   
expectMsgEquals(Messages.getAcknowledge());
    +                                                   
expectMsgEquals(Messages.getAcknowledge());
    +
    +                                                   Await.ready(t1Running, 
d);
    +                                                   Await.ready(t2Running, 
d);
    +
    +                                                   
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
    +
    +                                                   Map<ExecutionAttemptID, 
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
    +                                                                   
.ResponseRunningTasks.class).asJava();
    +
    +                                                   assertEquals(2, 
runningTasks.size());
    +                                                   Task t1 = 
runningTasks.get(eid1);
    +                                                   Task t2 = 
runningTasks.get(eid2);
    +                                                   assertNotNull(t1);
    +                                                   assertNotNull(t2);
    +
    +                                                   
assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
    +                                                   
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
    +
    +                                                   
tm.tell(decorator.decorate(new StopTask(eid1)), getRef());
    +
    +                                                   expectMsgEquals(new 
TaskOperationResult(eid1, true));
    +
    +                                                   Future<Object> response 
= Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
    +                                                                   
timeout);
    +                                                   Await.ready(response, 
d);
    +
    +                                                   
assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
    +
    +                                                   
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
    +                                                   runningTasks = 
expectMsgClass(TestingTaskManagerMessages
    +                                                                   
.ResponseRunningTasks.class).asJava();
    +
    +                                                   assertEquals(1, 
runningTasks.size());
    +
    +                                                   
tm.tell(decorator.decorate(new StopTask(eid1)), getRef());
    +                                                   expectMsgEquals(new 
TaskOperationResult(eid1, false, "No task with that execution ID was " +
    +                                                                   
"found."));
    +
    +                                                   
tm.tell(decorator.decorate(new StopTask(eid2)), getRef());
    +                                                   expectMsgEquals(new 
TaskOperationResult(eid2, false, "Stopping not supported by this task."));
    +
    +                                                   
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
    +
    +                                                   
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
    +                                                   runningTasks = 
expectMsgClass(TestingTaskManagerMessages
    +                                                                   
.ResponseRunningTasks.class).asJava();
    +
    +                                                   assertEquals(1, 
runningTasks.size());
    +                                           } catch (Exception e) {
    +                                                   e.printStackTrace();
    +                                                   fail(e.getMessage());
    +                                           }
    +                                   }
    +                           };
    +                   }
    +                   catch(Exception e) {
    +                           e.printStackTrace();
    +                           fail(e.getMessage());
    --- End diff --
    
    stack trace


> Add "stop" signal to cleanly shutdown streaming jobs
> ----------------------------------------------------
>
>                 Key: FLINK-2111
>                 URL: https://issues.apache.org/jira/browse/FLINK-2111
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to