[NO ISSUE][HYR] Ensure IJobLifecycleListener is notified on cancelled queued jobs

Change-Id: I7e26c9d1015725f895876f5873eccd3f86b17653
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2624
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c2d19a55
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c2d19a55
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c2d19a55

Branch: refs/heads/master
Commit: c2d19a558508bc6397ca044430546ce5be4e0872
Parents: d1c696b
Author: Michael Blow <mb...@apache.org>
Authored: Fri Apr 27 16:08:15 2018 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Fri Apr 27 17:56:43 2018 -0700

----------------------------------------------------------------------
 .../api/exceptions/HyracksException.java        | 11 +++
 .../hyracks/control/cc/job/JobManager.java      | 90 +++++++++++---------
 .../apache/hyracks/control/cc/job/JobRun.java   |  9 +-
 3 files changed, 62 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 7aa84e2..210779e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -40,6 +40,17 @@ public class HyracksException extends IOException {
         return new HyracksException(cause);
     }
 
+    public static HyracksException wrapOrThrowUnchecked(Throwable cause) {
+        if (cause instanceof Error) {
+            throw (Error) cause;
+        } else if (cause instanceof RuntimeException) {
+            throw (RuntimeException) cause;
+        } else if (cause instanceof HyracksException) {
+            return (HyracksException) cause;
+        }
+        return new HyracksException(cause);
+    }
+
     public static HyracksException create(int code, Serializable... params) {
         return new HyracksException(ErrorCode.HYRACKS, code, 
ErrorCode.getErrorMessage(code), params);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9fd1a02..3ba25f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -31,12 +31,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
@@ -65,8 +67,7 @@ public class JobManager implements IJobManager {
     private final IJobCapacityController jobCapacityController;
     private IJobQueue jobQueue;
 
-    public JobManager(CCConfig ccConfig, ClusterControllerService ccs, 
IJobCapacityController jobCapacityController)
-            throws HyracksException {
+    public JobManager(CCConfig ccConfig, ClusterControllerService ccs, 
IJobCapacityController jobCapacityController) {
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
         try {
@@ -116,6 +117,8 @@ public class JobManager implements IJobManager {
             case EXECUTE:
                 executeJob(jobRun);
                 break;
+            default:
+                throw new IllegalStateException("unknown submission status: " 
+ status);
         }
     }
 
@@ -136,9 +139,18 @@ public class JobManager implements IJobManager {
             List<Exception> exceptions =
                     
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, 
jobId));
             // Since the job has not been executed, we only need to update its 
status and lifecyle here.
-            jobRun.setStatus(JobStatus.FAILURE, exceptions);
+            jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
+            CCServiceContext serviceCtx = ccs.getContext();
+            if (serviceCtx != null) {
+                try {
+                    serviceCtx.notifyJobFinish(jobId, 
JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                } catch (Exception e) {
+                    LOGGER.error("Exception notifying cancel on pending job 
{}", jobId, e);
+                    throw HyracksDataException.create(e);
+                }
+            }
         }
         callback.setValue(null);
     }
@@ -152,14 +164,12 @@ public class JobManager implements IJobManager {
             finalComplete(run);
             return;
         }
-        JobId jobId = run.getJobId();
-        HyracksException caughtException = null;
         if (run.getPendingStatus() != null && 
run.getCleanupPendingNodeIds().isEmpty()) {
             finalComplete(run);
             return;
         }
         if (run.getPendingStatus() != null) {
-            LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: " + 
jobId);
+            LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", 
run::getJobId);
             return;
         }
         Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -168,38 +178,40 @@ public class JobManager implements IJobManager {
             run.setPendingStatus(status, exceptions);
         }
 
-        if (targetNodes != null && !targetNodes.isEmpty()) {
-            INodeManager nodeManager = ccs.getNodeManager();
-            Set<String> toDelete = new HashSet<>();
-            for (String n : targetNodes) {
-                NodeControllerState ncs = 
nodeManager.getNodeControllerState(n);
+        if (!targetNodes.isEmpty()) {
+            cleanupJobOnNodes(run, status, targetNodes);
+        } else {
+            finalComplete(run);
+        }
+
+    }
+
+    private void cleanupJobOnNodes(JobRun run, JobStatus status, Set<String> 
targetNodes) throws HyracksException {
+        Throwable caughtException = null;
+        JobId jobId = run.getJobId();
+        INodeManager nodeManager = ccs.getNodeManager();
+        Set<String> toDelete = new HashSet<>();
+        for (String n : targetNodes) {
+            NodeControllerState ncs = nodeManager.getNodeControllerState(n);
+            if (ncs == null) {
+                toDelete.add(n);
+            } else {
                 try {
-                    if (ncs == null) {
-                        toDelete.add(n);
-                    } else {
-                        ncs.getNodeController().cleanUpJoblet(jobId, status);
-                    }
+                    ncs.getNodeController().cleanUpJoblet(jobId, status);
                 } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, e.getMessage(), e);
-                    if (caughtException == null) {
-                        caughtException = HyracksException.create(e);
-                    } else {
-                        caughtException.addSuppressed(e);
-                    }
+                    LOGGER.error("Exception cleaning up joblet {} on node {}", 
jobId, n, e);
+                    caughtException = ExceptionUtils.suppress(caughtException, 
e);
                 }
             }
-            targetNodes.removeAll(toDelete);
-            run.getCleanupPendingNodeIds().removeAll(toDelete);
-            if (run.getCleanupPendingNodeIds().isEmpty()) {
-                finalComplete(run);
-            }
-        } else {
+        }
+        targetNodes.removeAll(toDelete);
+        run.getCleanupPendingNodeIds().removeAll(toDelete);
+        if (run.getCleanupPendingNodeIds().isEmpty()) {
             finalComplete(run);
         }
-
         // throws caught exceptions if any
         if (caughtException != null) {
-            throw caughtException;
+            throw HyracksException.wrapOrThrowUnchecked(caughtException);
         }
     }
 
@@ -207,13 +219,13 @@ public class JobManager implements IJobManager {
     public void finalComplete(JobRun run) throws HyracksException {
         checkJob(run);
         JobId jobId = run.getJobId();
-        HyracksException caughtException = null;
+        Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
         if (serviceCtx != null) {
             try {
                 serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), 
run.getPendingExceptions());
-            } catch (HyracksException e) {
-                LOGGER.log(Level.ERROR, e.getMessage(), e);
+            } catch (Exception e) {
+                LOGGER.error("Exception notifying job finish {}", jobId, e);
                 caughtException = e;
             }
         }
@@ -224,18 +236,14 @@ public class JobManager implements IJobManager {
         runMapHistory.put(jobId, run.getExceptions());
 
         if (run.getActivityClusterGraph().isReportTaskDetails()) {
-            /**
+            /*
              * log job details when profiling is enabled
              */
             try {
                 ccs.getJobLogFile().log(createJobLogObject(run));
             } catch (Exception e) {
-                LOGGER.log(Level.ERROR, e.getMessage(), e);
-                if (caughtException == null) {
-                    caughtException = HyracksException.create(e);
-                } else {
-                    caughtException.addSuppressed(e);
-                }
+                LOGGER.error("Exception reporting task details for job {}", 
jobId, e);
+                caughtException = ExceptionUtils.suppress(caughtException, e);
             }
         }
 
@@ -248,7 +256,7 @@ public class JobManager implements IJobManager {
 
         // throws caught exceptions if any
         if (caughtException != null) {
-            throw caughtException;
+            throw HyracksException.wrapOrThrowUnchecked(caughtException);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c2d19a55/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index e4699c7..5b98260 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -209,17 +209,12 @@ public class JobRun implements 
IJobStatusConditionVariable {
     }
 
     public void registerOperatorLocation(OperatorDescriptorId op, int 
partition, String location) {
-        Map<Integer, String> locations = operatorLocations.get(op);
-        if (locations == null) {
-            locations = new HashMap<Integer, String>();
-            operatorLocations.put(op, locations);
-        }
-        locations.put(partition, location);
+        operatorLocations.computeIfAbsent(op, k -> new 
HashMap<>()).put(partition, location);
     }
 
     @Override
     public synchronized void waitForCompletion() throws Exception {
-        while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
+        while (status == JobStatus.PENDING || status == JobStatus.RUNNING) {
             wait();
         }
         if (exceptions != null && !exceptions.isEmpty()) {

Reply via email to