[NO ISSUE][HYR] Ensure IJobLifecycleListener is notified on cancelled queued jobs
Change-Id: I7e26c9d1015725f895876f5873eccd3f86b17653 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2624 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c2d19a55 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c2d19a55 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c2d19a55 Branch: refs/heads/master Commit: c2d19a558508bc6397ca044430546ce5be4e0872 Parents: d1c696b Author: Michael Blow <mb...@apache.org> Authored: Fri Apr 27 16:08:15 2018 -0400 Committer: Michael Blow <mb...@apache.org> Committed: Fri Apr 27 17:56:43 2018 -0700 ---------------------------------------------------------------------- .../api/exceptions/HyracksException.java | 11 +++ .../hyracks/control/cc/job/JobManager.java | 90 +++++++++++--------- .../apache/hyracks/control/cc/job/JobRun.java | 9 +- 3 files changed, 62 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java index 7aa84e2..210779e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java @@ -40,6 +40,17 @@ public class HyracksException extends IOException { return new HyracksException(cause); } + public static HyracksException wrapOrThrowUnchecked(Throwable cause) { + if (cause instanceof Error) { + throw (Error) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof HyracksException) { + return (HyracksException) cause; + } + return new HyracksException(cause); + } + public static HyracksException create(int code, Serializable... params) { return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 9fd1a02..3ba25f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -31,12 +31,14 @@ import java.util.Map; import java.util.Set; import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; import org.apache.hyracks.control.cc.application.CCServiceContext; @@ -65,8 +67,7 @@ public class JobManager implements IJobManager { private final IJobCapacityController jobCapacityController; private IJobQueue jobQueue; - public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) - throws HyracksException { + public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { this.ccs = ccs; this.jobCapacityController = jobCapacityController; try { @@ -116,6 +117,8 @@ public class JobManager implements IJobManager { case EXECUTE: executeJob(jobRun); break; + default: + throw new IllegalStateException("unknown submission status: " + status); } } @@ -136,9 +139,18 @@ public class JobManager implements IJobManager { List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. - jobRun.setStatus(JobStatus.FAILURE, exceptions); + jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); + CCServiceContext serviceCtx = ccs.getContext(); + if (serviceCtx != null) { + try { + serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); + } catch (Exception e) { + LOGGER.error("Exception notifying cancel on pending job {}", jobId, e); + throw HyracksDataException.create(e); + } + } } callback.setValue(null); } @@ -152,14 +164,12 @@ public class JobManager implements IJobManager { finalComplete(run); return; } - JobId jobId = run.getJobId(); - HyracksException caughtException = null; if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) { finalComplete(run); return; } if (run.getPendingStatus() != null) { - LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: " + jobId); + LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId); return; } Set<String> targetNodes = run.getParticipatingNodeIds(); @@ -168,38 +178,40 @@ public class JobManager implements IJobManager { run.setPendingStatus(status, exceptions); } - if (targetNodes != null && !targetNodes.isEmpty()) { - INodeManager nodeManager = ccs.getNodeManager(); - Set<String> toDelete = new HashSet<>(); - for (String n : targetNodes) { - NodeControllerState ncs = nodeManager.getNodeControllerState(n); + if (!targetNodes.isEmpty()) { + cleanupJobOnNodes(run, status, targetNodes); + } else { + finalComplete(run); + } + + } + + private void cleanupJobOnNodes(JobRun run, JobStatus status, Set<String> targetNodes) throws HyracksException { + Throwable caughtException = null; + JobId jobId = run.getJobId(); + INodeManager nodeManager = ccs.getNodeManager(); + Set<String> toDelete = new HashSet<>(); + for (String n : targetNodes) { + NodeControllerState ncs = nodeManager.getNodeControllerState(n); + if (ncs == null) { + toDelete.add(n); + } else { try { - if (ncs == null) { - toDelete.add(n); - } else { - ncs.getNodeController().cleanUpJoblet(jobId, status); - } + ncs.getNodeController().cleanUpJoblet(jobId, status); } catch (Exception e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); - if (caughtException == null) { - caughtException = HyracksException.create(e); - } else { - caughtException.addSuppressed(e); - } + LOGGER.error("Exception cleaning up joblet {} on node {}", jobId, n, e); + caughtException = ExceptionUtils.suppress(caughtException, e); } } - targetNodes.removeAll(toDelete); - run.getCleanupPendingNodeIds().removeAll(toDelete); - if (run.getCleanupPendingNodeIds().isEmpty()) { - finalComplete(run); - } - } else { + } + targetNodes.removeAll(toDelete); + run.getCleanupPendingNodeIds().removeAll(toDelete); + if (run.getCleanupPendingNodeIds().isEmpty()) { finalComplete(run); } - // throws caught exceptions if any if (caughtException != null) { - throw caughtException; + throw HyracksException.wrapOrThrowUnchecked(caughtException); } } @@ -207,13 +219,13 @@ public class JobManager implements IJobManager { public void finalComplete(JobRun run) throws HyracksException { checkJob(run); JobId jobId = run.getJobId(); - HyracksException caughtException = null; + Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); if (serviceCtx != null) { try { serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions()); - } catch (HyracksException e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); + } catch (Exception e) { + LOGGER.error("Exception notifying job finish {}", jobId, e); caughtException = e; } } @@ -224,18 +236,14 @@ public class JobManager implements IJobManager { runMapHistory.put(jobId, run.getExceptions()); if (run.getActivityClusterGraph().isReportTaskDetails()) { - /** + /* * log job details when profiling is enabled */ try { ccs.getJobLogFile().log(createJobLogObject(run)); } catch (Exception e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); - if (caughtException == null) { - caughtException = HyracksException.create(e); - } else { - caughtException.addSuppressed(e); - } + LOGGER.error("Exception reporting task details for job {}", jobId, e); + caughtException = ExceptionUtils.suppress(caughtException, e); } } @@ -248,7 +256,7 @@ public class JobManager implements IJobManager { // throws caught exceptions if any if (caughtException != null) { - throw caughtException; + throw HyracksException.wrapOrThrowUnchecked(caughtException); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index e4699c7..5b98260 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -209,17 +209,12 @@ public class JobRun implements IJobStatusConditionVariable { } public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) { - Map<Integer, String> locations = operatorLocations.get(op); - if (locations == null) { - locations = new HashMap<Integer, String>(); - operatorLocations.put(op, locations); - } - locations.put(partition, location); + operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location); } @Override public synchronized void waitForCompletion() throws Exception { - while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) { + while (status == JobStatus.PENDING || status == JobStatus.RUNNING) { wait(); } if (exceptions != null && !exceptions.isEmpty()) {