http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html index ab61ee7..c7dc0bc 100644 --- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html @@ -33,6 +33,7 @@ limitations under the License. {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div> <div ng-if="job.duration > -1" title="{{job.duration | humanizeDuration:false}}" class="navbar-info last first">{{job.duration | humanizeDuration:true}}</div> <div ng-if="job.state=='RUNNING' || job.state=='CREATED' || job.state=='RESTARTING'" class="navbar-info last first"><span ng-click="cancelJob($event)" class="navbar-info-button btn btn-default">Cancel</span></div> + <div ng-if="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')" class="navbar-info last first"><span ng-click="stopJob($event)" class="navbar-info-button btn btn-default">Stop</span></div> </nav> <nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional"> <ul class="nav nav-tabs">
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java new file mode 100644 index 0000000..6bb71ce --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime; + +/** + * Indicates that a job is not stoppable. 
+ */ +public class StoppingException extends Exception { + + private static final long serialVersionUID = -721315728140810694L; + + public StoppingException(String msg) { + super(msg); + } + + public StoppingException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index db037bb..bc75664 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -77,6 +77,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED; import static org.apache.flink.runtime.messages.TaskMessages.CancelTask; import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; +import static org.apache.flink.runtime.messages.TaskMessages.StopTask; import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo; import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo; @@ -106,11 +107,13 @@ public class Execution implements Serializable { private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); - + private static final Logger LOG = ExecutionGraph.LOG; - + private static final int NUM_CANCEL_CALL_TRIES = 3; + private static final int NUM_STOP_CALL_TRIES = 3; + // 
-------------------------------------------------------------------------------------------- private final ExecutionVertex vertex; @@ -126,13 +129,13 @@ public class Execution implements Serializable { private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors; private volatile ExecutionState state = CREATED; - + private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived - + private volatile Throwable failureCause; // once assigned, never changes - + private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution - + private SerializedValue<StateHandle<?>> operatorState; private long recoveryTimestamp; @@ -162,7 +165,7 @@ public class Execution implements Serializable { this.vertex = checkNotNull(vertex); this.attemptId = new ExecutionAttemptID(); - + this.attemptNumber = attemptNumber; this.stateTimestamps = new long[ExecutionState.values().length]; @@ -172,7 +175,7 @@ public class Execution implements Serializable { this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>(); } - + // -------------------------------------------------------------------------------------------- // Properties // -------------------------------------------------------------------------------------------- @@ -200,23 +203,23 @@ public class Execution implements Serializable { public InstanceConnectionInfo getAssignedResourceLocation() { return assignedResourceLocation; } - + public Throwable getFailureCause() { return failureCause; } - + public long[] getStateTimestamps() { return stateTimestamps; } - + public long getStateTimestamp(ExecutionState state) { return this.stateTimestamps[state.ordinal()]; } - + public boolean isFinished() { return state == FINISHED || state == FAILED || state == CANCELED; } - + /** * This method cleans fields that are irrelevant for the archived execution attempt. 
*/ @@ -231,7 +234,7 @@ public class Execution implements Serializable { partialInputChannelDeploymentDescriptors.clear(); partialInputChannelDeploymentDescriptors = null; } - + public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) { if (state != ExecutionState.CREATED) { throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); @@ -239,11 +242,11 @@ public class Execution implements Serializable { this.operatorState = initialState; this.recoveryTimestamp = recoveryTimestamp; } - + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- - + /** * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs * to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any @@ -355,14 +358,14 @@ public class Execution implements Serializable { slot.releaseSlot(); return; } - + if (LOG.isInfoEnabled()) { LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(), attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname())); } final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp, attemptNumber); - + // register this execution at the execution graph, to receive call backs vertex.getExecutionGraph().registerExecution(this); @@ -378,10 +381,10 @@ public class Execution implements Serializable { if (failure != null) { if (failure instanceof TimeoutException) { String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')'; - + markFailed(new Exception( "Cannot deploy task " + taskname + " - TaskManager (" + instance - + ") not responding after a timeout of " + timeout, failure)); + + ") not responding after a 
timeout of " + timeout, failure)); } else { markFailed(failure); @@ -402,21 +405,53 @@ public class Execution implements Serializable { } } + /** + * Sends stop RPC call. + */ + public void stop() { + final SimpleSlot slot = this.assignedResource; + + if (slot != null) { + final ActorGateway gateway = slot.getInstance().getActorGateway(); + + Future<Object> stopResult = gateway.retry( + new StopTask(attemptId), + NUM_STOP_CALL_TRIES, + timeout, + executionContext); + + stopResult.onComplete(new OnComplete<Object>() { + @Override + public void onComplete(Throwable failure, Object success) throws Throwable { + if (failure != null) { + fail(new Exception("Task could not be stopped.", failure)); + } else { + TaskOperationResult result = (TaskOperationResult) success; + if (!result.success()) { + LOG.info("Stopping task was not successful. Description: {}", + result.description()); + } + } + } + }, executionContext); + } + } + public void cancel() { // depending on the previous state, we go directly to cancelled (no cancel call necessary) // -- or to canceling (cancel call needs to be sent to the task manager) - + // because of several possibly previous states, we need to again loop until we make a // successful atomic state transition while (true) { - + ExecutionState current = this.state; - + if (current == CANCELING || current == CANCELED) { // already taken care of, no need to cancel again return; } - + // these two are the common cases where we need to send a cancel call else if (current == RUNNING || current == DEPLOYING) { // try to transition to canceling, if successful, send the cancel call @@ -426,7 +461,7 @@ public class Execution implements Serializable { } // else: fall through the loop } - + else if (current == FINISHED || current == FAILED) { // nothing to do any more. finished failed before it could be cancelled. 
// in any case, the task is removed from the TaskManager already @@ -437,10 +472,10 @@ public class Execution implements Serializable { else if (current == CREATED || current == SCHEDULED) { // from here, we can directly switch to cancelled, because no task has been deployed if (transitionState(current, CANCELED)) { - + // we skip the canceling state. set the timestamp, for a consistent appearance markTimestamp(CANCELING, getStateTimestamp(CANCELED)); - + try { vertex.getExecutionGraph().deregisterExecution(this); if (assignedResource != null) { @@ -745,7 +780,7 @@ public class Execution implements Serializable { // the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure // we may need to loop multiple times (in the presence of concurrent calls) in order to - // atomically switch to failed + // atomically switch to failed while (true) { ExecutionState current = this.state; @@ -775,7 +810,7 @@ public class Execution implements Serializable { finally { vertex.executionFailed(t); } - + if (!isCallback && (current == RUNNING || current == DEPLOYING)) { if (LOG.isDebugEnabled()) { LOG.debug("Sending out cancel request, to remove task execution from TaskManager."); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 20288fb..7d83ae2 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; @@ -86,6 +87,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import static com.google.common.base.Preconditions.checkNotNull; /** * The execution graph is the central data structure that coordinates the distributed * execution of a data flow. It keeps representations of each parallel task, each @@ -122,7 +124,7 @@ public class ExecutionGraph implements Serializable { /** The log object used for debugging. */ static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class); - + // -------------------------------------------------------------------------------------------- /** The lock used to secure all access to mutable fields, especially the tracking of progress @@ -143,12 +145,15 @@ public class ExecutionGraph implements Serializable { /** The job configuration that was originally attached to the JobGraph. */ private final Configuration jobConfiguration; + /** {@code true} if all source tasks are stoppable. 
*/ + private boolean isStoppable = true; + /** All job vertices that are part of this graph */ private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks; /** All vertices, in the order in which they were created **/ private final List<ExecutionJobVertex> verticesInCreationOrder; - + /** All intermediate results that are part of this graph */ private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults; @@ -204,7 +209,7 @@ public class ExecutionGraph implements Serializable { /** The number of job vertices that have reached a terminal state */ private volatile int numFinishedJobVertices; - + // ------ Fields that are relevant to the execution and need to be cleared before archiving ------- /** The scheduler to use for scheduling new tasks as they are needed */ @@ -218,7 +223,7 @@ public class ExecutionGraph implements Serializable { /** The classloader for the user code. Needed for calls into user code classes */ @SuppressWarnings("NonSerializableFieldInSerializableClass") private ClassLoader userClassLoader; - + /** The coordinator for checkpoints, if snapshot checkpoints are enabled */ @SuppressWarnings("NonSerializableFieldInSerializableClass") private CheckpointCoordinator checkpointCoordinator; @@ -277,9 +282,11 @@ public class ExecutionGraph implements Serializable { List<URL> requiredClasspaths, ClassLoader userClassLoader) { - if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) { - throw new NullPointerException(); - } + checkNotNull(executionContext); + checkNotNull(jobId); + checkNotNull(jobName); + checkNotNull(jobConfig); + checkNotNull(userClassLoader); this.executionContext = executionContext; @@ -308,7 +315,7 @@ public class ExecutionGraph implements Serializable { } // -------------------------------------------------------------------------------------------- - // Configuration of Data-flow wide execution settings + // Configuration of 
Data-flow wide execution settings // -------------------------------------------------------------------------------------------- /** @@ -338,7 +345,7 @@ public class ExecutionGraph implements Serializable { public boolean isArchived() { return isArchived; } - + public void enableSnapshotCheckpointing( long interval, long checkpointTimeout, @@ -365,7 +372,7 @@ public class ExecutionGraph implements Serializable { ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); - + // disable to make sure existing checkpoint coordinators are cleared disableSnaphotCheckpointing(); @@ -399,7 +406,7 @@ public class ExecutionGraph implements Serializable { completedCheckpointStore, recoveryMode, checkpointStatsTracker); - + // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) registerJobStatusListener( @@ -435,7 +442,7 @@ public class ExecutionGraph implements Serializable { if (state != JobStatus.CREATED) { throw new IllegalStateException("Job must be in CREATED state"); } - + if (checkpointCoordinator != null) { checkpointCoordinator.shutdown(); checkpointCoordinator = null; @@ -485,9 +492,9 @@ public class ExecutionGraph implements Serializable { } // -------------------------------------------------------------------------------------------- - // Properties and Status of the Execution Graph + // Properties and Status of the Execution Graph // -------------------------------------------------------------------------------------------- - + /** * Returns a list of BLOB keys referring to the JAR files required to run this job * @return list of BLOB keys referring to the JAR files required to run this job @@ -530,6 +537,10 @@ public class ExecutionGraph implements Serializable { return jobName; } + public 
boolean isStoppable() { + return this.isStoppable; + } + public Configuration getJobConfiguration() { return jobConfiguration; } @@ -558,7 +569,7 @@ public class ExecutionGraph implements Serializable { // we return a specific iterator that does not fail with concurrent modifications // the list is append only, so it is safe for that final int numElements = this.verticesInCreationOrder.size(); - + return new Iterable<ExecutionJobVertex>() { @Override public Iterator<ExecutionJobVertex> iterator() { @@ -688,6 +699,10 @@ public class ExecutionGraph implements Serializable { for (JobVertex jobVertex : topologiallySorted) { + if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) { + this.isStoppable = false; + } + // create the execution job vertex and attach it to the graph ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp); ejv.connectToPredecessors(this.intermediateResults); @@ -705,20 +720,20 @@ public class ExecutionGraph implements Serializable { res.getId(), res, previousDataSet)); } } - + this.verticesInCreationOrder.add(ejv); } } - + public void scheduleForExecution(Scheduler scheduler) throws JobException { if (scheduler == null) { throw new IllegalArgumentException("Scheduler must not be null."); } - + if (this.scheduler != null && this.scheduler != scheduler) { throw new IllegalArgumentException("Cannot use different schedulers for the same job"); } - + if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { this.scheduler = scheduler; @@ -754,7 +769,7 @@ public class ExecutionGraph implements Serializable { public void cancel() { while (true) { JobStatus current = state; - + if (current == JobStatus.RUNNING || current == JobStatus.CREATED) { if (transitionState(current, JobStatus.CANCELLING)) { for (ExecutionJobVertex ejv : verticesInCreationOrder) { @@ -790,6 +805,18 @@ public class ExecutionGraph implements Serializable { } } + public void stop() throws StoppingException { + if(this.isStoppable) { + 
for(ExecutionVertex ev : this.getAllExecutionVertices()) { + if(ev.getNumberOfInputs() == 0) { // send signal to sources only + ev.stop(); + } + } + } else { + throw new StoppingException("This job is not stoppable."); + } + } + public void fail(Throwable t) { while (true) { JobStatus current = state; @@ -808,10 +835,10 @@ public class ExecutionGraph implements Serializable { // set the state of the job to failed transitionState(JobStatus.FAILING, JobStatus.FAILED, t); } - + return; } - + // no need to treat other states } } @@ -836,15 +863,15 @@ public class ExecutionGraph implements Serializable { this.currentExecutions.clear(); Collection<CoLocationGroup> colGroups = new HashSet<>(); - + for (ExecutionJobVertex jv : this.verticesInCreationOrder) { - + CoLocationGroup cgroup = jv.getCoLocationGroup(); if(cgroup != null && !colGroups.contains(cgroup)){ cgroup.resetConstraints(); colGroups.add(cgroup); } - + jv.resetForNewExecution(); } @@ -853,7 +880,7 @@ public class ExecutionGraph implements Serializable { } numFinishedJobVertices = 0; transitionState(JobStatus.RESTARTING, JobStatus.CREATED); - + // if we have checkpointed state, reload it into the executions if (checkpointCoordinator != null) { checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false); @@ -962,7 +989,7 @@ public class ExecutionGraph implements Serializable { } } } - + private boolean transitionState(JobStatus current, JobStatus newState) { return transitionState(current, newState, null); } @@ -989,14 +1016,14 @@ public class ExecutionGraph implements Serializable { } numFinishedJobVertices++; - + if (numFinishedJobVertices == verticesInCreationOrder.size()) { - + // we are done, transition to the final state JobStatus current; while (true) { current = this.state; - + if (current == JobStatus.RUNNING) { if (transitionState(current, JobStatus.FINISHED)) { postRunCleanup(); @@ -1066,7 +1093,7 @@ public class ExecutionGraph implements Serializable { /** * Updates the 
state of one of the ExecutionVertex's Execution attempts. - * If the new status if "FINISHED", this also updates the + * If the new status if "FINISHED", this also updates the * * @param state The state update. * @return True, if the task update was properly applied, false, if the execution attempt was not found. @@ -1184,7 +1211,7 @@ public class ExecutionGraph implements Serializable { this.executionListenerActors.add(listener); } } - + private void notifyJobStatusChange(JobStatus newState, Throwable error) { if (jobStatusListenerActors.size() > 0) { ExecutionGraphMessages.JobStatusChanged message = @@ -1196,7 +1223,7 @@ public class ExecutionGraph implements Serializable { } } } - + void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState newExecutionState, Throwable error) { http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 165dce4..e522c8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -73,7 +73,6 @@ public class ExecutionVertex implements Serializable { private static final long serialVersionUID = 42L; - @SuppressWarnings("unused") private static final Logger LOG = ExecutionGraph.LOG; private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8; @@ -467,6 +466,10 @@ public class ExecutionVertex implements Serializable { this.currentExecution.cancel(); } + public void stop() { + this.currentExecution.stop(); + } + public void fail(Throwable t) { this.currentExecution.fail(t); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index e20f737..f99d754 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -81,6 +81,7 @@ public class JobGraph implements Serializable { /** Configuration which defines which restart strategy to use for the job recovery */ private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration; + /** The number of seconds after which the corresponding ExecutionGraph is removed at the * job manager after it has been executed. */ http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 8ebc30c..9018029 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -38,8 +39,8 @@ 
public class JobVertex implements java.io.Serializable { private static final long serialVersionUID = 1L; private static final String DEFAULT_NAME = "(unnamed vertex)"; - - + + // -------------------------------------------------------------------------------------------- // Members that define the structure / topology of the graph // -------------------------------------------------------------------------------------------- @@ -62,15 +63,18 @@ public class JobVertex implements java.io.Serializable { /** The class of the invokable. */ private String invokableClassName; + /** Indicates of this job vertex is stoppable or not. */ + private boolean isStoppable = false; + /** Optionally, a source of input splits */ private InputSplitSource<?> inputSplitSource; - + /** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */ private String name; - + /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */ private SlotSharingGroup slotSharingGroup; - + /** The group inside which the vertex subtasks share slots */ private CoLocationGroup coLocationGroup; @@ -83,11 +87,11 @@ public class JobVertex implements java.io.Serializable { /** Optional, pretty name of the operator, to be displayed in the JSON plan */ private String operatorPrettyName; - + /** Optional, the JSON for the optimizer properties of the operator result, * to be included in the JSON plan */ private String resultOptimizerProperties; - + // -------------------------------------------------------------------------------------------- /** @@ -98,7 +102,7 @@ public class JobVertex implements java.io.Serializable { public JobVertex(String name) { this(name, null); } - + /** * Constructs a new job vertex and assigns it with the given name. * @@ -109,9 +113,9 @@ public class JobVertex implements java.io.Serializable { this.name = name == null ? DEFAULT_NAME : name; this.id = id == null ? 
new JobVertexID() : id; } - + // -------------------------------------------------------------------------------------------- - + /** * Returns the ID of this job vertex. * @@ -120,7 +124,7 @@ public class JobVertex implements java.io.Serializable { public JobVertexID getID() { return this.id; } - + /** * Returns the name of the vertex. * @@ -129,7 +133,7 @@ public class JobVertex implements java.io.Serializable { public String getName() { return this.name; } - + /** * Sets the name of the vertex * @@ -168,12 +172,13 @@ public class JobVertex implements java.io.Serializable { } return this.configuration; } - + public void setInvokableClass(Class<? extends AbstractInvokable> invokable) { Preconditions.checkNotNull(invokable); this.invokableClassName = invokable.getName(); + this.isStoppable = StoppableTask.class.isAssignableFrom(invokable); } - + /** * Returns the name of the invokable class which represents the task of this vertex. * @@ -182,7 +187,7 @@ public class JobVertex implements java.io.Serializable { public String getInvokableClassName() { return this.invokableClassName; } - + /** * Returns the invokable class which represents the task of this vertex * @@ -196,7 +201,7 @@ public class JobVertex implements java.io.Serializable { if (invokableClassName == null) { return null; } - + try { return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class); } @@ -207,7 +212,7 @@ public class JobVertex implements java.io.Serializable { throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e); } } - + /** * Gets the parallelism of the task. 
* @@ -228,7 +233,7 @@ public class JobVertex implements java.io.Serializable { } this.parallelism = parallelism; } - + public InputSplitSource<?> getInputSplitSource() { return inputSplitSource; } @@ -236,15 +241,15 @@ public class JobVertex implements java.io.Serializable { public void setInputSplitSource(InputSplitSource<?> inputSplitSource) { this.inputSplitSource = inputSplitSource; } - + public List<IntermediateDataSet> getProducedDataSets() { return this.results; } - + public List<JobEdge> getInputs() { return this.inputs; } - + /** * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same * slot sharing group can run one subtask each in the same slot. @@ -255,13 +260,13 @@ public class JobVertex implements java.io.Serializable { if (this.slotSharingGroup != null) { this.slotSharingGroup.removeVertexFromGroup(id); } - + this.slotSharingGroup = grp; if (grp != null) { grp.addVertexToGroup(id); } } - + /** * Gets the slot sharing group that this vertex is associated with. Different vertices in the same * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with @@ -272,7 +277,7 @@ public class JobVertex implements java.io.Serializable { public SlotSharingGroup getSlotSharingGroup() { return slotSharingGroup; } - + /** * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex. 
* Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing @@ -294,10 +299,10 @@ public class JobVertex implements java.io.Serializable { if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) { throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group."); } - + CoLocationGroup thisGroup = this.coLocationGroup; CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup; - + if (otherGroup == null) { if (thisGroup == null) { CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith); @@ -320,11 +325,11 @@ public class JobVertex implements java.io.Serializable { } } } - + public CoLocationGroup getCoLocationGroup() { return coLocationGroup; } - + public void updateCoLocationGroup(CoLocationGroup group) { this.coLocationGroup = group; } @@ -384,38 +389,42 @@ public class JobVertex implements java.io.Serializable { } // -------------------------------------------------------------------------------------------- - + public boolean isInputVertex() { return this.inputs.isEmpty(); } - + + public boolean isStoppable() { + return this.isStoppable; + } + public boolean isOutputVertex() { return this.results.isEmpty(); } - + public boolean hasNoConnectedInputs() { for (JobEdge edge : inputs) { if (!edge.isIdReference()) { return false; } } - + return true; } - + // -------------------------------------------------------------------------------------------- - + /** - * A hook that can be overwritten by sub classes to implement logic that is called by the + * A hook that can be overwritten by sub classes to implement logic that is called by the * master when the job starts. * * @param loader The class loader for user defined code. * @throws Exception The method may throw exceptions which cause the job to fail immediately. 
*/ public void initializeOnMaster(ClassLoader loader) throws Exception {} - + /** - * A hook that can be overwritten by sub classes to implement logic that is called by the + * A hook that can be overwritten by sub classes to implement logic that is called by the * master after the job completed. * * @param loader The class loader for user defined code. @@ -458,7 +467,7 @@ public class JobVertex implements java.io.Serializable { } // -------------------------------------------------------------------------------------------- - + @Override public String toString() { return this.name + " (" + this.invokableClassName + ')'; http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java new file mode 100644 index 0000000..383a0d2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.jobgraph.tasks; + +/** + * Implemented by tasks that can receive STOP signal. + */ +public interface StoppableTask { + /** Called on STOP signal. */ + public void stop(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 7832720..81dc01f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -49,8 +49,10 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; +import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError; +import org.apache.flink.runtime.messages.TaskMessages.FailTask; import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState; import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; @@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.StateUtils; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.duration.FiniteDuration; import java.io.IOException; @@ -220,7 +223,7 @@ public class Task implements Runnable { private volatile long recoveryTs; /** - * 
<p><b>IMPORTANT:</b> This constructor may not start any work that would need to + * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to * be undone in the case of a failing task deployment.</p> */ public Task(TaskDeploymentDescriptor tdd, @@ -308,7 +311,7 @@ public class Task implements Runnable { } invokableHasBeenCanceled = new AtomicBoolean(false); - + // finally, create the executing thread, but do not start it executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); } @@ -336,15 +339,15 @@ public class Task implements Runnable { public Configuration getJobConfiguration() { return jobConfiguration; } - + public Configuration getTaskConfiguration() { return this.taskConfiguration; } - + public ResultPartitionWriter[] getAllWriters() { return writers; } - + public SingleInputGate[] getAllInputGates() { return inputGates; } @@ -445,7 +448,7 @@ public class Task implements Runnable { try { // ---------------------------- - // Task Bootstrap - We periodically + // Task Bootstrap - We periodically // check for canceling as a shortcut // ---------------------------- @@ -636,7 +639,7 @@ public class Task implements Runnable { LOG.error("Unexpected state in Task during an exception: " + current); break; } - // else fall through the loop and + // else fall through the loop and } } catch (Throwable tt) { @@ -655,7 +658,7 @@ public class Task implements Runnable { if (dispatcher != null && !dispatcher.isShutdown()) { dispatcher.shutdownNow(); } - + // free the network resources network.unregisterTask(this); @@ -743,10 +746,39 @@ public class Task implements Runnable { } // ---------------------------------------------------------------------------------------------------------------- - // Canceling / Failing the task from the outside + // Stopping / Canceling / Failing the task from the outside // ---------------------------------------------------------------------------------------------------------------- /** + * Stops the 
executing task by calling {@link StoppableTask#stop()}. + * <p> + * This method never blocks. + * </p> + * + * @throws UnsupportedOperationException + * if the {@link AbstractInvokable} does not implement {@link StoppableTask} + */ + public void stopExecution() throws UnsupportedOperationException { + LOG.info("Attempting to stop task " + taskNameWithSubtask); + if(this.invokable instanceof StoppableTask) { + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + ((StoppableTask)Task.this.invokable).stop(); + } catch(RuntimeException e) { + LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e); + taskManager.tell(new FailTask(executionId, e)); + } + } + }; + executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask); + } else { + throw new UnsupportedOperationException("Stopping not supported by this task."); + } + } + + /** * Cancels the task execution. If the task is already in a terminal state * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. * Otherwise it sets the state to CANCELING, and, if the invokable code is running, @@ -853,7 +885,7 @@ public class Task implements Runnable { * {@link org.apache.flink.runtime.jobgraph.tasks.StatefulTask}. * * @param checkpointID The ID identifying the checkpoint. - * @param checkpointTimestamp The timestamp associated with the checkpoint. + * @param checkpointTimestamp The timestamp associated with the checkpoint. 
*/ public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) { AbstractInvokable invokable = this.invokable; @@ -861,7 +893,7 @@ public class Task implements Runnable { if (executionState == ExecutionState.RUNNING && invokable != null) { if (invokable instanceof StatefulTask) { - // build a local closure + // build a local closure final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable; final String taskName = taskNameWithSubtask; @@ -895,14 +927,14 @@ public class Task implements Runnable { LOG.debug("Ignoring request to trigger a checkpoint for non-running task."); } } - + public void notifyCheckpointComplete(final long checkpointID) { AbstractInvokable invokable = this.invokable; if (executionState == ExecutionState.RUNNING && invokable != null) { if (invokable instanceof StatefulTask) { - // build a local closure + // build a local closure final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable; final String taskName = taskNameWithSubtask; @@ -1069,7 +1101,7 @@ public class Task implements Runnable { logger.error("Error while canceling the task", t); } - // interrupt the running thread initially + // interrupt the running thread initially executer.interrupt(); try { executer.join(30000); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index bd18160..6a22949 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -466,6 +466,33 @@ class JobManager( ) } + case StopJob(jobID) => + log.info(s"Trying to stop job with ID $jobID.") + + 
currentJobs.get(jobID) match { + case Some((executionGraph, _)) => + try { + if (!executionGraph.isStoppable()) { + sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" + + " is not stoppable.")) + } else if(executionGraph.getState() != JobStatus.CREATED + && executionGraph.getState() != JobStatus.RUNNING + && executionGraph.getState() != JobStatus.RESTARTING) { + sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" + + "is not in state CREATED, RUNNING, or RESTARTING.")) + } else { + executionGraph.stop() + sender ! StoppingSuccess(jobID) + } + } catch { + case t: Throwable => sender ! StoppingFailure(jobID, t) + } + case None => + log.info(s"No job found with ID $jobID.") + sender ! StoppingFailure(jobID, new IllegalArgumentException("No job found with " + + s"ID $jobID.")) + } + case UpdateTaskExecutionState(taskExecutionState) => if (taskExecutionState == null) { sender ! decorateMessage(false) http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 267e231..c949b4c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -96,6 +96,14 @@ object JobManagerMessages { case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID /** + * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of + * stopping is sent back to the sender as a [[StoppingResponse]] message. 
+ * + * @param jobID + */ + case class StopJob(jobID: JobID) extends RequiresLeaderSessionID + + /** * Requesting next input split for the * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]] * of the job specified by [[jobID]]. The next input split is sent back to the sender as a @@ -238,6 +246,23 @@ object JobManagerMessages { */ case class CancellationFailure(jobID: JobID, cause: Throwable) extends CancellationResponse + sealed trait StoppingResponse { + def jobID: JobID + } + + /** + * Denotes a successful (streaming) job stopping + * @param jobID + */ + case class StoppingSuccess(jobID: JobID) extends StoppingResponse + + /** + * Denotes a failed (streaming) job stopping + * @param jobID + * @param cause + */ + case class StoppingFailure(jobID: JobID, cause: Throwable) extends StoppingResponse + /** * Requests all currently running jobs from the job manager. This message triggers a * [[RunningJobs]] response. http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala index a80ca99..94762ee 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala @@ -59,6 +59,15 @@ object TaskMessages { extends TaskMessage with RequiresLeaderSessionID /** + * Stops the task associated with [[attemptID]]. The result is sent back to the sender as a + * [[TaskOperationResult]] message. + * + * @param attemptID The task's execution attempt ID. 
+ */ + case class StopTask(attemptID: ExecutionAttemptID) + extends TaskMessage with RequiresLeaderSessionID + + /** * Triggers a fail of specified task from the outside (as opposed to the task throwing * an exception itself) with the given exception as the cause. * http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 3b68878..12bc426 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -419,6 +419,28 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution $executionID)") } + // stops a task + case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { + try { + task.stopExecution() + sender ! decorateMessage(new TaskOperationResult(executionID, true)) + } catch { + case t: Throwable => + sender ! new TaskOperationResult(executionID, false, + t.getClass().getSimpleName() + ": " + t.getLocalizedMessage()) + } + } else { + log.debug(s"Cannot find task to stop for execution ${executionID})") + sender ! 
decorateMessage( + new TaskOperationResult( + executionID, + false, + "No task with that execution ID was found.") + ) + } + // cancels a task case CancelTask(executionID) => val task = runningTasks.get(executionID) http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index 2ca51db..ee372dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -102,12 +102,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -146,12 +146,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -213,12 +213,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -467,12 +467,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -523,12 +523,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -584,12 +584,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -629,12 +629,12 @@ public class ExecutionGraphConstructionTest { List<JobVertex> ordered = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2, v3)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); @@ -700,12 +700,13 @@ public class ExecutionGraphConstructionTest { JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); + eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); // check the v1 / v2 co location hints ( assumes parallelism(v1) >= parallelism(v2) ) http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 9221bda..6362732 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -80,12 +80,12 @@ public class ExecutionGraphDeploymentTest { v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - "some job", - new Configuration(), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + 
TestingUtils.defaultExecutionContext(), + jobId, + "some job", + new Configuration(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4); @@ -283,12 +283,13 @@ public class ExecutionGraphDeploymentTest { // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( - TestingUtils.directExecutionContext(), - jobId, - "some job", - new Configuration(), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.directExecutionContext(), + jobId, + "some job", + new Configuration(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); + eg.setQueuedSchedulingAllowed(false); List<JobVertex> ordered = Arrays.asList(v1, v2); @@ -328,4 +329,4 @@ public class ExecutionGraphDeploymentTest { throw new Exception(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java new file mode 100644 index 0000000..3712861 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StoppingException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.api.mockito.PowerMockito; + +import scala.concurrent.duration.FiniteDuration; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExecutionGraph.class) +public class ExecutionGraphSignalsTest { + private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5]; + private int[] dop = new int[] { 5, 7, 2, 11, 4 }; + private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][]; + private ExecutionGraph eg; + private Field f; + + @Before + public void prepare() 
throws Exception { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + final Configuration cfg = new Configuration(); + + + assert (mockEJV.length == 5); + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + JobVertex v4 = new JobVertex("vertex4"); + JobVertex v5 = new JobVertex("vertex5"); + + for(int i = 0; i < mockEJV.length; ++i) { + mockEJV[i] = mock(ExecutionJobVertex.class); + + this.mockEV[i] = new ExecutionVertex[dop[i]]; + for (int j = 0; j < dop[i]; ++j) { + this.mockEV[i][j] = mock(ExecutionVertex.class); + } + + when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]); + when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]); + } + + PowerMockito + .whenNew(ExecutionJobVertex.class) + .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(), + any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]); + PowerMockito + .whenNew(ExecutionJobVertex.class) + .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(), + any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]); + PowerMockito + .whenNew(ExecutionJobVertex.class) + .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(), + any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]); + PowerMockito + .whenNew(ExecutionJobVertex.class) + .withArguments(any(ExecutionGraph.class), same(v4), any(Integer.class).intValue(), + any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[3]); + PowerMockito + .whenNew(ExecutionJobVertex.class) + .withArguments(any(ExecutionGraph.class), same(v5), any(Integer.class).intValue(), + any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[4]); + + v1.setParallelism(dop[0]); + v2.setParallelism(dop[1]); + v3.setParallelism(dop[2]); + 
v4.setParallelism(dop[3]); + v5.setParallelism(dop[4]); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); + mockNumberOfInputs(1,0); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); + mockNumberOfInputs(3,1); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + mockNumberOfInputs(3,2); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + mockNumberOfInputs(4,3); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + mockNumberOfInputs(4,2); + + List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5)); + + eg = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobId, jobName, + cfg, AkkaUtils.getDefaultTimeout()); + eg.attachJobGraph(ordered); + + f = eg.getClass().getDeclaredField("state"); + f.setAccessible(true); + } + + private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) { + for(int j = 0; j < dop[nodeIndex]; ++j) { + when(mockEV[nodeIndex][j].getNumberOfInputs()).thenReturn(dop[predecessorIndex]); + } + } + + @Test + public void testCancel() throws Exception { + Assert.assertEquals(JobStatus.CREATED, eg.getState()); + eg.cancel(); + + verifyCancel(1); + + f.set(eg, JobStatus.RUNNING); + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.CANCELLING, eg.getState()); + + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.CANCELLING, eg.getState()); + + f.set(eg, JobStatus.CANCELED); + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.CANCELED, eg.getState()); + + f.set(eg, JobStatus.FAILED); + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.FAILED, eg.getState()); + + f.set(eg, JobStatus.FAILING); + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.CANCELLING, eg.getState()); + + f.set(eg, JobStatus.FINISHED); + eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.FINISHED, eg.getState()); + + f.set(eg, JobStatus.RESTARTING); + 
eg.cancel(); + + verifyCancel(2); + Assert.assertEquals(JobStatus.CANCELED, eg.getState()); + } + + private void verifyCancel(int times) { + for (int i = 0; i < mockEJV.length; ++i) { + verify(mockEJV[i], times(times)).cancel(); + } + + } + + // test that all source tasks receive STOP signal + // test that all non-source tasks do not receive STOP signal + @Test + public void testStop() throws Exception { + Field f = eg.getClass().getDeclaredField("isStoppable"); + f.setAccessible(true); + f.set(eg, true); + + eg.stop(); + + for (int i : new int[]{0,2}) { + for (int j = 0; j < mockEV[i].length; ++j) { + verify(mockEV[i][j], times(1)).stop(); + } + } + + for (int i : new int[]{1,3,4}) { + for (int j = 0; j < mockEV[i].length; ++j) { + verify(mockEV[i][j], times(0)).stop(); + } + } + } + + // STOP only supported if all sources are stoppable + @Test(expected = StoppingException.class) + public void testStopBatching() throws StoppingException { + eg.stop(); + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 35ab8ac..ca07fbf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -170,12 +170,12 @@ public class ExecutionGraphTestUtils { ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); ExecutionGraph graph = new ExecutionGraph( - executionContext, - new JobID(), - "test job", - new Configuration(), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + executionContext, + new JobID(), + "test 
job", + new Configuration(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout())); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 9c520b6..5dd5ba6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -1,87 +1,86 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.executiongraph; - -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - -import java.util.Arrays; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; - -public class ExecutionStateProgressTest { - - @Test - public void testAccumulatedStateFinished() { - try { - final JobID jid = new JobID(); - final JobVertexID vid = new JobVertexID(); - - JobVertex ajv = new JobVertex("TestVertex", vid); - ajv.setParallelism(3); - ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); - - ExecutionGraph graph = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jid, - "test job", - new Configuration(), - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); - - graph.attachJobGraph(Arrays.asList(ajv)); - - setGraphStatus(graph, JobStatus.RUNNING); - - ExecutionJobVertex ejv = graph.getJobVertex(vid); - - // mock resources and mock taskmanager - for (ExecutionVertex ee : ejv.getTaskVertices()) { - SimpleSlot slot = getInstance( - new SimpleActorGateway( - TestingUtils.defaultExecutionContext()) - ).allocateSimpleSlot(jid); - ee.deployToSlot(slot); - } - - // finish all - for (ExecutionVertex ee : ejv.getTaskVertices()) { - ee.executionFinished(); - } - - assertTrue(ejv.isInFinalState()); - assertEquals(JobStatus.FINISHED, graph.getState()); - } - catch (Exception e) { - 
e.printStackTrace(); - fail(e.getMessage()); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import java.util.Arrays; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.Test; + +public class ExecutionStateProgressTest { + + @Test + public void testAccumulatedStateFinished() { + try { + final JobID jid = new JobID(); + final JobVertexID vid = new JobVertexID(); + + JobVertex ajv = new JobVertex("TestVertex", vid); + ajv.setParallelism(3); + 
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); + + ExecutionGraph graph = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jid, + "test job", + new Configuration(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); + graph.attachJobGraph(Arrays.asList(ajv)); + + setGraphStatus(graph, JobStatus.RUNNING); + + ExecutionJobVertex ejv = graph.getJobVertex(vid); + + // mock resources and mock taskmanager + for (ExecutionVertex ee : ejv.getTaskVertices()) { + SimpleSlot slot = getInstance( + new SimpleActorGateway( + TestingUtils.defaultExecutionContext()) + ).allocateSimpleSlot(jid); + ee.deployToSlot(slot); + } + + // finish all + for (ExecutionVertex ee : ejv.getTaskVertices()) { + ee.executionFinished(); + } + + assertTrue(ejv.isInFinalState()); + assertEquals(JobStatus.FINISHED, graph.getState()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java new file mode 100644 index 0000000..ab29e5a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import scala.concurrent.ExecutionContext; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static 
org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExecutionVertex.class) +public class ExecutionVertexStopTest extends TestLogger { + + private static ActorSystem system; + + private static boolean receivedStopSignal; + + @AfterClass + public static void teardown(){ + if(system != null) { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + } + + @Test + public void testStop() throws Exception { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); + + Execution executionMock = mock(Execution.class); + whenNew(Execution.class).withAnyArguments().thenReturn(executionMock); + + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + + vertex.stop(); + + verify(executionMock).stop(); + } + + @Test + public void testStopRpc() throws Exception { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); + + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); + + setVertexState(vertex, ExecutionState.SCHEDULED); + assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); + + final ActorGateway gateway = new StopSequenceInstanceGateway( + TestingUtils.defaultExecutionContext(), new TaskOperationResult(execId, true)); + + Instance instance = getInstance(gateway); + SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + + vertex.deployToSlot(slot); + + receivedStopSignal = false; + vertex.stop(); + assertTrue(receivedStopSignal); + } + + public static class StopSequenceInstanceGateway extends BaseTestingActorGateway { + private static final long serialVersionUID = 7611571264006653627L; + + private final TaskOperationResult result; + + public StopSequenceInstanceGateway(ExecutionContext 
executionContext, TaskOperationResult result) { + super(executionContext); + this.result = result; + } + + @Override + public Object handleMessage(Object message) throws Exception { + Object result = null; + if (message instanceof TaskMessages.SubmitTask) { + result = Messages.getAcknowledge(); + } else if (message instanceof TaskMessages.StopTask) { + result = this.result; + receivedStopSignal = true; + } + + return result; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index e0da2c9..5f9717f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -267,12 +267,12 @@ public class LocalInputSplitsTest { JobGraph jobGraph = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - TIMEOUT, - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + TIMEOUT, + new NoRestartStrategy()); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); eg.setQueuedSchedulingAllowed(false); @@ -331,12 +331,13 @@ public class LocalInputSplitsTest { JobGraph jobGraph = new JobGraph("test job", vertex); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobGraph.getJobID(), - jobGraph.getName(), - jobGraph.getJobConfiguration(), - TIMEOUT, - new NoRestartStrategy()); + 
TestingUtils.defaultExecutionContext(), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + TIMEOUT, + new NoRestartStrategy()); + eg.setQueuedSchedulingAllowed(false); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index eda9115..c1afe04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -59,12 +59,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -100,12 +100,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -142,12 +142,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); 
ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -185,12 +185,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -226,12 +226,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -287,12 +287,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); } @@ -339,12 +339,12 @@ public class PointwisePatternTest { List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2)); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - jobId, - jobName, - cfg, - 
AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); try { eg.attachJobGraph(ordered); }
