Repository: asterixdb Updated Branches: refs/heads/master f7100f704 -> 88b576952
Add job cancellation support in Hyracks. This change also: - cleans up JobRun; - adds tests for job cancellation. Change-Id: Ic26330c19c8642dd3246739b5150c4aa667c359c Reviewed-on: https://asterix-gerrit.ics.uci.edu/1537 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/88b57695 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/88b57695 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/88b57695 Branch: refs/heads/master Commit: 88b57695259c5fdc94ea7dbc2f7ad91715687fe0 Parents: f7100f7 Author: Yingyi Bu <[email protected]> Authored: Fri Mar 3 17:22:10 2017 -0800 Committer: Yingyi Bu <[email protected]> Committed: Fri Mar 3 23:35:58 2017 -0800 ---------------------------------------------------------------------- .../src/test/resources/runtimets/testsuite.xml | 2 +- .../client/HyracksClientInterfaceFunctions.java | 20 ++ .../HyracksClientInterfaceRemoteProxy.java | 7 + .../hyracks/api/client/HyracksConnection.java | 5 + .../api/client/IHyracksClientConnection.java | 9 + .../api/client/IHyracksClientInterface.java | 2 + .../hyracks/api/exceptions/ErrorCode.java | 3 +- .../api/exceptions/HyracksDataException.java | 7 + .../api/exceptions/HyracksException.java | 7 + .../SuperActivityOperatorNodePushable.java | 4 +- .../src/main/resources/errormsg/en.properties | 2 + .../hyracks/control/cc/ClientInterfaceIPCI.java | 7 + .../control/cc/executor/JobExecutor.java | 106 +++++-- .../hyracks/control/cc/job/IJobManager.java | 8 + .../hyracks/control/cc/job/JobManager.java | 73 +++-- .../apache/hyracks/control/cc/job/JobRun.java | 27 +- .../control/cc/scheduler/FIFOJobQueue.java | 29 +- .../hyracks/control/cc/scheduler/IJobQueue.java | 17 ++ .../hyracks/control/cc/work/CancelJobWork.java | 53 ++++ .../hyracks/control/cc/work/JobStartWork.java | 6 +- .../hyracks/control/cc/job/JobManagerTest.java | 55 ++++ .../AbstractMultiNCIntegrationTest.java | 58 +++- .../tests/integration/CancelJobTest.java | 303 +++++++++++++++++++ 23 files changed, 717 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml index a651833..29998fd 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -6758,7 +6758,7 @@ <test-case FilePath="load"> <compilation-unit name="duplicate-key-error"> <output-dir compare="Text">none</output-dir> - <expected-error>org.apache.hyracks.api.exceptions.HyracksException</expected-error> + <expected-error>Input stream given to BTree bulk load has duplicates</expected-error> </compilation-unit> </test-case> <test-case FilePath="load"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index aa292f6..aa9232e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -38,6 +38,7 @@ public class HyracksClientInterfaceFunctions { START_JOB, DISTRIBUTE_JOB, DESTROY_JOB, + CANCEL_JOB, GET_DATASET_DIRECTORY_SERIVICE_INFO, GET_DATASET_RESULT_STATUS, GET_DATASET_RESULT_LOCATIONS, @@ -122,6 +123,25 @@ public class HyracksClientInterfaceFunctions { } } + public static class CancelJobFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + public CancelJobFunction(JobId jobId) { + this.jobId = jobId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.CANCEL_JOB; + } + + public JobId getJobId() { + return jobId; + } + } + public static class DestroyJobFunction extends Function { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 8e7affb..0142c7d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -69,6 +69,13 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac } @Override + public void cancelJob(JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction( + jobId); + rpci.call(ipcHandle, cjf); + } + + @Override public JobId startJob(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(jobId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 5da1f34..4b3aff2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -91,6 +91,11 @@ public final class HyracksConnection implements IHyracksClientConnection { } @Override + public void cancelJob(JobId jobId) throws Exception { + hci.cancelJob(jobId); + } + + @Override public JobId startJob(JobSpecification jobSpec) throws Exception { return startJob(jobSpec, EnumSet.noneOf(JobFlag.class)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index e65cacd..0956d85 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -57,6 +57,15 @@ public interface IHyracksClientConnection extends IClusterInfoCollector { public JobInfo getJobInfo(JobId jobId) throws Exception; /** + * Cancel the job that has the given job id. + * + * @param jobId + * the JobId of the Job + * @throws Exception + */ + public void cancelJob(JobId jobId) throws Exception; + + /** * Start the specified Job. * * @param jobSpec http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index f7995d7..1afbe9e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -40,6 +40,8 @@ public interface IHyracksClientInterface { public JobId startJob(JobId jobId) throws Exception; + public void cancelJob(JobId jobId) throws Exception; + public JobId distributeJob(byte[] acggfBytes) throws Exception; public JobId destroyJob(JobId jobId) throws Exception; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 1b97a60..333b1df 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -18,7 +18,6 @@ */ package org.apache.hyracks.api.exceptions; -import java.io.File; import java.io.InputStream; import java.util.Map; @@ -59,6 +58,8 @@ public class ErrorCode { public static final int DUPLICATE_DISTRIBUTED_JOB = 22; public static final int DISTRIBUTED_JOB_FAILURE = 23; public static final int NO_RESULTSET = 24; + public static final int JOB_CANCELED = 25; + public static final int NODE_FAILED = 26; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index 404104d..6c581f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -28,6 +28,13 @@ import org.apache.hyracks.api.util.ErrorMessageUtil; */ public class HyracksDataException extends HyracksException { + public static HyracksDataException create(Throwable cause) { + if (cause instanceof HyracksDataException) { + return (HyracksDataException) cause; + } + return new HyracksDataException(cause); + } + public static HyracksDataException create(int code, Serializable... params) { return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java index 559468d..1f2c7a5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java @@ -33,6 +33,13 @@ public class HyracksException extends IOException { private final String nodeId; private transient volatile String msgCache; + public static HyracksException create(Throwable cause) { + if (cause instanceof HyracksException) { + return (HyracksException) cause; + } + return new HyracksException(cause); + } + public static HyracksException create(int code, Serializable... params) { return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 2ac392b..1c4f916 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -221,11 +221,11 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable for (Future<Void> initializationTask : initializationTasks) { initializationTask.get(); } - } catch (Throwable th) { + } catch (Exception e) { for (Future<Void> initializationTask : initializationTasks) { initializationTask.cancel(true); } - throw new HyracksDataException(th); + throw new HyracksDataException(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 3bf5a9a..12601fb 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -43,5 +43,7 @@ 22 = The distributed job %1$s already exists 23 = The distributed work failed for %1$s at %2$s 24 = No result set for job %1$s +25 = Job %1$s has been cancelled by a user +26 = Node %1$s failed 10000 = The given rule collection %1$s is not an instance of the List class. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index 265d3ef..ced3d67 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -27,6 +27,7 @@ import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.job.JobInfo; +import org.apache.hyracks.control.cc.work.CancelJobWork; import org.apache.hyracks.control.cc.work.CliDeployBinaryWork; import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork; import org.apache.hyracks.control.cc.work.ClusterShutdownWork; @@ -94,6 +95,12 @@ class ClientInterfaceIPCI implements IIPCI { ccs.getWorkQueue() .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid))); break; + case CANCEL_JOB: + HyracksClientInterfaceFunctions.CancelJobFunction cjf = + (HyracksClientInterfaceFunctions.CancelJobFunction) fn; + ccs.getWorkQueue().schedule( + new CancelJobWork(ccs.getJobManager(), cjf.getJobId(), new IPCResponder<Void>(handle, mid))); + break; case START_JOB: HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 8f7b0cb..084bd1b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; import org.apache.hyracks.api.deployment.DeploymentId; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; @@ -83,6 +84,8 @@ public class JobExecutor { private final Random random; + private boolean cancelled = false; + public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints, boolean predistributed) { this.ccs = ccs; @@ -112,6 +115,19 @@ public class JobExecutor { ccs.getApplicationContext().notifyJobStart(jobRun.getJobId()); } + public void cancelJob() throws HyracksException { + // If the job is already terminated or failed, do nothing here. + if (jobRun.getPendingStatus() != null) { + return; + } + // Sets the cancelled flag. + cancelled = true; + // Aborts on-ongoing task clusters. + abortOngoingTaskClusters(ta -> false, ta -> null); + // Aborts the whole job. + abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId()))); + } + private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots) throws HyracksException { for (ActivityCluster root : roots) { @@ -661,7 +677,7 @@ public class JobExecutor { ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions); abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED); abortDoomedTaskClusters(); - if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) { + if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) { abortJob(exceptions); return; } @@ -691,42 +707,70 @@ public class JobExecutor { jobManager.finalComplete(jobRun); return; } - for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) { - if (!isPlanned(ac)) { - continue; - } - TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters(); - if (taskClusters == null) { + abortOngoingTaskClusters(ta -> deadNodes.contains(ta.getNodeId()), + ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId())); + startRunnableActivityClusters(); + } catch (Exception e) { + abortJob(Collections.singletonList(e)); + } + } + + private interface ITaskFilter { + boolean directlyMarkAsFailed(TaskAttempt ta); + } + + private interface IExceptionGenerator { + HyracksException getException(TaskAttempt ta); + } + + /** + * Aborts ongoing task clusters. + * + * @param taskFilter, + * selects tasks that should be directly marked as failed without doing the aborting RPC. + * @param exceptionGenerator, + * generates an exception for tasks that are directly marked as failed. + */ + private void abortOngoingTaskClusters(ITaskFilter taskFilter, IExceptionGenerator exceptionGenerator) + throws HyracksException { + for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) { + if (!isPlanned(ac)) { + continue; + } + TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters(); + if (taskClusters == null) { + continue; + } + for (TaskCluster tc : taskClusters) { + TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc); + if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt + .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED + || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) { continue; } - for (TaskCluster tc : taskClusters) { - TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc); - if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt - .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED - || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) { - continue; - } - boolean abort = false; - for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) { - assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED - || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING; - if (deadNodes.contains(ta.getNodeId())) { - ta.setStatus(TaskAttempt.TaskStatus.FAILED, - Collections.singletonList(new Exception("Node " + ta.getNodeId() + " failed"))); - ta.setEndTime(System.currentTimeMillis()); - abort = true; - } - } - if (abort) { - abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED); + boolean abort = false; + for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) { + assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED + || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING; + if (taskFilter.directlyMarkAsFailed(ta)) { + // Directly mark it as fail, without further aborting. + ta.setStatus(TaskAttempt.TaskStatus.FAILED, + Collections.singletonList(exceptionGenerator.getException(ta))); + ta.setEndTime(System.currentTimeMillis()); + abort = true; } } - abortDoomedTaskClusters(); + if (abort) { + abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED); + } } - startRunnableActivityClusters(); - } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortDoomedTaskClusters(); } } + // Returns whether the job has been cancelled. + private boolean isCancelled() { + return cancelled; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index 180e850..21fc08f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -45,6 +45,14 @@ public interface IJobManager { void add(JobRun jobRun) throws HyracksException; /** + * Cancel a job with a given job id. + * + * @param jobId, + * the id of the job. + */ + void cancel(JobId jobId) throws HyracksException; + + /** * This method is called when the master process decides to complete job. * The implementation of this method should instruct all involved worker processes to clean the state of each * individual parallel partition up. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 741e3db..031303b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -47,7 +47,6 @@ import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue; import org.apache.hyracks.control.cc.scheduler.IJobQueue; import org.apache.hyracks.control.cc.work.JobCleanupWork; import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.work.IResultCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -108,8 +107,7 @@ public class JobManager implements IJobManager { IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); switch (status) { case QUEUE: - jobRun.setStatus(JobStatus.PENDING, null); - jobQueue.add(jobRun); + queueJob(jobRun); break; case EXECUTE: executeJob(jobRun); @@ -118,6 +116,32 @@ public class JobManager implements IJobManager { } @Override + public void cancel(JobId jobId) throws HyracksException { + if (jobId == null) { + return; + } + // Cancels a running job. + if (activeRunMap.containsKey(jobId)) { + JobRun jobRun = activeRunMap.get(jobId); + // The following call will abort all ongoing tasks and then consequently + // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. + // Therefore, we do not remove the job out of activeRunMap here. + jobRun.getExecutor().cancelJob(); + return; + } + // Removes a pending job. + JobRun jobRun = jobQueue.remove(jobId); + if (jobRun != null) { + List<Exception> exceptions = Collections + .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); + // Since the job has not been executed, we only need to update its status and lifecyle here. + jobRun.setStatus(JobStatus.FAILURE, exceptions); + runMapArchive.put(jobId, jobRun); + runMapHistory.put(jobId, exceptions); + } + } + + @Override public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException { checkJob(run); if (status == JobStatus.FAILURE_BEFORE_EXECUTION) { @@ -244,9 +268,12 @@ public class JobManager implements IJobManager { @Override public JobRun get(JobId jobId) { - JobRun jobRun = activeRunMap.get(jobId); + JobRun jobRun = activeRunMap.get(jobId); // Running job. + if (jobRun == null) { + jobRun = jobQueue.get(jobId); // Pending job. + } if (jobRun == null) { - jobRun = runMapArchive.get(jobId); + jobRun = runMapArchive.get(jobId); // Completed job. } return jobRun; } @@ -256,7 +283,7 @@ public class JobManager implements IJobManager { return runMapHistory.get(jobId); } - private void pickJobsToRun() { + private void pickJobsToRun() throws HyracksException { List<JobRun> selectedRuns = jobQueue.pull(); for (JobRun run : selectedRuns) { executeJob(run); @@ -264,24 +291,24 @@ public class JobManager implements IJobManager { } // Executes a job when the required capacity for the job is met. - private void executeJob(JobRun run) { - IResultCallback<JobId> callback = run.getCallback(); - try { - run.setStartTime(System.currentTimeMillis()); - JobId jobId = run.getJobId(); - activeRunMap.put(jobId, run); - - CCApplicationContext appCtx = ccs.getApplicationContext(); - JobSpecification spec = run.getJobSpecification(); - if (!run.getExecutor().isPredistributed()) { - appCtx.notifyJobCreation(jobId, spec); - } - run.setStatus(JobStatus.RUNNING, null); - executeJobInternal(run); - callback.setValue(jobId); - } catch (Exception e) { - callback.setException(e); + private void executeJob(JobRun run) throws HyracksException { + run.setStartTime(System.currentTimeMillis()); + JobId jobId = run.getJobId(); + activeRunMap.put(jobId, run); + + CCApplicationContext appCtx = ccs.getApplicationContext(); + JobSpecification spec = run.getJobSpecification(); + if (!run.getExecutor().isPredistributed()) { + appCtx.notifyJobCreation(jobId, spec); } + run.setStatus(JobStatus.RUNNING, null); + executeJobInternal(run); + } + + // Queue a job when the required capacity for the job is not met. + private void queueJob(JobRun jobRun) throws HyracksException { + jobRun.setStatus(JobStatus.PENDING, null); + jobQueue.add(jobRun); } private void executeJobInternal(JobRun run) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index 3aa9043..55a7a82 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -52,7 +52,6 @@ import org.apache.hyracks.control.cc.executor.JobExecutor; import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.utils.ExceptionUtils; -import org.apache.hyracks.control.common.work.IResultCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -99,9 +98,7 @@ public class JobRun implements IJobStatusConditionVariable { private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations; - private final IResultCallback<JobId> callback; - - private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback, + private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec, ActivityClusterGraph acg) { this.deploymentId = deploymentId; this.jobId = jobId; @@ -116,14 +113,13 @@ public class JobRun implements IJobStatusConditionVariable { connectorPolicyMap = new HashMap<>(); operatorLocations = new HashMap<>(); createTime = System.currentTimeMillis(); - this.callback = callback; } //Run a Pre-distributed job by passing the JobId - public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback, + public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, PreDistributedJobDescriptor distributedJobDescriptor) throws HyracksException { - this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback, + this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph()); Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints(); this.scheduler = new JobExecutor(ccs, this, constaints, true); @@ -131,9 +127,8 @@ public class JobRun implements IJobStatusConditionVariable { //Run a new job by creating an ActivityClusterGraph public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, - IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags, - IResultCallback<JobId> callback) { - this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize()); + IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) { + this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize()); this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false); } @@ -196,10 +191,6 @@ public class JobRun implements IJobStatusConditionVariable { return createTime; } - public IResultCallback<JobId> getCallback() { - return callback; - } - public long getStartTime() { return startTime; } @@ -231,13 +222,7 @@ public class JobRun implements IJobStatusConditionVariable { wait(); } if (exceptions != null && !exceptions.isEmpty()) { - StringBuilder buffer = new StringBuilder(); - buffer.append("Job failed on account of:\n"); - for (Exception e : exceptions) { - buffer.append(e.getMessage()).append('\n'); - } - HyracksException he; - he = new HyracksException(buffer.toString(), exceptions.get(0)); + HyracksException he = HyracksException.create(exceptions.get(0)); for (int i = 1; i < exceptions.size(); ++i) { he.addSuppressed(exceptions.get(i)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index 6cf75bb..0377692 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -23,13 +23,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; @@ -42,9 +44,9 @@ import org.apache.hyracks.control.cc.job.JobRun; public class FIFOJobQueue implements IJobQueue { private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName()); - private static final int CAPACITY = 4096; - private final List<JobRun> jobQueue = new LinkedList<>(); + + private final Map<JobId, JobRun> jobListMap = new LinkedHashMap<>(); private final IJobManager jobManager; private final IJobCapacityController jobCapacityController; @@ -55,17 +57,27 @@ public class FIFOJobQueue implements IJobQueue { @Override public void add(JobRun run) throws HyracksException { - int size = jobQueue.size(); + int size = jobListMap.size(); if (size >= CAPACITY) { - throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, new Integer(CAPACITY)); + throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, CAPACITY); } - jobQueue.add(run); + jobListMap.put(run.getJobId(), run); + } + + @Override + public JobRun remove(JobId jobId) { + return jobListMap.remove(jobId); + } + + @Override + public JobRun get(JobId jobId) { + return jobListMap.get(jobId); } @Override public List<JobRun> pull() { List<JobRun> jobRuns = new ArrayList<>(); - Iterator<JobRun> runIterator = jobQueue.iterator(); + Iterator<JobRun> runIterator = jobListMap.values().iterator(); while (runIterator.hasNext()) { JobRun run = runIterator.next(); JobSpecification job = run.getJobSpecification(); @@ -89,7 +101,6 @@ public class FIFOJobQueue implements IJobQueue { } catch (HyracksException e) { LOGGER.log(Level.SEVERE, e.getMessage(), e); } - continue; } } return jobRuns; @@ -97,7 +108,7 @@ public class FIFOJobQueue implements IJobQueue { @Override public Collection<JobRun> jobs() { - return Collections.unmodifiableCollection(jobQueue); + return Collections.unmodifiableCollection(jobListMap.values()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java index 2c26799..e666224 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.job.JobRun; /** @@ -41,6 +42,22 @@ public interface IJobQueue { void add(JobRun run) throws HyracksException; /** + * Removes a job with a given jobId from the job queue. + * + * @param jobId, + * the job id of the job to be removed. + */ + JobRun remove(JobId jobId); + + /** + * Retrieves a job with a given jobId from the job queue. + * + * @param jobId, + * the job id of the job to be retrieved. + */ + JobRun get(JobId jobId); + + /** * Pull a list of jobs from the job queque, when more cluster capacity becomes available. * * @return a list of jobs whose capacity requirements can all be met at the same time. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java new file mode 100644 index 0000000..f3b67c9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.job.IJobManager; +import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +/** + * This work cancels a job with the given job id. + * It is triggered by the cancel call with a job id from the client. + */ +public class CancelJobWork extends SynchronizableWork { + private final IJobManager jobManager; + private final JobId jobId; + private final IResultCallback<Void> callback; + + public CancelJobWork(IJobManager jobManager, JobId jobId, IResultCallback<Void> callback) { + this.jobId = jobId; + this.jobManager = jobManager; + this.callback = callback; + } + + @Override + protected void doRun() throws Exception { + try { + if (jobId != null) { + jobManager.cancel(jobId); + } + callback.setValue(null); + } catch (Exception e) { + callback.setException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index c608712..1253cf7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -65,14 +65,14 @@ public class JobStartWork extends SynchronizableWork { .deserialize(acggfBytes, deploymentId, appCtx); IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags); - run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback); + run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags); } else { //ActivityClusterGraph has already been distributed - run = new JobRun(ccs, deploymentId, jobId, callback, + run = new JobRun(ccs, deploymentId, jobId, ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); } jobManager.add(run); - + callback.setValue(jobId); } catch (Exception e) { callback.setException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index 88b8939..3bb08bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -199,6 +199,61 @@ public class JobManagerTest { Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); } + @Test + public void testCancel() throws HyracksException { + CCConfig ccConfig = new CCConfig(); + IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); + IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); + + // Submits runnable jobs. + List<JobRun> acceptedRuns = new ArrayList<>(); + for (int id = 0; id < 4096; ++id) { + // Mocks an immediately executable job. + JobRun run = mockJobRun(id); + JobSpecification job = mock(JobSpecification.class); + when(run.getJobSpecification()).thenReturn(job); + when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); + + // Submits the job. + acceptedRuns.add(run); + jobManager.add(run); + Assert.assertTrue(jobManager.getRunningJobs().size() == id + 1); + Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); + } + + // Submits jobs that will be deferred due to the capacity limitation. + List<JobRun> deferredRuns = new ArrayList<>(); + for (int id = 4096; id < 8192; ++id) { + // Mocks a deferred job. + JobRun run = mockJobRun(id); + JobSpecification job = mock(JobSpecification.class); + when(run.getJobSpecification()).thenReturn(job); + when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) + .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); + + // Submits the job. + deferredRuns.add(run); + jobManager.add(run); + Assert.assertTrue(jobManager.getRunningJobs().size() == 4096); + Assert.assertTrue(jobManager.getPendingJobs().size() == id + 1 - 4096); + } + + // Cancels deferred jobs. + for (JobRun run : deferredRuns) { + jobManager.cancel(run.getJobId()); + } + + // Cancels runnable jobs. + for (JobRun run : acceptedRuns) { + jobManager.cancel(run.getJobId()); + } + + Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); + Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize); + verify(jobManager, times(0)).prepareComplete(any(), any(), any()); + verify(jobManager, times(0)).finalComplete(any()); + } + private JobRun mockJobRun(long id) { JobRun run = mock(JobRun.class, Mockito.RETURNS_DEEP_STUBS); when(run.getExceptions()).thenReturn(Collections.emptyList()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 4163e46..3d6ac00 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -26,8 +26,9 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.application.ICCApplicationContext; +import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.IFrameTupleAccessor; @@ -35,9 +36,11 @@ import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.dataset.IHyracksDatasetReader; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -46,12 +49,14 @@ import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.resources.memory.FrameManager; import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; -import com.fasterxml.jackson.databind.node.ArrayNode; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.rules.TemporaryFolder; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + public abstract class AbstractMultiNCIntegrationTest { private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName()); @@ -88,6 +93,7 @@ public abstract class AbstractMultiNCIntegrationTest { ccRoot.delete(); ccRoot.mkdir(); ccConfig.ccRoot = ccRoot.getAbsolutePath(); + ccConfig.appCCMainClass = DummyApplicationEntryPoint.class.getName(); cc = new ClusterControllerService(ccConfig); cc.start(); @@ -122,6 +128,18 @@ public abstract class AbstractMultiNCIntegrationTest { cc.stop(); } + protected JobId startJob(JobSpecification spec) throws Exception { + return hcc.startJob(spec); + } + + protected void waitForCompletion(JobId jobId) throws Exception { + hcc.waitForCompletion(jobId); + } + + protected void cancelJob(JobId jobId) throws Exception { + hcc.cancelJob(jobId); + } + protected void runTest(JobSpecification spec) throws Exception { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info(spec.toJSON().asText()); @@ -201,4 +219,40 @@ public abstract class AbstractMultiNCIntegrationTest { return tempFile; } + public static class DummyApplicationEntryPoint implements ICCApplicationEntryPoint { + + @Override + public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public void startupCompleted() throws Exception { + + } + + @Override + public IJobCapacityController getJobCapacityController() { + return new IJobCapacityController() { + private long maxRAM = Runtime.getRuntime().maxMemory(); + + @Override + public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException { + return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize() + ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE; + } + + @Override + public void release(JobSpecification job) { + + } + }; + } + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java new file mode 100644 index 0000000..7c3b66f --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.tests.integration; + +import java.io.File; +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.constraints.PartitionConstraintHelper; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IConnectorDescriptor; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.io.ManagedFileSplit; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.resource.ClusterCapacity; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; +import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; +import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; +import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; +import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; +import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor; +import org.junit.Assert; +import org.junit.Test; + +public class CancelJobTest extends AbstractMultiNCIntegrationTest { + + @Test + public void cancelExecutingJobAfterWaitForCompletion() throws Exception { + //Cancels executing jobs after waitForCompletion() is called. + for (JobSpecification spec : testJobs()) { + cancelAfterWaitForCompletion(spec); + } + } + + @Test + public void cancelExecutingJobBeforeWaitForCompletion() throws Exception { + //Cancels executing jobs before waitForCompletion is called. + for (JobSpecification spec : testJobs()) { + cancelBeforeWaitForCompletion(spec); + } + } + + @Test + public void cancelExecutingJobWithoutWaitForCompletion() throws Exception { + //Cancels executing jobs without calling waitForCompletion. + for (JobSpecification spec : testJobs()) { + cancelWithoutWait(spec); + } + } + + @Test + public void cancelPendingJobAfterWaitForCompletion() throws Exception { + //Cancels pending jobs after waitForCompletion() is called. + for (JobSpecification spec : testJobs()) { + setJobCapacity(spec); + cancelAfterWaitForCompletion(spec); + } + } + + @Test + public void cancelPendingJobBeforeWaitForCompletion() throws Exception { + //Cancels pending jobs before waitForCompletion is called. + for (JobSpecification spec : testJobs()) { + setJobCapacity(spec); + cancelBeforeWaitForCompletion(spec); + } + } + + @Test + public void cancelPendingJobWithoutWaitForCompletion() throws Exception { + //Cancels pending jobs without calling waitForCompletion. + for (JobSpecification spec : testJobs()) { + setJobCapacity(spec); + cancelWithoutWait(spec); + } + } + + private JobSpecification[] testJobs() { + return new JobSpecification[] { jobWithSleepSourceOp(), jobWithSleepOp() }; + } + + private void setJobCapacity(JobSpecification spec) { + IClusterCapacity reqCapacity = new ClusterCapacity(); + reqCapacity.setAggregatedMemoryByteSize(Long.MAX_VALUE); + spec.setRequiredClusterCapacity(reqCapacity); + } + + private void cancelAfterWaitForCompletion(JobSpecification spec) throws Exception { + JobId jobId = startJob(spec); + // A thread for canceling the job. + Thread thread = new Thread(() -> { + try { + synchronized (this) { + this.wait(500); // Make sure waitForCompletion be called first. + } + cancelJob(jobId); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // Cancels the job. + thread.start(); + + // Checks the resulting Exception. + boolean exceptionMatched = false; + try { + waitForCompletion(jobId); + } catch (Exception e) { + exceptionMatched = true; + Assert.assertTrue(e instanceof HyracksException); + HyracksException hyracksException = (HyracksException) e; + Assert.assertTrue(hyracksException.getErrorCode() == ErrorCode.JOB_CANCELED); + } finally { + Assert.assertTrue(exceptionMatched); + } + thread.join(); + } + + private void cancelBeforeWaitForCompletion(JobSpecification spec) throws Exception { + boolean exceptionMatched = false; + try { + JobId jobId = startJob(spec); + cancelJob(jobId); + waitForCompletion(jobId); + } catch (HyracksException e) { + exceptionMatched = true; + Assert.assertTrue(e.getErrorCode() == ErrorCode.JOB_CANCELED); + } finally { + Assert.assertTrue(exceptionMatched); + } + } + + private void cancelWithoutWait(JobSpecification spec) throws Exception { + JobId jobId = startJob(spec); + cancelJob(jobId); + } + + private JobSpecification jobWithSleepSourceOp() { + JobSpecification spec = new JobSpecification(); + SleepSourceOperatorDescriptor sourceOpDesc = new SleepSourceOperatorDescriptor(spec); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS); + SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS); + IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec); + spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0); + spec.addRoot(sinkOpDesc); + return spec; + } + + private JobSpecification jobWithSleepOp() { + JobSpecification spec = new JobSpecification(); + FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], + "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl") }; + IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); + RecordDescriptor recordDesc = new RecordDescriptor( + new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + // File scan operator. + FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, + new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, + UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), + recordDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, ASTERIX_IDS[0]); + + // Sleep operator. + SleepOperatorDescriptor sleepOp = new SleepOperatorDescriptor(spec); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sleepOp, ASTERIX_IDS); + + // Sink operator. + SinkOperatorDescriptor sinkOp = new SinkOperatorDescriptor(spec, 1); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOp, ASTERIX_IDS); + + // Hash-repartitioning connector. + IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec, + new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] { + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); + spec.connect(conn1, scanOp, 0, sleepOp, 0); + + // One-to-one connector. + IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec); + spec.connect(conn2, sleepOp, 0, sinkOp, 0); + return spec; + } + +} + +class SleepSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + private static final long serialVersionUID = 1L; + + public SleepSourceOperatorDescriptor(JobSpecification spec) { + super(spec, 0, 1); + } + + @Override + public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new AbstractUnaryOutputSourceOperatorNodePushable() { + + @Override + public void initialize() throws HyracksDataException { + try { + writer.open(); + while (true) { + synchronized (this) { + wait(); + } + } + } catch (Exception e) { + writer.fail(); + } finally { + writer.close(); + } + } + }; + } +} + +class SleepOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + private static final long serialVersionUID = 1L; + + public SleepOperatorDescriptor(JobSpecification spec) { + super(spec, 1, 1); + } + + @Override + public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { + + @Override + public void open() throws HyracksDataException { + writer.open(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + try { + while (true) { + synchronized (this) { + wait(); + } + } + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @Override + public void fail() throws HyracksDataException { + writer.fail(); + } + + @Override + public void close() throws HyracksDataException { + writer.close(); + } + }; + } +} \ No newline at end of file
