Repository: flink Updated Branches: refs/heads/master 5eae47f5d -> 90532549e
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index db6df75..27c32a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -41,6 +41,9 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; +import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; +import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; +import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import org.apache.flink.runtime.messages.TaskMessages.PartitionState; @@ -48,9 +51,10 @@ import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; - import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.StoppableInvokable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -136,7 +140,7 @@ public class JobManagerTest { expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); jobManagerGateway.tell( - new WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()), + new WaitForAllVerticesToBeRunningOrFinished(jid), testActorGateway); expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); @@ -227,4 +231,103 @@ public class JobManagerTest { } }}; } + + @Test + public void testStopSignal() throws Exception { + new JavaTestKit(system) {{ + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(2); + sender.setInvokableClass(StoppableInvokable.class); + + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway); + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + jobManagerGateway.tell(new StopJob(jid), testActorGateway); + + // - The test ---------------------------------------------------------------------- + expectMsgClass(StoppingSuccess.class); + + expectMsgClass(JobManagerMessages.JobResultSuccess.class); + } + finally { + if (cluster != null) { + cluster.shutdown(); + } + } + }}; + } + + @Test + public void testStopSignalFail() throws Exception { + new JavaTestKit(system) {{ + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + + final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway); + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + jobManagerGateway.tell(new StopJob(jid), testActorGateway); + + // - The test ---------------------------------------------------------------------- + expectMsgClass(StoppingFailure.class); + + jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway); + + expectMsgClass(ExecutionGraphFound.class); + } + finally { + if (cluster != null) { + cluster.shutdown(); + } + } + }}; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index f5271df..28284eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -57,11 +57,13 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTr import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; +import org.apache.flink.runtime.messages.TaskMessages.StopTask; import org.apache.flink.runtime.messages.TaskMessages.PartitionState; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -358,7 +360,132 @@ public class TaskManagerTest extends TestLogger { } }}; } - + + @Test + public void testJobSubmissionAndStop() { + new JavaTestKit(system){{ + + ActorGateway jobManager = null; + ActorGateway taskManager = null; + + final ActorGateway testActorGateway = new AkkaActorGateway( + getTestActor(), + leaderSessionID); + + try { + ActorRef jm = system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID)); + jobManager = new AkkaActorGateway(jm, leaderSessionID); + + taskManager = TestingUtils.createTaskManager( + system, + jobManager, + new Configuration(), + true, + true); + + final JobID jid1 = new JobID(); + final JobID jid2 = new JobID(); + + JobVertexID vid1 = new JobVertexID(); + JobVertexID vid2 = new JobVertexID(); + + final ExecutionAttemptID eid1 = new ExecutionAttemptID(); + final ExecutionAttemptID eid2 = new ExecutionAttemptID(); + + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid1, vid1, eid1, "TestTask1", 1, 5, 0, + new Configuration(), new Configuration(), StoppableInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); + + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid2, vid2, eid2, "TestTask2", 2, 7, 0, + new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); + + final ActorGateway tm = taskManager; + + new Within(d) { + + @Override + protected void run() { + try { + Future<Object> t1Running = tm.ask( + new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1), + timeout); + Future<Object> t2Running = tm.ask( + new TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2), + timeout); + + tm.tell(new SubmitTask(tdd1), testActorGateway); + tm.tell(new SubmitTask(tdd2), testActorGateway); + + expectMsgEquals(Messages.getAcknowledge()); + expectMsgEquals(Messages.getAcknowledge()); + + Await.ready(t1Running, d); + Await.ready(t2Running, d); + + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway); + + Map<ExecutionAttemptID, Task> runningTasks = expectMsgClass(TestingTaskManagerMessages + .ResponseRunningTasks.class).asJava(); + + assertEquals(2, runningTasks.size()); + Task t1 = runningTasks.get(eid1); + Task t2 = runningTasks.get(eid2); + assertNotNull(t1); + assertNotNull(t2); + + assertEquals(ExecutionState.RUNNING, t1.getExecutionState()); + assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); + + tm.tell(new StopTask(eid1), testActorGateway); + + expectMsgEquals(new TaskOperationResult(eid1, true)); + + Future<Object> response = tm.ask( + new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1), + timeout); + Await.ready(response, d); + + assertEquals(ExecutionState.FINISHED, t1.getExecutionState()); + + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway); + runningTasks = expectMsgClass(TestingTaskManagerMessages + .ResponseRunningTasks.class).asJava(); + + assertEquals(1, runningTasks.size()); + + tm.tell(new StopTask(eid1), testActorGateway); + expectMsgEquals(new TaskOperationResult(eid1, false, "No task with that execution ID was " + + "found.")); + + tm.tell(new StopTask(eid2), testActorGateway); + expectMsgEquals(new TaskOperationResult(eid2, false, "UnsupportedOperationException: Stopping not supported by this task.")); + + assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); + + tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), testActorGateway); + runningTasks = expectMsgClass(TestingTaskManagerMessages + .ResponseRunningTasks.class).asJava(); + + assertEquals(1, runningTasks.size()); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + }; + } + finally { + TestingUtils.stopActor(taskManager); + TestingUtils.stopActor(jobManager); + } + }}; + } + @Test public void testGateChannelEdgeMismatch() { new JavaTestKit(system){{ @@ -1192,6 +1319,7 @@ public class TaskManagerTest extends TestLogger { this.leaderSessionID = leaderSessionID; } + @Override public void handleMessage(Object message) throws Exception { if (message instanceof RegistrationMessages.RegisterTaskManager) { final InstanceID iid = new InstanceID(); @@ -1367,4 +1495,5 @@ public class TaskManagerTest extends TestLogger { } } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java new file mode 100644 index 0000000..8ad823c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.taskmanager; + +import java.lang.reflect.Field; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; +import org.apache.flink.runtime.memory.MemoryManager; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import scala.concurrent.duration.FiniteDuration; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class }) +public class TaskStopTest { + private Task task; + + public void doMocking(AbstractInvokable taskMock) throws Exception { + + TaskInfo taskInfoMock = mock(TaskInfo.class); + when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName"); + + TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class); + when(tddMock.getTaskInfo()).thenReturn(taskInfoMock); + when(tddMock.getApplicationID()).thenReturn(new ApplicationID()); + when(tddMock.getJobID()).thenReturn(mock(JobID.class)); + when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class)); + when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); + when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getInvokableClassName()).thenReturn("className"); + + task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), + mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class), + mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), + mock(TaskManagerRuntimeInfo.class)); + Field f = task.getClass().getDeclaredField("invokable"); + f.setAccessible(true); + f.set(task, taskMock); + + Field f2 = task.getClass().getDeclaredField("executionState"); + f2.setAccessible(true); + f2.set(task, ExecutionState.RUNNING); + } + + @Test(timeout = 10000) + public void testStopExecution() throws Exception { + StoppableTestTask taskMock = new StoppableTestTask(); + doMocking(taskMock); + + task.stopExecution(); + + while (!taskMock.stopCalled) { + Thread.sleep(100); + } + } + + @Test(expected = RuntimeException.class) + public void testStopExecutionFail() throws Exception { + AbstractInvokable taskMock = mock(AbstractInvokable.class); + doMocking(taskMock); + + task.stopExecution(); + } + + private final static class StoppableTestTask extends AbstractInvokable implements StoppableTask { + public volatile boolean stopCalled = false; + + @Override + public void invoke() throws Exception { + } + + @Override + public void stop() { + this.stopCalled = true; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java new file mode 100644 index 0000000..10d2eff --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.runtime.testutils; + +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; + +public final class StoppableInvokable extends AbstractInvokable implements StoppableTask { + private boolean isRunning = true; + + @Override + public void invoke() throws Exception { + while(isRunning) { + Thread.sleep(100); + } + } + + @Override + public void stop() { + this.isRunning = false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 82e536c..2008061 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -66,6 +67,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.api.transformations.StreamTransformation; @@ -1225,7 +1227,12 @@ public abstract class StreamExecutionEnvironment { boolean isParallel = function instanceof ParallelSourceFunction; clean(function); - StreamSource<OUT> sourceOperator = new StreamSource<OUT>(function); + StreamSource<OUT> sourceOperator; + if (function instanceof StoppableFunction) { + sourceOperator = new StoppableStreamSource<OUT>(function); + } else { + sourceOperator = new StreamSource<OUT>(function); + } return new DataStreamSource<OUT>(this, typeInfo, sourceOperator, isParallel, sourceName); } http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 8725bf5..eb1f2bb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -54,10 +55,10 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; +import org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,7 +173,9 @@ public class StreamGraph extends StreamingPlan { TypeInformation<OUT> outTypeInfo, String operatorName) { - if (operatorObject instanceof StreamSource) { + if (operatorObject instanceof StoppableStreamSource) { + addNode(vertexID, StoppableSourceStreamTask.class, operatorObject, operatorName); + } else if (operatorObject instanceof StreamSource) { addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName); } else { addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java new file mode 100644 index 0000000..3d8190f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +/** + * {@link StoppableStreamSource} takes a {@link SourceFunction} that implements {@link StoppableFunction}. + */ +public class StoppableStreamSource<T> extends StreamSource<T> { + + private static final long serialVersionUID = -4365670858793587337L; + + /** + * Takes a {@link SourceFunction} that implements {@link StoppableFunction}. + * + * @param sourceFunction + * A {@link SourceFunction} that implements {@link StoppableFunction}. + * + * @throw IllegalArgumentException if {@code sourceFunction} does not implement {@link StoppableFunction} + */ + public StoppableStreamSource(SourceFunction<T> sourceFunction) { + super(sourceFunction); + + if (!(sourceFunction instanceof StoppableFunction)) { + throw new IllegalArgumentException( + "The given SourceFunction must implement StoppableFunction."); + } + } + + public void stop() { + ((StoppableFunction) userFunction).stop(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java new file mode 100644 index 0000000..7359cb3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; + +/** + * Stoppable task for executing stoppable streaming sources. + */ +public class StoppableSourceStreamTask<OUT> extends SourceStreamTask<OUT> implements StoppableTask { + + @Override + public void stop() { + ((StoppableStreamSource<?>) this.headOperator).stop(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 232485d..703d9d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -27,9 +28,9 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.util.TestHarnessUtil; - import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -76,6 +77,19 @@ public class SourceStreamTaskTest { Assert.assertEquals(10, resultElements.size()); } + // test flag for testStop() + static boolean stopped = false; + + @Test + public void testStop() { + final StoppableSourceStreamTask<Object> sourceTask = new StoppableSourceStreamTask<Object>(); + sourceTask.headOperator = new StoppableStreamSource<Object>(new StoppableSource()); + + sourceTask.stop(); + + Assert.assertTrue(stopped); + } + /** * This test ensures that the SourceStreamTask properly serializes checkpointing * and element emission. This also verifies that there are no concurrent invocations @@ -103,24 +117,24 @@ public class SourceStreamTaskTest { final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>(); final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo); - + StreamConfig streamConfig = testHarness.getStreamConfig(); StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); streamConfig.setStreamOperator(sourceOperator); - - // prepare the - + + // prepare the + Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS]; - + // invoke this first, so the tasks are actually running when the checkpoints are scheduled testHarness.invoke(); - + for (int i = 0; i < NUM_CHECKPOINTERS; i++) { checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask)); } - + testHarness.waitForTaskCompletion(); - + // Get the result from the checkpointers, if these threw an exception it // will be rethrown here for (int i = 0; i < NUM_CHECKPOINTERS; i++) { @@ -131,7 +145,7 @@ public class SourceStreamTaskTest { checkpointerResults[i].get(); } } - + List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); Assert.assertEquals(NUM_ELEMENTS, resultElements.size()); } @@ -140,7 +154,24 @@ public class SourceStreamTaskTest { } } - private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> { + private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction { + private static final long serialVersionUID = 728864804042338806L; + + @Override + public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object> ctx) + throws Exception { + } + + @Override + public void cancel() {} + + @Override + public void stop() { + stopped = true; + } + } + + private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed { private static final long serialVersionUID = 1; private int maxElements; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 73a3d66..43f3795 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -67,6 +67,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.10</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -141,6 +149,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime-web</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-curator-test</artifactId> <version>${project.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index 30f840c..0dd99e2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -24,20 +24,32 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.files.MimeTypes; +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.test.util.TestBaseUtils; - +import org.apache.sling.commons.json.JSONArray; +import org.apache.sling.commons.json.JSONObject; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; +import io.netty.handler.codec.http.HttpResponseStatus; + import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -91,7 +103,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { ObjectMapper mapper = new ObjectMapper(); JsonNode response = mapper.readTree(json); ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers"); - + assertNotNull(taskManagers); assertEquals(cluster.numTaskManagers(), taskManagers.size()); } @@ -109,7 +121,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { ObjectMapper mapper = new ObjectMapper(); JsonNode parsed = mapper.readTree(json); ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); - + assertNotNull(taskManagers); assertEquals(cluster.numTaskManagers(), taskManagers.size()); @@ -158,4 +170,75 @@ public class WebFrontendITCase extends MultipleProgramsTestBase { fail(e.getMessage()); } } + + @Test(timeout = 5000) + public void testStop() throws Exception { + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(2); + sender.setInvokableClass(StoppableInvokable.class); + + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobID jid = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); + final Deadline deadline = testTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("localhost", port)) { + // Request the file from the web server + client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.OK, response.getStatus()); + assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); + assertEquals("{}", response.getContent()); + } + + waitForTaskManagers(); + } + + @Test(timeout = 5000) + public void testStopYarn() throws Exception { + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(2); + sender.setInvokableClass(StoppableInvokable.class); + + final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender); + final JobID jid = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); + final Deadline deadline = testTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("localhost", port)) { + // Request the file from the web server + client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); + + HttpTestClient.SimpleHttpResponse response = client + .getNextResponse(deadline.timeLeft()); + + assertEquals(HttpResponseStatus.OK, response.getStatus()); + assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json")); + assertEquals("{}", response.getContent()); + } + + waitForTaskManagers(); + } + + private void waitForTaskManagers() throws Exception { + int count = 0; + + while (count != 4) { + String json = getFromHTTP("http://localhost:" + port + "/taskmanagers/"); + JSONObject parsed = new JSONObject(json); + JSONArray taskManagers = parsed.getJSONArray("taskmanagers"); + JSONObject taskManager = taskManagers.getJSONObject(0); + count = taskManager.getInt("freeSlots"); + } + } + }
