[ https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033743#comment-15033743 ]
ASF GitHub Bot commented on FLINK-2111: --------------------------------------- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46283023 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import scala.concurrent.ExecutionContext; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExecutionVertex.class) +public class ExecutionVertexStopTest { + + @Test + public void testStop() throws Exception { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); + + Execution executionMock = mock(Execution.class); + whenNew(Execution.class).withAnyArguments().thenReturn(executionMock); + + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + + vertex.stop(); + + verify(executionMock).stop(); + } + + private static ActorSystem system; + + @AfterClass + public static void teardown(){ + if(system != null) { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + } + + static boolean receivedStopSignal; --- End diff -- Ok. Will do. (for a test I thought is would be better to have it at the singlet test method where it is used). > Add "stop" signal to cleanly shutdown streaming jobs > ---------------------------------------------------- > > Key: FLINK-2111 > URL: https://issues.apache.org/jira/browse/FLINK-2111 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime, JobManager, Local Runtime, > Streaming, TaskManager, Webfrontend > Reporter: Matthias J. Sax > Assignee: Matthias J. Sax > Priority: Minor > > Currently, streaming jobs can only be stopped using "cancel" command, what is > a "hard" stop with no clean shutdown. > The new introduced "stop" signal, will only affect streaming source tasks > such that the sources can stop emitting data and shutdown cleanly, resulting > in a clean shutdown of the whole streaming job. > This feature is a pre-requirment for > https://issues.apache.org/jira/browse/FLINK-1929 -- This message was sent by Atlassian JIRA (v6.3.4#6332)