Repository: flink
Updated Branches:
  refs/heads/master 5eae47f5d -> 90532549e


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index db6df75..27c32a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -41,6 +41,9 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
@@ -48,9 +51,10 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -136,7 +140,7 @@ public class JobManagerTest {
                                
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
                                jobManagerGateway.tell(
-                                               new 
WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()),
+                                               new 
WaitForAllVerticesToBeRunningOrFinished(jid),
                                                testActorGateway);
 
                                
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
@@ -227,4 +231,103 @@ public class JobManagerTest {
                        }
                }};
        }
+
+       @Test
+       public void testStopSignal() throws Exception {
+               new JavaTestKit(system) {{
+                       // Setup
+                       TestingCluster cluster = null;
+
+                       try {
+                               cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+                               // Create a task
+                               final JobVertex sender = new 
JobVertex("Sender");
+                               sender.setParallelism(2);
+                               
sender.setInvokableClass(StoppableInvokable.class);
+
+                               final JobGraph jobGraph = new 
JobGraph("Stoppable streaming test job", sender);
+                               final JobID jid = jobGraph.getJobID();
+
+                               final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                               // we can set the leader session ID to null 
because we don't use this gateway to send messages
+                               final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+                               // Submit the job and wait for all vertices to 
be running
+                               jobManagerGateway.tell(
+                                               new SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT),
+                                               testActorGateway);
+                               
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                               jobManagerGateway.tell(new 
WaitForAllVerticesToBeRunning(jid), testActorGateway);
+                               
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                               jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+                               // - The test 
----------------------------------------------------------------------
+                               expectMsgClass(StoppingSuccess.class);
+
+                               
expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+                       }
+                       finally {
+                               if (cluster != null) {
+                                       cluster.shutdown();
+                               }
+                       }
+               }};
+       }
+
+       @Test
+       public void testStopSignalFail() throws Exception {
+               new JavaTestKit(system) {{
+                       // Setup
+                       TestingCluster cluster = null;
+
+                       try {
+                               cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+                               // Create a task
+                               final JobVertex sender = new 
JobVertex("Sender");
+                               sender.setParallelism(1);
+                               
sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+
+                               final JobGraph jobGraph = new 
JobGraph("Non-Stoppable batching test job", sender);
+                               final JobID jid = jobGraph.getJobID();
+
+                               final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                               // we can set the leader session ID to null 
because we don't use this gateway to send messages
+                               final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+                               // Submit the job and wait for all vertices to 
be running
+                               jobManagerGateway.tell(
+                                               new SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT),
+                                               testActorGateway);
+                               
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                               jobManagerGateway.tell(new 
WaitForAllVerticesToBeRunning(jid), testActorGateway);
+                               
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                               jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+                               // - The test 
----------------------------------------------------------------------
+                               expectMsgClass(StoppingFailure.class);
+
+                               jobManagerGateway.tell(new 
RequestExecutionGraph(jid), testActorGateway);
+
+                               expectMsgClass(ExecutionGraphFound.class);
+                       }
+                       finally {
+                               if (cluster != null) {
+                                       cluster.shutdown();
+                               }
+                       }
+               }};
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index f5271df..28284eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -57,11 +57,13 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTr
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -358,7 +360,132 @@ public class TaskManagerTest extends TestLogger {
                        }
                }};
        }
-       
+
+       @Test
+       public void testJobSubmissionAndStop() {
+               new JavaTestKit(system){{
+
+                       ActorGateway jobManager = null;
+                       ActorGateway taskManager = null;
+
+                       final ActorGateway testActorGateway = new 
AkkaActorGateway(
+                                       getTestActor(),
+                                       leaderSessionID);
+
+                       try {
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
+                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+
+                               taskManager = TestingUtils.createTaskManager(
+                                               system,
+                                               jobManager,
+                                               new Configuration(),
+                                               true,
+                                               true);
+
+                               final JobID jid1 = new JobID();
+                               final JobID jid2 = new JobID();
+
+                               JobVertexID vid1 = new JobVertexID();
+                               JobVertexID vid2 = new JobVertexID();
+
+                               final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
+                               final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
+
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(appId, jid1, vid1, eid1, "TestTask1", 1, 5, 0,
+                                               new Configuration(), new 
Configuration(), StoppableInvokable.class.getName(),
+                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                               new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
+
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(appId, jid2, vid2, eid2, "TestTask2", 2, 7, 0,
+                                               new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
+                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                               new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
+
+                               final ActorGateway tm = taskManager;
+
+                               new Within(d) {
+
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       Future<Object> 
t1Running = tm.ask(
+                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid1),
+                                                                       
timeout);
+                                                       Future<Object> 
t2Running = tm.ask(
+                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskIsRunning(eid2),
+                                                                       
timeout);
+
+                                                       tm.tell(new 
SubmitTask(tdd1), testActorGateway);
+                                                       tm.tell(new 
SubmitTask(tdd2), testActorGateway);
+
+                                                       
expectMsgEquals(Messages.getAcknowledge());
+                                                       
expectMsgEquals(Messages.getAcknowledge());
+
+                                                       Await.ready(t1Running, 
d);
+                                                       Await.ready(t2Running, 
d);
+
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
+
+                                                       Map<ExecutionAttemptID, 
Task> runningTasks = expectMsgClass(TestingTaskManagerMessages
+                                                                       
.ResponseRunningTasks.class).asJava();
+
+                                                       assertEquals(2, 
runningTasks.size());
+                                                       Task t1 = 
runningTasks.get(eid1);
+                                                       Task t2 = 
runningTasks.get(eid2);
+                                                       assertNotNull(t1);
+                                                       assertNotNull(t2);
+
+                                                       
assertEquals(ExecutionState.RUNNING, t1.getExecutionState());
+                                                       
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
+
+                                                       tm.tell(new 
StopTask(eid1), testActorGateway);
+
+                                                       expectMsgEquals(new 
TaskOperationResult(eid1, true));
+
+                                                       Future<Object> response 
= tm.ask(
+                                                                       new 
TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
+                                                                       
timeout);
+                                                       Await.ready(response, 
d);
+
+                                                       
assertEquals(ExecutionState.FINISHED, t1.getExecutionState());
+
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
+                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
+                                                                       
.ResponseRunningTasks.class).asJava();
+
+                                                       assertEquals(1, 
runningTasks.size());
+
+                                                       tm.tell(new 
StopTask(eid1), testActorGateway);
+                                                       expectMsgEquals(new 
TaskOperationResult(eid1, false, "No task with that execution ID was " +
+                                                                       
"found."));
+
+                                                       tm.tell(new 
StopTask(eid2), testActorGateway);
+                                                       expectMsgEquals(new 
TaskOperationResult(eid2, false, "UnsupportedOperationException: Stopping not 
supported by this task."));
+
+                                                       
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
+
+                                                       
tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), 
testActorGateway);
+                                                       runningTasks = 
expectMsgClass(TestingTaskManagerMessages
+                                                                       
.ResponseRunningTasks.class).asJava();
+
+                                                       assertEquals(1, 
runningTasks.size());
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                                       fail(e.getMessage());
+                                               }
+                                       }
+                               };
+                       }
+                       finally {
+                               TestingUtils.stopActor(taskManager);
+                               TestingUtils.stopActor(jobManager);
+                       }
+               }};
+       }
+
        @Test
        public void testGateChannelEdgeMismatch() {
                new JavaTestKit(system){{
@@ -1192,6 +1319,7 @@ public class TaskManagerTest extends TestLogger {
                        this.leaderSessionID = leaderSessionID;
                }
 
+               @Override
                public void handleMessage(Object message) throws Exception {
                        if (message instanceof 
RegistrationMessages.RegisterTaskManager) {
                                final InstanceID iid = new InstanceID();
@@ -1367,4 +1495,5 @@ public class TaskManagerTest extends TestLogger {
                        }
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
new file mode 100644
index 0000000..8ad823c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, 
FiniteDuration.class })
+public class TaskStopTest {
+       private Task task;
+
+       public void doMocking(AbstractInvokable taskMock) throws Exception {
+
+               TaskInfo taskInfoMock = mock(TaskInfo.class);
+               
when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
+
+               TaskDeploymentDescriptor tddMock = 
mock(TaskDeploymentDescriptor.class);
+               when(tddMock.getTaskInfo()).thenReturn(taskInfoMock);
+               when(tddMock.getApplicationID()).thenReturn(new 
ApplicationID());
+               when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+               when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+               
when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+               
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+               
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+               when(tddMock.getInvokableClassName()).thenReturn("className");
+
+               task = new Task(tddMock, mock(MemoryManager.class), 
mock(IOManager.class), mock(NetworkEnvironment.class),
+                               mock(BroadcastVariableManager.class), 
mock(ActorGateway.class), mock(ActorGateway.class),
+                               mock(FiniteDuration.class), 
mock(LibraryCacheManager.class), mock(FileCache.class),
+                               mock(TaskManagerRuntimeInfo.class));
+               Field f = task.getClass().getDeclaredField("invokable");
+               f.setAccessible(true);
+               f.set(task, taskMock);
+
+               Field f2 = task.getClass().getDeclaredField("executionState");
+               f2.setAccessible(true);
+               f2.set(task, ExecutionState.RUNNING);
+       }
+
+       @Test(timeout = 10000)
+       public void testStopExecution() throws Exception {
+               StoppableTestTask taskMock = new StoppableTestTask();
+               doMocking(taskMock);
+
+               task.stopExecution();
+
+               while (!taskMock.stopCalled) {
+                       Thread.sleep(100);
+               }
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testStopExecutionFail() throws Exception {
+               AbstractInvokable taskMock = mock(AbstractInvokable.class);
+               doMocking(taskMock);
+
+               task.stopExecution();
+       }
+
+       private final static class StoppableTestTask extends AbstractInvokable 
implements StoppableTask {
+               public volatile boolean stopCalled = false;
+
+               @Override
+               public void invoke() throws Exception {
+               }
+
+               @Override
+               public void stop() {
+                       this.stopCalled = true;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
new file mode 100644
index 0000000..10d2eff
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+
+public final class StoppableInvokable extends AbstractInvokable implements 
StoppableTask {
+       private volatile boolean isRunning = true; // volatile: stop() may be called from a thread other than the one running invoke()
+
+       @Override
+       public void invoke() throws Exception { // blocks, polling every 100 ms, until stop() clears the flag
+               while (isRunning) {
+                       Thread.sleep(100); // sleep has no synchronization semantics; visibility comes from the volatile read
+               }
+       }
+
+       @Override
+       public void stop() { // asks invoke() to return on its next poll
+               this.isRunning = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 82e536c..2008061 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -66,6 +67,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
@@ -1225,7 +1227,12 @@ public abstract class StreamExecutionEnvironment {
                boolean isParallel = function instanceof ParallelSourceFunction;
 
                clean(function);
-               StreamSource<OUT> sourceOperator = new 
StreamSource<OUT>(function);
+               StreamSource<OUT> sourceOperator;
+               if (function instanceof StoppableFunction) {
+                       sourceOperator = new 
StoppableStreamSource<OUT>(function);
+               } else {
+                       sourceOperator = new StreamSource<OUT>(function);
+               }
 
                return new DataStreamSource<OUT>(this, typeInfo, 
sourceOperator, isParallel, sourceName);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 8725bf5..eb1f2bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -54,10 +55,10 @@ import 
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,7 +173,9 @@ public class StreamGraph extends StreamingPlan {
                        TypeInformation<OUT> outTypeInfo,
                        String operatorName) {
 
-               if (operatorObject instanceof StreamSource) {
+               if (operatorObject instanceof StoppableStreamSource) {
+                       addNode(vertexID, StoppableSourceStreamTask.class, 
operatorObject, operatorName);
+               } else if (operatorObject instanceof StreamSource) {
                        addNode(vertexID, SourceStreamTask.class, 
operatorObject, operatorName);
                } else {
                        addNode(vertexID, OneInputStreamTask.class, 
operatorObject, operatorName);

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
new file mode 100644
index 0000000..3d8190f
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StoppableStreamSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/**
+ * {@link StoppableStreamSource} takes a {@link SourceFunction} that 
implements {@link StoppableFunction}.
+ */
+public class StoppableStreamSource<T> extends StreamSource<T> {
+
+       private static final long serialVersionUID = -4365670858793587337L;
+
+       /**
+        * Takes a {@link SourceFunction} that implements {@link 
StoppableFunction}.
+        * 
+        * @param sourceFunction
+        *            A {@link SourceFunction} that implements {@link 
StoppableFunction}.
+        * 
+        * @throws IllegalArgumentException if {@code sourceFunction} does not 
implement {@link StoppableFunction}
+        */
+       public StoppableStreamSource(SourceFunction<T> sourceFunction) {
+               super(sourceFunction);
+
+               if (!(sourceFunction instanceof StoppableFunction)) {
+                       throw new IllegalArgumentException(
+                                       "The given SourceFunction must 
implement StoppableFunction.");
+               }
+       }
+
+       public void stop() {
+               ((StoppableFunction) userFunction).stop();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
new file mode 100644
index 0000000..7359cb3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
+
+/**
+ * Stoppable task for executing stoppable streaming sources.
+ */
+public class StoppableSourceStreamTask<OUT> extends SourceStreamTask<OUT> 
implements StoppableTask {
+
+       @Override
+       public void stop() {
+               ((StoppableStreamSource<?>) this.headOperator).stop();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 232485d..703d9d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -27,9 +28,9 @@ import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,6 +77,19 @@ public class SourceStreamTaskTest {
                Assert.assertEquals(10, resultElements.size());
        }
 
+       // test flag for testStop()
+       static boolean stopped = false;
+
+       @Test
+       public void testStop() {
+               final StoppableSourceStreamTask<Object> sourceTask = new 
StoppableSourceStreamTask<Object>();
+               sourceTask.headOperator = new StoppableStreamSource<Object>(new 
StoppableSource());
+
+               sourceTask.stop();
+
+               Assert.assertTrue(stopped);
+       }
+
        /**
         * This test ensures that the SourceStreamTask properly serializes 
checkpointing
         * and element emission. This also verifies that there are no 
concurrent invocations
@@ -103,24 +117,24 @@ public class SourceStreamTaskTest {
                        final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = 
new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
                        final SourceStreamTask<Tuple2<Long, Integer>> 
sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
                        final StreamTaskTestHarness<Tuple2<Long, Integer>> 
testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, 
typeInfo);
-       
+
                        StreamConfig streamConfig = 
testHarness.getStreamConfig();
                        StreamSource<Tuple2<Long, Integer>> sourceOperator = 
new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, 
SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
                        streamConfig.setStreamOperator(sourceOperator);
-                       
-                       // prepare the 
-                       
+
+                       // prepare the
+
                        Future<Boolean>[] checkpointerResults = new 
Future[NUM_CHECKPOINTERS];
-       
+
                        // invoke this first, so the tasks are actually running 
when the checkpoints are scheduled
                        testHarness.invoke();
-                       
+
                        for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
                                checkpointerResults[i] = executor.submit(new 
Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
                        }
-                       
+
                        testHarness.waitForTaskCompletion();
-       
+
                        // Get the result from the checkpointers, if these 
threw an exception it
                        // will be rethrown here
                        for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
@@ -131,7 +145,7 @@ public class SourceStreamTaskTest {
                                        checkpointerResults[i].get();
                                }
                        }
-       
+
                        List<Tuple2<Long, Integer>> resultElements = 
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
                        Assert.assertEquals(NUM_ELEMENTS, 
resultElements.size());
                }
@@ -140,7 +154,24 @@ public class SourceStreamTaskTest {
                }
        }
 
-       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, Checkpointed<Serializable> {
+       private static class StoppableSource extends RichSourceFunction<Object> 
implements StoppableFunction {
+               private static final long serialVersionUID = 
728864804042338806L;
+
+               @Override
+               public void 
run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Object>
 ctx)
+                               throws Exception {
+               }
+
+               @Override
+               public void cancel() {}
+
+               @Override
+               public void stop() {
+                       stopped = true;
+               }
+       }
+
+       private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, Checkpointed {
                private static final long serialVersionUID = 1;
 
                private int maxElements;

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 73a3d66..43f3795 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -67,6 +67,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime-web_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients_2.10</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
@@ -141,6 +149,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime-web</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-curator-test</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 30f840c..0dd99e2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,20 +24,32 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
-
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONObject;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -91,7 +103,7 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                        ObjectMapper mapper = new ObjectMapper();
                        JsonNode response = mapper.readTree(json);
                        ArrayNode taskManagers = (ArrayNode) 
response.get("taskmanagers");
-                       
+
                        assertNotNull(taskManagers);
                        assertEquals(cluster.numTaskManagers(), 
taskManagers.size());
                }
@@ -109,7 +121,7 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                        ObjectMapper mapper = new ObjectMapper();
                        JsonNode parsed = mapper.readTree(json);
                        ArrayNode taskManagers = (ArrayNode) 
parsed.get("taskmanagers");
-                       
+
                        assertNotNull(taskManagers);
                        assertEquals(cluster.numTaskManagers(), 
taskManagers.size());
 
@@ -158,4 +170,75 @@ public class WebFrontendITCase extends 
MultipleProgramsTestBase {
                        fail(e.getMessage());
                }
        }
+
+       @Test(timeout = 5000)
+       public void testStop() throws Exception {
+               // Create a task
+               final JobVertex sender = new JobVertex("Sender");
+               sender.setParallelism(2);
+               sender.setInvokableClass(StoppableInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
+               final JobID jid = jobGraph.getJobID();
+
+               cluster.submitJobDetached(jobGraph);
+
+               final FiniteDuration testTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+               final Deadline deadline = testTimeout.fromNow();
+
+               try (HttpTestClient client = new HttpTestClient("localhost", 
port)) {
+                       // Send the stop request (HTTP DELETE) to the web server
+                       client.sendDeleteRequest("/jobs/" + jid + "/stop", 
deadline.timeLeft());
+                       HttpTestClient.SimpleHttpResponse response = 
client.getNextResponse(deadline.timeLeft());
+
+                       assertEquals(HttpResponseStatus.OK, 
response.getStatus());
+                       assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
+                       assertEquals("{}", response.getContent());
+               }
+
+               waitForTaskManagers();
+       }
+
+       @Test(timeout = 5000)
+       public void testStopYarn() throws Exception {
+               // Create a task
+               final JobVertex sender = new JobVertex("Sender");
+               sender.setParallelism(2);
+               sender.setInvokableClass(StoppableInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Stoppable streaming 
test job", sender);
+               final JobID jid = jobGraph.getJobID();
+
+               cluster.submitJobDetached(jobGraph);
+
+               final FiniteDuration testTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+               final Deadline deadline = testTimeout.fromNow();
+
+               try (HttpTestClient client = new HttpTestClient("localhost", 
port)) {
+                       // Send the stop request via the GET-based /yarn-stop path
+                       client.sendGetRequest("/jobs/" + jid + "/yarn-stop", 
deadline.timeLeft());
+
+                       HttpTestClient.SimpleHttpResponse response = client
+                                       .getNextResponse(deadline.timeLeft());
+
+                       assertEquals(HttpResponseStatus.OK, 
response.getStatus());
+                       assertEquals(response.getType(), 
MimeTypes.getMimeTypeForExtension("json"));
+                       assertEquals("{}", response.getContent());
+               }
+
+               waitForTaskManagers();
+       }
+
+       private void waitForTaskManagers() throws Exception {
+               int count = 0;
+
+               while (count != 4) {
+                       String json = getFromHTTP("http://localhost:" + port + 
"/taskmanagers/");
+                       JSONObject parsed = new JSONObject(json);
+                       JSONArray taskManagers = 
parsed.getJSONArray("taskmanagers");
+                       JSONObject taskManager = taskManagers.getJSONObject(0);
+                       count = taskManager.getInt("freeSlots");
+               }
+       }
+
 }

Reply via email to