http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
index ab61ee7..c7dc0bc 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html
@@ -33,6 +33,7 @@ limitations under the License.
       {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div>
   <div ng-if="job.duration &gt; -1" title="{{job.duration | 
humanizeDuration:false}}" class="navbar-info last first">{{job.duration | 
humanizeDuration:true}}</div>
   <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || 
job.state=='RESTARTING'" class="navbar-info last first"><span 
ng-click="cancelJob($event)" class="navbar-info-button btn 
btn-default">Cancel</span></div>
+  <div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' 
|| job.state=='RESTARTING')" class="navbar-info last first"><span 
ng-click="stopJob($event)" class="navbar-info-button btn 
btn-default">Stop</span></div>
 </nav>
 <nav ng-if="job" class="navbar navbar-default navbar-fixed-top 
navbar-main-additional">
   <ul class="nav nav-tabs">

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
new file mode 100644
index 0000000..6bb71ce
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+/**
+ * Indicates that a job is not stoppable.
+ */
+public class StoppingException extends Exception {
+
+       private static final long serialVersionUID = -721315728140810694L;
+
+       public StoppingException(String msg) {
+               super(msg);
+       }
+
+       public StoppingException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index db037bb..bc75664 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -77,6 +77,7 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import static 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import static 
org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
 import static 
org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
@@ -106,11 +107,13 @@ public class Execution implements Serializable {
 
        private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Execution.class, 
ExecutionState.class, "state");
-       
+
        private static final Logger LOG = ExecutionGraph.LOG;
-       
+
        private static final int NUM_CANCEL_CALL_TRIES = 3;
 
+       private static final int NUM_STOP_CALL_TRIES = 3;
+
        // 
--------------------------------------------------------------------------------------------
 
        private final ExecutionVertex vertex;
@@ -126,13 +129,13 @@ public class Execution implements Serializable {
        private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> 
partialInputChannelDeploymentDescriptors;
 
        private volatile ExecutionState state = CREATED;
-       
+
        private volatile SimpleSlot assignedResource;     // once assigned, 
never changes until the execution is archived
-       
+
        private volatile Throwable failureCause;          // once assigned, 
never changes
-       
+
        private volatile InstanceConnectionInfo assignedResourceLocation; // 
for the archived execution
-       
+
        private SerializedValue<StateHandle<?>> operatorState;
        
        private long recoveryTimestamp;
@@ -162,7 +165,7 @@ public class Execution implements Serializable {
 
                this.vertex = checkNotNull(vertex);
                this.attemptId = new ExecutionAttemptID();
-               
+
                this.attemptNumber = attemptNumber;
 
                this.stateTimestamps = new long[ExecutionState.values().length];
@@ -172,7 +175,7 @@ public class Execution implements Serializable {
 
                this.partialInputChannelDeploymentDescriptors = new 
ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //   Properties
        // 
--------------------------------------------------------------------------------------------
@@ -200,23 +203,23 @@ public class Execution implements Serializable {
        public InstanceConnectionInfo getAssignedResourceLocation() {
                return assignedResourceLocation;
        }
-       
+
        public Throwable getFailureCause() {
                return failureCause;
        }
-       
+
        public long[] getStateTimestamps() {
                return stateTimestamps;
        }
-       
+
        public long getStateTimestamp(ExecutionState state) {
                return this.stateTimestamps[state.ordinal()];
        }
-       
+
        public boolean isFinished() {
                return state == FINISHED || state == FAILED || state == 
CANCELED;
        }
-       
+
        /**
         * This method cleans fields that are irrelevant for the archived 
execution attempt.
         */
@@ -231,7 +234,7 @@ public class Execution implements Serializable {
                partialInputChannelDeploymentDescriptors.clear();
                partialInputChannelDeploymentDescriptors = null;
        }
-       
+
        public void setInitialState(SerializedValue<StateHandle<?>> 
initialState, long recoveryTimestamp) {
                if (state != ExecutionState.CREATED) {
                        throw new IllegalArgumentException("Can only assign 
operator state when execution attempt is in CREATED");
@@ -239,11 +242,11 @@ public class Execution implements Serializable {
                this.operatorState = initialState;
                this.recoveryTimestamp = recoveryTimestamp;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * NOTE: This method only throws exceptions if it is in an illegal 
state to be scheduled, or if the tasks needs
         *       to be scheduled immediately and no resource is available. If 
the task is accepted by the schedule, any
@@ -355,14 +358,14 @@ public class Execution implements Serializable {
                                slot.releaseSlot();
                                return;
                        }
-                       
+
                        if (LOG.isInfoEnabled()) {
                                LOG.info(String.format("Deploying %s (attempt 
#%d) to %s", vertex.getSimpleName(),
                                                attemptNumber, 
slot.getInstance().getInstanceConnectionInfo().getHostname()));
                        }
 
                        final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(attemptId, slot, operatorState, 
recoveryTimestamp, attemptNumber);
-                       
+
                        // register this execution at the execution graph, to 
receive call backs
                        vertex.getExecutionGraph().registerExecution(this);
 
@@ -378,10 +381,10 @@ public class Execution implements Serializable {
                                        if (failure != null) {
                                                if (failure instanceof 
TimeoutException) {
                                                        String taskname = 
deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
-                                                       
+
                                                        markFailed(new 
Exception(
                                                                        "Cannot 
deploy task " + taskname + " - TaskManager (" + instance
-                                                                               
        + ") not responding after a timeout of " + timeout, failure));
+                                                                       + ") 
not responding after a timeout of " + timeout, failure));
                                                }
                                                else {
                                                        markFailed(failure);
@@ -402,21 +405,53 @@ public class Execution implements Serializable {
                }
        }
 
+       /**
+        * Sends stop RPC call.
+        */
+       public void stop() {
+               final SimpleSlot slot = this.assignedResource;
+
+               if (slot != null) {
+                       final ActorGateway gateway = 
slot.getInstance().getActorGateway();
+
+                       Future<Object> stopResult = gateway.retry(
+                               new StopTask(attemptId),
+                               NUM_STOP_CALL_TRIES,
+                               timeout,
+                               executionContext);
+
+                       stopResult.onComplete(new OnComplete<Object>() {
+                               @Override
+                               public void onComplete(Throwable failure, 
Object success) throws Throwable {
+                                       if (failure != null) {
+                                               fail(new Exception("Task could 
not be stopped.", failure));
+                                       } else {
+                                               TaskOperationResult result = 
(TaskOperationResult) success;
+                                               if (!result.success()) {
+                                                       LOG.info("Stopping task 
was not successful. Description: {}",
+                                                                       
result.description());
+                                               }
+                                       }
+                               }
+                       }, executionContext);
+               }
+       }
+
        public void cancel() {
                // depending on the previous state, we go directly to cancelled 
(no cancel call necessary)
                // -- or to canceling (cancel call needs to be sent to the task 
manager)
-               
+
                // because of several possibly previous states, we need to 
again loop until we make a
                // successful atomic state transition
                while (true) {
-                       
+
                        ExecutionState current = this.state;
-                       
+
                        if (current == CANCELING || current == CANCELED) {
                                // already taken care of, no need to cancel 
again
                                return;
                        }
-                               
+
                        // these two are the common cases where we need to send 
a cancel call
                        else if (current == RUNNING || current == DEPLOYING) {
                                // try to transition to canceling, if 
successful, send the cancel call
@@ -426,7 +461,7 @@ public class Execution implements Serializable {
                                }
                                // else: fall through the loop
                        }
-                       
+
                        else if (current == FINISHED || current == FAILED) {
                                // nothing to do any more. finished failed 
before it could be cancelled.
                                // in any case, the task is removed from the 
TaskManager already
@@ -437,10 +472,10 @@ public class Execution implements Serializable {
                        else if (current == CREATED || current == SCHEDULED) {
                                // from here, we can directly switch to 
cancelled, because no task has been deployed
                                if (transitionState(current, CANCELED)) {
-                                       
+
                                        // we skip the canceling state. set the 
timestamp, for a consistent appearance
                                        markTimestamp(CANCELING, 
getStateTimestamp(CANCELED));
-                                       
+
                                        try {
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                                if (assignedResource != null) {
@@ -745,7 +780,7 @@ public class Execution implements Serializable {
                // the actual computation on the task manager is cleaned up by 
the TaskManager that noticed the failure
 
                // we may need to loop multiple times (in the presence of 
concurrent calls) in order to
-               // atomically switch to failed 
+               // atomically switch to failed
                while (true) {
                        ExecutionState current = this.state;
 
@@ -775,7 +810,7 @@ public class Execution implements Serializable {
                                finally {
                                        vertex.executionFailed(t);
                                }
-                               
+
                                if (!isCallback && (current == RUNNING || 
current == DEPLOYING)) {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Sending out cancel 
request, to remove task execution from TaskManager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 20288fb..7d83ae2 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -86,6 +87,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 /**
  * The execution graph is the central data structure that coordinates the 
distributed
  * execution of a data flow. It keeps representations of each parallel task, 
each
@@ -122,7 +124,7 @@ public class ExecutionGraph implements Serializable {
 
        /** The log object used for debugging. */
        static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-       
+
        // 
--------------------------------------------------------------------------------------------
 
        /** The lock used to secure all access to mutable fields, especially 
the tracking of progress
@@ -143,12 +145,15 @@ public class ExecutionGraph implements Serializable {
        /** The job configuration that was originally attached to the JobGraph. 
*/
        private final Configuration jobConfiguration;
 
+       /** {@code true} if all source tasks are stoppable. */
+       private boolean isStoppable = true;
+
        /** All job vertices that are part of this graph */
        private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
 
        /** All vertices, in the order in which they were created **/
        private final List<ExecutionJobVertex> verticesInCreationOrder;
-       
+
        /** All intermediate results that are part of this graph */
        private final ConcurrentHashMap<IntermediateDataSetID, 
IntermediateResult> intermediateResults;
 
@@ -204,7 +209,7 @@ public class ExecutionGraph implements Serializable {
 
        /** The number of job vertices that have reached a terminal state */
        private volatile int numFinishedJobVertices;
-       
+
        // ------ Fields that are relevant to the execution and need to be 
cleared before archiving  -------
 
        /** The scheduler to use for scheduling new tasks as they are needed */
@@ -218,7 +223,7 @@ public class ExecutionGraph implements Serializable {
        /** The classloader for the user code. Needed for calls into user code 
classes */
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ClassLoader userClassLoader;
-       
+
        /** The coordinator for checkpoints, if snapshot checkpoints are 
enabled */
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointCoordinator checkpointCoordinator;
@@ -277,9 +282,11 @@ public class ExecutionGraph implements Serializable {
                        List<URL> requiredClasspaths,
                        ClassLoader userClassLoader) {
 
-               if (executionContext == null || jobId == null || jobName == 
null || jobConfig == null || userClassLoader == null) {
-                       throw new NullPointerException();
-               }
+               checkNotNull(executionContext);
+               checkNotNull(jobId);
+               checkNotNull(jobName);
+               checkNotNull(jobConfig);
+               checkNotNull(userClassLoader);
 
                this.executionContext = executionContext;
 
@@ -308,7 +315,7 @@ public class ExecutionGraph implements Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       //  Configuration of Data-flow wide execution settings  
+       //  Configuration of Data-flow wide execution settings
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -338,7 +345,7 @@ public class ExecutionGraph implements Serializable {
        public boolean isArchived() {
                return isArchived;
        }
-       
+
        public void enableSnapshotCheckpointing(
                        long interval,
                        long checkpointTimeout,
@@ -365,7 +372,7 @@ public class ExecutionGraph implements Serializable {
                ExecutionVertex[] tasksToTrigger = 
collectExecutionVertices(verticesToTrigger);
                ExecutionVertex[] tasksToWaitFor = 
collectExecutionVertices(verticesToWaitFor);
                ExecutionVertex[] tasksToCommitTo = 
collectExecutionVertices(verticesToCommitTo);
-               
+
                // disable to make sure existing checkpoint coordinators are 
cleared
                disableSnaphotCheckpointing();
 
@@ -399,7 +406,7 @@ public class ExecutionGraph implements Serializable {
                                completedCheckpointStore,
                                recoveryMode,
                                checkpointStatsTracker);
-               
+
                // the periodic checkpoint scheduler is activated and 
deactivated as a result of
                // job status changes (running -> on, all other states -> off)
                registerJobStatusListener(
@@ -435,7 +442,7 @@ public class ExecutionGraph implements Serializable {
                if (state != JobStatus.CREATED) {
                        throw new IllegalStateException("Job must be in CREATED 
state");
                }
-               
+
                if (checkpointCoordinator != null) {
                        checkpointCoordinator.shutdown();
                        checkpointCoordinator = null;
@@ -485,9 +492,9 @@ public class ExecutionGraph implements Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       //  Properties and Status of the Execution Graph  
+       //  Properties and Status of the Execution Graph
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Returns a list of BLOB keys referring to the JAR files required to 
run this job
         * @return list of BLOB keys referring to the JAR files required to run 
this job
@@ -530,6 +537,10 @@ public class ExecutionGraph implements Serializable {
                return jobName;
        }
 
+       public boolean isStoppable() {
+               return this.isStoppable;
+       }
+
        public Configuration getJobConfiguration() {
                return jobConfiguration;
        }
@@ -558,7 +569,7 @@ public class ExecutionGraph implements Serializable {
                // we return a specific iterator that does not fail with 
concurrent modifications
                // the list is append only, so it is safe for that
                final int numElements = this.verticesInCreationOrder.size();
-               
+
                return new Iterable<ExecutionJobVertex>() {
                        @Override
                        public Iterator<ExecutionJobVertex> iterator() {
@@ -688,6 +699,10 @@ public class ExecutionGraph implements Serializable {
 
                for (JobVertex jobVertex : topologiallySorted) {
 
+                       if (jobVertex.isInputVertex() && 
!jobVertex.isStoppable()) {
+                               this.isStoppable = false;
+                       }
+
                        // create the execution job vertex and attach it to the 
graph
                        ExecutionJobVertex ejv = new ExecutionJobVertex(this, 
jobVertex, 1, timeout, createTimestamp);
                        ejv.connectToPredecessors(this.intermediateResults);
@@ -705,20 +720,20 @@ public class ExecutionGraph implements Serializable {
                                                        res.getId(), res, 
previousDataSet));
                                }
                        }
-                       
+
                        this.verticesInCreationOrder.add(ejv);
                }
        }
-       
+
        public void scheduleForExecution(Scheduler scheduler) throws 
JobException {
                if (scheduler == null) {
                        throw new IllegalArgumentException("Scheduler must not 
be null.");
                }
-               
+
                if (this.scheduler != null && this.scheduler != scheduler) {
                        throw new IllegalArgumentException("Cannot use 
different schedulers for the same job");
                }
-               
+
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                        this.scheduler = scheduler;
 
@@ -754,7 +769,7 @@ public class ExecutionGraph implements Serializable {
        public void cancel() {
                while (true) {
                        JobStatus current = state;
-                       
+
                        if (current == JobStatus.RUNNING || current == 
JobStatus.CREATED) {
                                if (transitionState(current, 
JobStatus.CANCELLING)) {
                                        for (ExecutionJobVertex ejv : 
verticesInCreationOrder) {
@@ -790,6 +805,18 @@ public class ExecutionGraph implements Serializable {
                }
        }
 
+       public void stop() throws StoppingException {
+               if(this.isStoppable) {
+                       for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+                               if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+                                       ev.stop();
+                               }
+                       }
+               } else {
+                       throw new StoppingException("This job is not 
stoppable.");
+               }
+       }
+
        public void fail(Throwable t) {
                while (true) {
                        JobStatus current = state;
@@ -808,10 +835,10 @@ public class ExecutionGraph implements Serializable {
                                        // set the state of the job to failed
                                        transitionState(JobStatus.FAILING, 
JobStatus.FAILED, t);
                                }
-                               
+
                                return;
                        }
-                       
+
                        // no need to treat other states
                }
        }
@@ -836,15 +863,15 @@ public class ExecutionGraph implements Serializable {
                                this.currentExecutions.clear();
 
                                Collection<CoLocationGroup> colGroups = new 
HashSet<>();
-                               
+
                                for (ExecutionJobVertex jv : 
this.verticesInCreationOrder) {
-                                       
+
                                        CoLocationGroup cgroup = 
jv.getCoLocationGroup();
                                        if(cgroup != null && 
!colGroups.contains(cgroup)){
                                                cgroup.resetConstraints();
                                                colGroups.add(cgroup);
                                        }
-                                       
+
                                        jv.resetForNewExecution();
                                }
 
@@ -853,7 +880,7 @@ public class ExecutionGraph implements Serializable {
                                }
                                numFinishedJobVertices = 0;
                                transitionState(JobStatus.RESTARTING, 
JobStatus.CREATED);
-                               
+
                                // if we have checkpointed state, reload it 
into the executions
                                if (checkpointCoordinator != null) {
                                        
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, 
false);
@@ -962,7 +989,7 @@ public class ExecutionGraph implements Serializable {
                        }
                }
        }
-       
+
        private boolean transitionState(JobStatus current, JobStatus newState) {
                return transitionState(current, newState, null);
        }
@@ -989,14 +1016,14 @@ public class ExecutionGraph implements Serializable {
                        }
 
                        numFinishedJobVertices++;
-                       
+
                        if (numFinishedJobVertices == 
verticesInCreationOrder.size()) {
-                               
+
                                // we are done, transition to the final state
                                JobStatus current;
                                while (true) {
                                        current = this.state;
-                                       
+
                                        if (current == JobStatus.RUNNING) {
                                                if (transitionState(current, 
JobStatus.FINISHED)) {
                                                        postRunCleanup();
@@ -1066,7 +1093,7 @@ public class ExecutionGraph implements Serializable {
 
        /**
         * Updates the state of one of the ExecutionVertex's Execution attempts.
-        * If the new status if "FINISHED", this also updates the 
+        * If the new status if "FINISHED", this also updates the
         * 
         * @param state The state update.
         * @return True, if the task update was properly applied, false, if the 
execution attempt was not found.
@@ -1184,7 +1211,7 @@ public class ExecutionGraph implements Serializable {
                        this.executionListenerActors.add(listener);
                }
        }
-       
+
        private void notifyJobStatusChange(JobStatus newState, Throwable error) 
{
                if (jobStatusListenerActors.size() > 0) {
                        ExecutionGraphMessages.JobStatusChanged message =
@@ -1196,7 +1223,7 @@ public class ExecutionGraph implements Serializable {
                        }
                }
        }
-       
+
        void notifyExecutionChange(JobVertexID vertexId, int subtask, 
ExecutionAttemptID executionID, ExecutionState
                                                        newExecutionState, 
Throwable error)
        {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 165dce4..e522c8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -73,7 +73,6 @@ public class ExecutionVertex implements Serializable {
 
        private static final long serialVersionUID = 42L;
 
-       @SuppressWarnings("unused")
        private static final Logger LOG = ExecutionGraph.LOG;
 
        private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
@@ -467,6 +466,10 @@ public class ExecutionVertex implements Serializable {
                this.currentExecution.cancel();
        }
 
+       public void stop() {
+               this.currentExecution.stop();
+       }
+
        public void fail(Throwable t) {
                this.currentExecution.fail(t);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e20f737..f99d754 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -81,6 +81,7 @@ public class JobGraph implements Serializable {
 
        /** Configuration which defines which restart strategy to use for the 
job recovery */
        private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
+       
 
        /** The number of seconds after which the corresponding ExecutionGraph 
is removed at the
         * job manager after it has been executed. */

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 8ebc30c..9018029 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
@@ -38,8 +39,8 @@ public class JobVertex implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
 
        private static final String DEFAULT_NAME = "(unnamed vertex)";
-       
-       
+
+
        // 
--------------------------------------------------------------------------------------------
        // Members that define the structure / topology of the graph
        // 
--------------------------------------------------------------------------------------------
@@ -62,15 +63,18 @@ public class JobVertex implements java.io.Serializable {
        /** The class of the invokable. */
        private String invokableClassName;
 
+       /** Indicates of this job vertex is stoppable or not. */
+       private boolean isStoppable = false;
+
        /** Optionally, a source of input splits */
        private InputSplitSource<?> inputSplitSource;
-       
+
        /** The name of the vertex. This will be shown in runtime logs and will 
be in the runtime environment */
        private String name;
-       
+
        /** Optionally, a sharing group that allows subtasks from different job 
vertices to run concurrently in one slot */
        private SlotSharingGroup slotSharingGroup;
-       
+
        /** The group inside which the vertex subtasks share slots */
        private CoLocationGroup coLocationGroup;
 
@@ -83,11 +87,11 @@ public class JobVertex implements java.io.Serializable {
 
        /** Optional, pretty name of the operator, to be displayed in the JSON 
plan */
        private String operatorPrettyName;
-       
+
        /** Optional, the JSON for the optimizer properties of the operator 
result,
         * to be included in the JSON plan */
        private String resultOptimizerProperties;
-       
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -98,7 +102,7 @@ public class JobVertex implements java.io.Serializable {
        public JobVertex(String name) {
                this(name, null);
        }
-       
+
        /**
         * Constructs a new job vertex and assigns it with the given name.
         * 
@@ -109,9 +113,9 @@ public class JobVertex implements java.io.Serializable {
                this.name = name == null ? DEFAULT_NAME : name;
                this.id = id == null ? new JobVertexID() : id;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Returns the ID of this job vertex.
         * 
@@ -120,7 +124,7 @@ public class JobVertex implements java.io.Serializable {
        public JobVertexID getID() {
                return this.id;
        }
-       
+
        /**
         * Returns the name of the vertex.
         * 
@@ -129,7 +133,7 @@ public class JobVertex implements java.io.Serializable {
        public String getName() {
                return this.name;
        }
-       
+
        /**
         * Sets the name of the vertex
         * 
@@ -168,12 +172,13 @@ public class JobVertex implements java.io.Serializable {
                }
                return this.configuration;
        }
-       
+
        public void setInvokableClass(Class<? extends AbstractInvokable> 
invokable) {
                Preconditions.checkNotNull(invokable);
                this.invokableClassName = invokable.getName();
+               this.isStoppable = 
StoppableTask.class.isAssignableFrom(invokable);
        }
-       
+
        /**
         * Returns the name of the invokable class which represents the task of 
this vertex.
         * 
@@ -182,7 +187,7 @@ public class JobVertex implements java.io.Serializable {
        public String getInvokableClassName() {
                return this.invokableClassName;
        }
-       
+
        /**
         * Returns the invokable class which represents the task of this vertex
         * 
@@ -196,7 +201,7 @@ public class JobVertex implements java.io.Serializable {
                if (invokableClassName == null) {
                        return null;
                }
-               
+
                try {
                        return Class.forName(invokableClassName, true, 
cl).asSubclass(AbstractInvokable.class);
                }
@@ -207,7 +212,7 @@ public class JobVertex implements java.io.Serializable {
                        throw new RuntimeException("The user-code class is no 
subclass of " + AbstractInvokable.class.getName(), e);
                }
        }
-       
+
        /**
         * Gets the parallelism of the task.
         * 
@@ -228,7 +233,7 @@ public class JobVertex implements java.io.Serializable {
                }
                this.parallelism = parallelism;
        }
-       
+
        public InputSplitSource<?> getInputSplitSource() {
                return inputSplitSource;
        }
@@ -236,15 +241,15 @@ public class JobVertex implements java.io.Serializable {
        public void setInputSplitSource(InputSplitSource<?> inputSplitSource) {
                this.inputSplitSource = inputSplitSource;
        }
-       
+
        public List<IntermediateDataSet> getProducedDataSets() {
                return this.results;
        }
-       
+
        public List<JobEdge> getInputs() {
                return this.inputs;
        }
-       
+
        /**
         * Associates this vertex with a slot sharing group for scheduling. 
Different vertices in the same
         * slot sharing group can run one subtask each in the same slot.
@@ -255,13 +260,13 @@ public class JobVertex implements java.io.Serializable {
                if (this.slotSharingGroup != null) {
                        this.slotSharingGroup.removeVertexFromGroup(id);
                }
-               
+
                this.slotSharingGroup = grp;
                if (grp != null) {
                        grp.addVertexToGroup(id);
                }
        }
-       
+
        /**
         * Gets the slot sharing group that this vertex is associated with. 
Different vertices in the same
         * slot sharing group can run one subtask each in the same slot. If the 
vertex is not associated with
@@ -272,7 +277,7 @@ public class JobVertex implements java.io.Serializable {
        public SlotSharingGroup getSlotSharingGroup() {
                return slotSharingGroup;
        }
-       
+
        /**
         * Tells this vertex to strictly co locate its subtasks with the 
subtasks of the given vertex.
         * Strict co-location implies that the n'th subtask of this vertex will 
run on the same parallel computing
@@ -294,10 +299,10 @@ public class JobVertex implements java.io.Serializable {
                if (this.slotSharingGroup == null || this.slotSharingGroup != 
strictlyCoLocatedWith.slotSharingGroup) {
                        throw new IllegalArgumentException("Strict co-location 
requires that both vertices are in the same slot sharing group.");
                }
-               
+
                CoLocationGroup thisGroup = this.coLocationGroup;
                CoLocationGroup otherGroup = 
strictlyCoLocatedWith.coLocationGroup;
-               
+
                if (otherGroup == null) {
                        if (thisGroup == null) {
                                CoLocationGroup group = new 
CoLocationGroup(this, strictlyCoLocatedWith);
@@ -320,11 +325,11 @@ public class JobVertex implements java.io.Serializable {
                        }
                }
        }
-       
+
        public CoLocationGroup getCoLocationGroup() {
                return coLocationGroup;
        }
-       
+
        public void updateCoLocationGroup(CoLocationGroup group) {
                this.coLocationGroup = group;
        }
@@ -384,38 +389,42 @@ public class JobVertex implements java.io.Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        public boolean isInputVertex() {
                return this.inputs.isEmpty();
        }
-       
+
+       public boolean isStoppable() {
+               return this.isStoppable;
+       }
+
        public boolean isOutputVertex() {
                return this.results.isEmpty();
        }
-       
+
        public boolean hasNoConnectedInputs() {
                for (JobEdge edge : inputs) {
                        if (!edge.isIdReference()) {
                                return false;
                        }
                }
-               
+
                return true;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
-        * A hook that can be overwritten by sub classes to implement logic 
that is called by the 
+        * A hook that can be overwritten by sub classes to implement logic 
that is called by the
         * master when the job starts.
         * 
         * @param loader The class loader for user defined code.
         * @throws Exception The method may throw exceptions which cause the 
job to fail immediately.
         */
        public void initializeOnMaster(ClassLoader loader) throws Exception {}
-       
+
        /**
-        * A hook that can be overwritten by sub classes to implement logic 
that is called by the 
+        * A hook that can be overwritten by sub classes to implement logic 
that is called by the
         * master after the job completed.
         * 
         * @param loader The class loader for user defined code.
@@ -458,7 +467,7 @@ public class JobVertex implements java.io.Serializable {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        @Override
        public String toString() {
                return this.name + " (" + this.invokableClassName + ')';

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
new file mode 100644
index 0000000..383a0d2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Implemented by tasks that can receive STOP signal.
+ */
+public interface StoppableTask {
+       /** Called on STOP signal. */
+       public void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7832720..81dc01f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -49,8 +49,10 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
+import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -220,7 +223,7 @@ public class Task implements Runnable {
        private volatile long recoveryTs;
 
        /**
-        * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to 
+        * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to
         * be undone in the case of a failing task deployment.</p>
         */
        public Task(TaskDeploymentDescriptor tdd,
@@ -308,7 +311,7 @@ public class Task implements Runnable {
                }
 
                invokableHasBeenCanceled = new AtomicBoolean(false);
-               
+
                // finally, create the executing thread, but do not start it
                executingThread = new Thread(TASK_THREADS_GROUP, this, 
taskNameWithSubtask);
        }
@@ -336,15 +339,15 @@ public class Task implements Runnable {
        public Configuration getJobConfiguration() {
                return jobConfiguration;
        }
-       
+
        public Configuration getTaskConfiguration() {
                return this.taskConfiguration;
        }
-       
+
        public ResultPartitionWriter[] getAllWriters() {
                return writers;
        }
-       
+
        public SingleInputGate[] getAllInputGates() {
                return inputGates;
        }
@@ -445,7 +448,7 @@ public class Task implements Runnable {
 
                try {
                        // ----------------------------
-                       //  Task Bootstrap - We periodically 
+                       //  Task Bootstrap - We periodically
                        //  check for canceling as a shortcut
                        // ----------------------------
 
@@ -636,7 +639,7 @@ public class Task implements Runnable {
                                                LOG.error("Unexpected state in 
Task during an exception: " + current);
                                                break;
                                        }
-                                       // else fall through the loop and 
+                                       // else fall through the loop and
                                }
                        }
                        catch (Throwable tt) {
@@ -655,7 +658,7 @@ public class Task implements Runnable {
                                if (dispatcher != null && 
!dispatcher.isShutdown()) {
                                        dispatcher.shutdownNow();
                                }
-                               
+
                                // free the network resources
                                network.unregisterTask(this);
 
@@ -743,10 +746,39 @@ public class Task implements Runnable {
        }
 
        // 
----------------------------------------------------------------------------------------------------------------
-       //  Canceling / Failing the task from the outside
+       //  Stopping / Canceling / Failing the task from the outside
        // 
----------------------------------------------------------------------------------------------------------------
 
        /**
+        * Stops the executing task by calling {@link StoppableTask#stop()}.
+        * <p>
+        * This method never blocks.
+        * </p>
+        * 
+        * @throws UnsupportedOperationException
+        *             if the {@link AbstractInvokable} does not implement 
{@link StoppableTask}
+        */
+       public void stopExecution() throws UnsupportedOperationException {
+               LOG.info("Attempting to stop task " + taskNameWithSubtask);
+               if(this.invokable instanceof StoppableTask) {
+                       Runnable runnable = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
((StoppableTask)Task.this.invokable).stop();
+                                       } catch(RuntimeException e) {
+                                               LOG.error("Stopping task " + 
taskNameWithSubtask + " failed.", e);
+                                               taskManager.tell(new 
FailTask(executionId, e));
+                                       }
+                               }
+                       };
+                       executeAsyncCallRunnable(runnable, "Stopping source 
task " + this.taskNameWithSubtask);
+               } else {
+                       throw new UnsupportedOperationException("Stopping not 
supported by this task.");
+               }
+       }
+
+       /**
         * Cancels the task execution. If the task is already in a terminal 
state
         * (such as FINISHED, CANCELED, FAILED), or if the task is already 
canceling this does nothing.
         * Otherwise it sets the state to CANCELING, and, if the invokable code 
is running,
@@ -853,7 +885,7 @@ public class Task implements Runnable {
         * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}.
         * 
         * @param checkpointID The ID identifying the checkpoint.
-        * @param checkpointTimestamp The timestamp associated with the 
checkpoint.   
+        * @param checkpointTimestamp The timestamp associated with the 
checkpoint.
         */
        public void triggerCheckpointBarrier(final long checkpointID, final 
long checkpointTimestamp) {
                AbstractInvokable invokable = this.invokable;
@@ -861,7 +893,7 @@ public class Task implements Runnable {
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
                        if (invokable instanceof StatefulTask) {
 
-                               // build a local closure 
+                               // build a local closure
                                final StatefulTask<?> statefulTask = 
(StatefulTask<?>) invokable;
                                final String taskName = taskNameWithSubtask;
 
@@ -895,14 +927,14 @@ public class Task implements Runnable {
                        LOG.debug("Ignoring request to trigger a checkpoint for 
non-running task.");
                }
        }
-       
+
        public void notifyCheckpointComplete(final long checkpointID) {
                AbstractInvokable invokable = this.invokable;
 
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
                        if (invokable instanceof StatefulTask) {
 
-                               // build a local closure 
+                               // build a local closure
                                final StatefulTask<?> statefulTask = 
(StatefulTask<?>) invokable;
                                final String taskName = taskNameWithSubtask;
 
@@ -1069,7 +1101,7 @@ public class Task implements Runnable {
                                        logger.error("Error while canceling the 
task", t);
                                }
 
-                               // interrupt the running thread initially 
+                               // interrupt the running thread initially
                                executer.interrupt();
                                try {
                                        executer.join(30000);

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bd18160..6a22949 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -466,6 +466,33 @@ class JobManager(
           )
       }
 
+    case StopJob(jobID) =>
+      log.info(s"Trying to stop job with ID $jobID.")
+
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, _)) =>
+          try {
+            if (!executionGraph.isStoppable()) {
+              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job 
with ID $jobID" +
+                " is not stoppable."))
+            } else if(executionGraph.getState() != JobStatus.CREATED
+                && executionGraph.getState() != JobStatus.RUNNING
+                && executionGraph.getState() != JobStatus.RESTARTING) {
+              sender ! StoppingFailure(jobID, new IllegalStateException(s"Job 
with ID $jobID" +
+                "is not in state CREATED, RUNNING, or RESTARTING."))
+            } else {
+              executionGraph.stop()
+              sender ! StoppingSuccess(jobID)
+            }
+          } catch {
+            case t: Throwable =>  sender ! StoppingFailure(jobID, t)
+          }
+        case None =>
+          log.info(s"No job found with ID $jobID.")
+          sender ! StoppingFailure(jobID, new IllegalArgumentException("No job 
found with " +
+            s"ID $jobID."))
+      }
+
     case UpdateTaskExecutionState(taskExecutionState) =>
       if (taskExecutionState == null) {
         sender ! decorateMessage(false)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 267e231..c949b4c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -96,6 +96,14 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
+   * Stops a (streaming) job with the given [[jobID]] at the JobManager. The 
result of
+   * stopping is sent back to the sender as a [[StoppingResponse]] message.
+   *
+   * @param jobID
+   */
+  case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
+
+  /**
    * Requesting next input split for the
    * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
    * of the job specified by [[jobID]]. The next input split is sent back to 
the sender as a
@@ -238,6 +246,23 @@ object JobManagerMessages {
    */
   case class CancellationFailure(jobID: JobID, cause: Throwable) extends 
CancellationResponse
 
+  sealed trait StoppingResponse {
+    def jobID: JobID
+  }
+
+  /**
+   * Denotes a successful (streaming) job stopping
+   * @param jobID
+   */
+  case class StoppingSuccess(jobID: JobID) extends StoppingResponse
+
+  /**
+   * Denotes a failed (streaming) job stopping
+   * @param jobID
+   * @param cause
+   */
+  case class StoppingFailure(jobID: JobID, cause: Throwable) extends 
StoppingResponse
+
   /**
    * Requests all currently running jobs from the job manager. This message 
triggers a
    * [[RunningJobs]] response.

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index a80ca99..94762ee 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -59,6 +59,15 @@ object TaskMessages {
     extends TaskMessage with RequiresLeaderSessionID
 
   /**
+   * Stops the task associated with [[attemptID]]. The result is sent back to 
the sender as a
+   * [[TaskOperationResult]] message.
+   *
+   * @param attemptID The task's execution attempt ID.
+   */
+  case class StopTask(attemptID: ExecutionAttemptID)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
    * Triggers a fail of specified task from the outside (as opposed to the 
task throwing
    * an exception itself) with the given exception as the cause.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3b68878..12bc426 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -419,6 +419,28 @@ class TaskManager(
             log.debug(s"Cannot find task to fail for execution $executionID)")
           }
 
+        // stops a task
+        case StopTask(executionID) =>
+          val task = runningTasks.get(executionID)
+          if (task != null) {
+            try {
+              task.stopExecution()
+              sender ! decorateMessage(new TaskOperationResult(executionID, 
true))
+            } catch {
+              case t: Throwable =>
+                        sender ! new TaskOperationResult(executionID, false,
+                            t.getClass().getSimpleName() + ": " + 
t.getLocalizedMessage())
+            }
+          } else {
+            log.debug(s"Cannot find task to stop for execution 
${executionID})")
+            sender ! decorateMessage(
+              new TaskOperationResult(
+               executionID,
+               false,
+               "No task with that execution ID was found.")
+            )
+          }
+ 
         // cancels a task
         case CancelTask(executionID) =>
           val task = runningTasks.get(executionID)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 2ca51db..ee372dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -102,12 +102,12 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -146,12 +146,12 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -213,12 +213,12 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -467,12 +467,12 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -523,12 +523,12 @@ public class ExecutionGraphConstructionTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                        fail("Attached wrong jobgraph");
@@ -584,12 +584,12 @@ public class ExecutionGraphConstructionTest {
                        List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jobId,
-                                       jobName,
-                                       cfg,
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(), 
+                               jobId, 
+                               jobName, 
+                               cfg, 
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
                        try {
                                eg.attachJobGraph(ordered);
                        }
@@ -629,12 +629,12 @@ public class ExecutionGraphConstructionTest {
                        List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jobId,
-                                       jobName,
-                                       cfg,
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(), 
+                               jobId, 
+                               jobName,
+                               cfg, 
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
 
                        try {
                                eg.attachJobGraph(ordered);
@@ -700,12 +700,13 @@ public class ExecutionGraphConstructionTest {
                        JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, 
v4, v5, v6, v7, v8);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jobId,
-                                       jobName,
-                                       cfg,
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(), 
+                               jobId, 
+                               jobName, 
+                               cfg, 
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
+                       
                        
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
                        
                        // check the v1 / v2 co location hints ( assumes 
parallelism(v1) >= parallelism(v2) )

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 9221bda..6362732 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -80,12 +80,12 @@ public class ExecutionGraphDeploymentTest {
                        v4.connectNewDataSetAsInput(v2, 
DistributionPattern.ALL_TO_ALL);
 
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jobId,
-                                       "some job",
-                                       new Configuration(),
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(), 
+                               jobId, 
+                               "some job", 
+                               new Configuration(), 
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
 
                        List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -283,12 +283,13 @@ public class ExecutionGraphDeploymentTest {
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.directExecutionContext(),
-                               jobId,
-                               "some job",
-                               new Configuration(),
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.directExecutionContext(), 
+                       jobId, 
+                       "some job", 
+                       new Configuration(),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
+               
                eg.setQueuedSchedulingAllowed(false);
 
                List<JobVertex> ordered = Arrays.asList(v1, v2);
@@ -328,4 +329,4 @@ public class ExecutionGraphDeploymentTest {
                        throw new Exception();
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
new file mode 100644
index 0000000..3712861
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+       private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+       private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+       private ExecutionVertex[][] mockEV = new 
ExecutionVertex[mockEJV.length][];
+       private ExecutionGraph eg;
+       private Field f;
+
+       @Before
+       public void prepare() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+               final Configuration cfg = new Configuration();
+
+
+               assert (mockEJV.length == 5);
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+               JobVertex v3 = new JobVertex("vertex3");
+               JobVertex v4 = new JobVertex("vertex4");
+               JobVertex v5 = new JobVertex("vertex5");
+
+               for(int i = 0; i < mockEJV.length; ++i) {
+                       mockEJV[i] = mock(ExecutionJobVertex.class);
+
+                       this.mockEV[i] = new ExecutionVertex[dop[i]];
+                       for (int j = 0; j < dop[i]; ++j) {
+                               this.mockEV[i][j] = mock(ExecutionVertex.class);
+                       }
+
+                       when(mockEJV[i].getProducedDataSets()).thenReturn(new 
IntermediateResult[0]);
+                       
when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+               }
+
+               PowerMockito
+                       .whenNew(ExecutionJobVertex.class)
+                       .withArguments(any(ExecutionGraph.class), same(v1), 
any(Integer.class).intValue(),
+                               any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[0]);
+               PowerMockito
+                       .whenNew(ExecutionJobVertex.class)
+                       .withArguments(any(ExecutionGraph.class), same(v2), 
any(Integer.class).intValue(),
+                               any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[1]);
+               PowerMockito
+                       .whenNew(ExecutionJobVertex.class)
+                       .withArguments(any(ExecutionGraph.class), same(v3), 
any(Integer.class).intValue(),
+                               any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[2]);
+               PowerMockito
+                       .whenNew(ExecutionJobVertex.class)
+                       .withArguments(any(ExecutionGraph.class), same(v4), 
any(Integer.class).intValue(),
+                               any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[3]);
+               PowerMockito
+                       .whenNew(ExecutionJobVertex.class)
+                       .withArguments(any(ExecutionGraph.class), same(v5), 
any(Integer.class).intValue(),
+                               any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[4]);
+
+               v1.setParallelism(dop[0]);
+               v2.setParallelism(dop[1]);
+               v3.setParallelism(dop[2]);
+               v4.setParallelism(dop[3]);
+               v5.setParallelism(dop[4]);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+               mockNumberOfInputs(1,0);
+               v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+               mockNumberOfInputs(3,1);
+               v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+               mockNumberOfInputs(3,2);
+               v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+               mockNumberOfInputs(4,3);
+               v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+               mockNumberOfInputs(4,2);
+
+               List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
+
+               eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), 
jobId, jobName,
+                               cfg, AkkaUtils.getDefaultTimeout());
+               eg.attachJobGraph(ordered);
+
+               f = eg.getClass().getDeclaredField("state");
+               f.setAccessible(true);
+       }
+
+       private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {
+               for(int j = 0; j < dop[nodeIndex]; ++j) {
+                       
when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]);
+               }
+       }
+
+       @Test
+       public void testCancel() throws Exception {
+               Assert.assertEquals(JobStatus.CREATED, eg.getState());
+               eg.cancel();
+
+               verifyCancel(1);
+
+               f.set(eg, JobStatus.RUNNING);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+               f.set(eg, JobStatus.CANCELED);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+
+               f.set(eg, JobStatus.FAILED);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.FAILED, eg.getState());
+
+               f.set(eg, JobStatus.FAILING);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+
+               f.set(eg, JobStatus.FINISHED);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+
+               f.set(eg, JobStatus.RESTARTING);
+               eg.cancel();
+
+               verifyCancel(2);
+               Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+       }
+
+       private void verifyCancel(int times) {
+               for (int i = 0; i < mockEJV.length; ++i) {
+                       verify(mockEJV[i], times(times)).cancel();
+               }
+
+       }
+
+       // test that all source tasks receive STOP signal
+       // test that all non-source tasks do not receive STOP signal
+       @Test
+       public void testStop() throws Exception {
+               Field f = eg.getClass().getDeclaredField("isStoppable");
+               f.setAccessible(true);
+               f.set(eg, true);
+
+               eg.stop();
+
+               for (int i : new int[]{0,2}) {
+                       for (int j = 0; j < mockEV[i].length; ++j) {
+                               verify(mockEV[i][j], times(1)).stop();
+                       }
+               }
+
+               for (int i : new int[]{1,3,4}) {
+                       for (int j = 0; j < mockEV[i].length; ++j) {
+                               verify(mockEV[i][j], times(0)).stop();
+                       }
+               }
+       }
+
+       // STOP only supported if all sources are stoppable 
+       @Test(expected = StoppingException.class)
+       public void testStopBatching() throws StoppingException {
+               eg.stop();
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 35ab8ac..ca07fbf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -170,12 +170,12 @@ public class ExecutionGraphTestUtils {
                ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
                ExecutionGraph graph = new ExecutionGraph(
-                               executionContext,
-                               new JobID(),
-                               "test job",
-                               new Configuration(),
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       executionContext, 
+                       new JobID(), 
+                       "test job", 
+                       new Configuration(), 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
 
                ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 
1,
                                AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 9c520b6..5dd5ba6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -1,87 +1,86 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-import java.util.Arrays;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-
-public class ExecutionStateProgressTest {
-
-       @Test
-       public void testAccumulatedStateFinished() {
-               try {
-                       final JobID jid = new JobID();
-                       final JobVertexID vid = new JobVertexID();
-
-                       JobVertex ajv = new JobVertex("TestVertex", vid);
-                       ajv.setParallelism(3);
-                       
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
-
-                       ExecutionGraph graph = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jid,
-                                       "test job",
-                                       new Configuration(),
-                                       AkkaUtils.getDefaultTimeout(),
-                                       new NoRestartStrategy());
-
-                       graph.attachJobGraph(Arrays.asList(ajv));
-
-                       setGraphStatus(graph, JobStatus.RUNNING);
-
-                       ExecutionJobVertex ejv = graph.getJobVertex(vid);
-
-                       // mock resources and mock taskmanager
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               SimpleSlot slot = getInstance(
-                                               new SimpleActorGateway(
-                                                               
TestingUtils.defaultExecutionContext())
-                               ).allocateSimpleSlot(jid);
-                               ee.deployToSlot(slot);
-                       }
-
-                       // finish all
-                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
-                               ee.executionFinished();
-                       }
-
-                       assertTrue(ejv.isInFinalState());
-                       assertEquals(JobStatus.FINISHED, graph.getState());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+
+public class ExecutionStateProgressTest {
+
+       @Test
+       public void testAccumulatedStateFinished() {
+               try {
+                       final JobID jid = new JobID();
+                       final JobVertexID vid = new JobVertexID();
+
+                       JobVertex ajv = new JobVertex("TestVertex", vid);
+                       ajv.setParallelism(3);
+                       
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+                       ExecutionGraph graph = new ExecutionGraph(
+                               TestingUtils.defaultExecutionContext(), 
+                               jid, 
+                               "test job", 
+                               new Configuration(), 
+                               AkkaUtils.getDefaultTimeout(),
+                               new NoRestartStrategy());
+                       graph.attachJobGraph(Arrays.asList(ajv));
+
+                       setGraphStatus(graph, JobStatus.RUNNING);
+
+                       ExecutionJobVertex ejv = graph.getJobVertex(vid);
+
+                       // mock resources and mock taskmanager
+                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
+                               SimpleSlot slot = getInstance(
+                                               new SimpleActorGateway(
+                                                               
TestingUtils.defaultExecutionContext())
+                               ).allocateSimpleSlot(jid);
+                               ee.deployToSlot(slot);
+                       }
+
+                       // finish all
+                       for (ExecutionVertex ee : ejv.getTaskVertices()) {
+                               ee.executionFinished();
+                       }
+
+                       assertTrue(ejv.isInFinalState());
+                       assertEquals(JobStatus.FINISHED, graph.getState());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
new file mode 100644
index 0000000..ab29e5a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest extends TestLogger {
+
+       private static ActorSystem system;
+
+       private static boolean receivedStopSignal;
+
+       @AfterClass
+       public static void teardown(){
+               if(system != null) {
+                       JavaTestKit.shutdownActorSystem(system);
+                       system = null;
+               }
+       }
+
+       @Test
+       public void testStop() throws Exception {
+               final JobVertexID jid = new JobVertexID();
+               final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+               Execution executionMock = mock(Execution.class);
+               
whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
+
+               final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new 
IntermediateResult[0],
+                               AkkaUtils.getDefaultTimeout());
+
+               vertex.stop();
+
+               verify(executionMock).stop();
+       }
+
+       @Test
+       public void testStopRpc() throws Exception {
+               final JobVertexID jid = new JobVertexID();
+               final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+               final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new 
IntermediateResult[0],
+                               AkkaUtils.getDefaultTimeout());
+               final ExecutionAttemptID execId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
+
+               setVertexState(vertex, ExecutionState.SCHEDULED);
+               assertEquals(ExecutionState.SCHEDULED, 
vertex.getExecutionState());
+
+               final ActorGateway gateway = new StopSequenceInstanceGateway(
+                               TestingUtils.defaultExecutionContext(), new 
TaskOperationResult(execId, true));
+
+               Instance instance = getInstance(gateway);
+               SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
+
+               vertex.deployToSlot(slot);
+
+               receivedStopSignal = false;
+               vertex.stop();
+               assertTrue(receivedStopSignal);
+       }
+
+       public static class StopSequenceInstanceGateway extends 
BaseTestingActorGateway {
+               private static final long serialVersionUID = 
7611571264006653627L;
+
+               private final TaskOperationResult result;
+
+               public StopSequenceInstanceGateway(ExecutionContext 
executionContext, TaskOperationResult result) {
+                       super(executionContext);
+                       this.result = result;
+               }
+
+               @Override
+               public Object handleMessage(Object message) throws Exception {
+                       Object result = null;
+                       if (message instanceof TaskMessages.SubmitTask) {
+                               result = Messages.getAcknowledge();
+                       } else if (message instanceof TaskMessages.StopTask) {
+                               result = this.result;
+                               receivedStopSignal = true;
+                       }
+
+                       return result;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index e0da2c9..5f9717f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -267,12 +267,12 @@ public class LocalInputSplitsTest {
                        JobGraph jobGraph = new JobGraph("test job", vertex);
                        
                        ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jobGraph.getJobID(),
-                                       jobGraph.getName(),
-                                       jobGraph.getJobConfiguration(),
-                                       TIMEOUT,
-                                       new NoRestartStrategy());
+                               TestingUtils.defaultExecutionContext(), 
+                               jobGraph.getJobID(),
+                               jobGraph.getName(),  
+                               jobGraph.getJobConfiguration()
+                               TIMEOUT,
+                               new NoRestartStrategy());
                        
                        
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
                        eg.setQueuedSchedulingAllowed(false);
@@ -331,12 +331,13 @@ public class LocalInputSplitsTest {
                JobGraph jobGraph = new JobGraph("test job", vertex);
                
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobGraph.getJobID(),
-                               jobGraph.getName(),
-                               jobGraph.getJobConfiguration(),
-                               TIMEOUT,
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(),
+                       jobGraph.getJobID(),
+                       jobGraph.getName(),  
+                       jobGraph.getJobConfiguration()
+                       TIMEOUT,
+                       new NoRestartStrategy());
+               
                eg.setQueuedSchedulingAllowed(false);
                
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index eda9115..c1afe04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -59,12 +59,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg,
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -100,12 +100,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -142,12 +142,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -185,12 +185,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName,
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -226,12 +226,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -287,12 +287,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }
@@ -339,12 +339,12 @@ public class PointwisePatternTest {
                List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
                ExecutionGraph eg = new ExecutionGraph(
-                               TestingUtils.defaultExecutionContext(),
-                               jobId,
-                               jobName,
-                               cfg,
-                               AkkaUtils.getDefaultTimeout(),
-                               new NoRestartStrategy());
+                       TestingUtils.defaultExecutionContext(), 
+                       jobId, 
+                       jobName, 
+                       cfg, 
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
                try {
                        eg.attachJobGraph(ordered);
                }

Reply via email to