This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 7c7d24096e9dbddcf638983adc11dec85793b2c9
Author: Ali Alsuliman <[email protected]>
AuthorDate: Thu May 9 22:00:29 2024 +0300

    [ASTERIXDB-3343][API] Capture job state changes in client requests
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Instead of fetching the job state for a client request from
    the job manager, keep track of the request's job state while
    the job transitions from one state to another. Otherwise,
    the job's entry in the job manager's archive could have already
    been recycled by the time someone tries to fetch the details of
    completed requests.
    
    - make IRequestTracker extend IJobLifecycleListener.
    - add the client request id as part of the job spec.
      this client request id will be used to report back
      to the request tracker about the job associated with
      the request id.
    
    Change-Id: I638682d48651ba0e771c7590ec875a3af1050ae3
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18279
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Ali Alsuliman <[email protected]>
---
 .../apache/asterix/translator/ClientRequest.java   | 98 ++++++++++++++--------
 .../apache/asterix/translator/Receptionist.java    |  6 +-
 .../app/active/ActiveNotificationHandler.java      | 10 ++-
 .../asterix/app/translator/QueryTranslator.java    |  3 +-
 .../asterix/hyracks/bootstrap/CCApplication.java   |  1 +
 .../http/servlet/QueryCancellationServletTest.java |  4 +-
 .../asterix/test/active/ActiveStatsTest.java       |  6 +-
 .../test/active/TestClusterControllerActor.java    |  7 +-
 .../apache/asterix/common/api/IClientRequest.java  | 32 +++++++
 .../apache/asterix/common/api/IReceptionist.java   |  9 +-
 .../apache/asterix/common/api/IRequestTracker.java |  3 +-
 .../runtime/job/listener/NodeJobTracker.java       |  9 +-
 .../asterix/runtime/utils/RequestTracker.java      | 40 +++++++++
 .../runtime/job/listener/NodeJobTrackerTest.java   |  7 +-
 .../hyracks/api/job/IJobLifecycleListener.java     | 31 ++++---
 .../apache/hyracks/api/job/JobSpecification.java   | 10 +++
 .../control/cc/application/CCServiceContext.java   | 16 ++--
 .../hyracks/control/cc/executor/JobExecutor.java   |  2 +-
 .../apache/hyracks/control/cc/job/JobManager.java  |  8 +-
 .../control/cc/result/ResultDirectoryService.java  |  9 +-
 .../integration/TestJobLifecycleListener.java      |  7 +-
 21 files changed, 227 insertions(+), 91 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 28549f31d3..9f2cbdd9d1 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import static 
org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.ICommonRequestParameters;
@@ -26,9 +29,10 @@ import org.apache.asterix.om.base.AMutableDateTime;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.IClusterCapacity;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,17 +41,17 @@ public class ClientRequest extends BaseClientRequest {
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
     protected final String statement;
-    protected final ClusterControllerService ccService;
     protected final String clientContextId;
+    protected final JobState jobState;
     protected volatile JobId jobId;
-    private String plan; // can be null
+    private volatile String plan; // can be null
 
-    public ClientRequest(ICommonRequestParameters requestParameters, 
ICcApplicationContext appCtx) {
+    public ClientRequest(ICommonRequestParameters requestParameters) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
-        this.ccService = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+        this.jobState = new JobState();
     }
 
     @Override
@@ -100,37 +104,55 @@ public class ClientRequest extends BaseClientRequest {
         return json;
     }
 
-    private void putJobDetails(ObjectNode json) {
-        if (jobId == null) {
-            json.putNull("jobId");
-        } else {
-            try {
-                json.put("jobId", jobId.toString());
-                JobRun jobRun = ccService.getJobManager().get(jobId);
-                if (jobRun != null) {
-                    json.put("jobStatus", String.valueOf(jobRun.getStatus()));
-                    putJobRequiredResources(json, jobRun);
-                    putTimes(json, jobRun);
-                }
-            } catch (Throwable th) {
-                // ignore
-            }
+    @Override
+    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity 
requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status) {
+        jobState.createTime = System.currentTimeMillis();
+        jobState.status = status == QUEUE ? JobStatus.PENDING : 
JobStatus.RUNNING;
+        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+        jobState.requiredMemoryInBytes = 
requiredClusterCapacity.getAggregatedMemoryByteSize();
+    }
+
+    @Override
+    public void jobStarted(JobId jobId) {
+        jobState.startTime = System.currentTimeMillis();
+        jobState.status = JobStatus.RUNNING;
+    }
+
+    @Override
+    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions) {
+        jobState.endTime = System.currentTimeMillis();
+        jobState.status = jobStatus;
+        if (exceptions != null && !exceptions.isEmpty()) {
+            jobState.errorMsg = processException(exceptions.get(0));
         }
     }
 
-    private static void putTimes(ObjectNode json, JobRun jobRun) {
-        AMutableDateTime dateTime = new AMutableDateTime(0);
-        putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
-        putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
-        putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
-        json.put("jobQueueTime", 
TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+    protected String processException(Exception e) {
+        return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private static void putJobRequiredResources(ObjectNode json, JobRun 
jobRun) {
-        IClusterCapacity jobCapacity = 
jobRun.getJobSpecification().getRequiredClusterCapacity();
-        if (jobCapacity != null) {
-            json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
-            json.put("jobRequiredMemory", 
jobCapacity.getAggregatedMemoryByteSize());
+    private void putJobDetails(ObjectNode json) {
+        try {
+            json.put("jobId", jobId != null ? jobId.toString() : null);
+            putJobState(json, jobState);
+        } catch (Throwable th) {
+            // ignore
+        }
+    }
+
+    private static void putJobState(ObjectNode json, JobState state) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, state.createTime, "jobCreateTime", dateTime);
+        putTime(json, state.startTime, "jobStartTime", dateTime);
+        putTime(json, state.endTime, "jobEndTime", dateTime);
+        long queueTime = (state.startTime > 0 ? state.startTime : 
System.currentTimeMillis()) - state.createTime;
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+        json.put("jobStatus", String.valueOf(state.status));
+        json.put("jobRequiredCPUs", state.requiredCPUs);
+        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+        if (state.errorMsg != null) {
+            json.put("error", state.errorMsg);
         }
     }
 
@@ -140,4 +162,14 @@ public class ClientRequest extends BaseClientRequest {
             json.put(label, dateTime.toSimpleString());
         }
     }
+
+    static class JobState {
+        volatile long createTime;
+        volatile long startTime;
+        volatile long endTime;
+        volatile long requiredMemoryInBytes;
+        volatile int requiredCPUs;
+        volatile JobStatus status;
+        volatile String errorMsg;
+    }
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 6893d5c356..84bee6a6de 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,7 +27,6 @@ import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.http.HttpHeaders;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -47,9 +46,8 @@ public class Receptionist implements IReceptionist {
     }
 
     @Override
-    public IClientRequest requestReceived(ICommonRequestParameters 
requestParameters, ICcApplicationContext appCtx)
-            throws HyracksDataException {
-        return new ClientRequest(requestParameters, appCtx);
+    public IClientRequest requestReceived(ICommonRequestParameters 
requestParameters) throws HyracksDataException {
+        return new ClientRequest(requestParameters);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8821c674ec..4d654d5b2a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     // *** IJobLifecycleListener
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
+    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification,
+            IJobCapacityController.JobSubmissionStatus status) throws 
HyracksDataException {
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (property != null) {
@@ -119,7 +121,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification 
spec) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -128,8 +130,8 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification 
spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3cac3d8791..c89097b474 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4869,6 +4869,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
+            jobSpec.setRequestId(clientRequest.getId());
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("Created job {} for query uuid:{}, 
clientContextID:{}", jobId,
@@ -5240,7 +5241,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws 
HyracksDataException {
-        final IClientRequest clientRequest = 
appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+        final IClientRequest clientRequest = 
appCtx.getReceptionist().requestReceived(requestParameters);
         this.appCtx.getRequestTracker().track(clientRequest);
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 1b011aeb86..31ec44cb58 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -171,6 +171,7 @@ public class CCApplication extends BaseCCApplication {
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties, appCtx);
         
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+        ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
 
         // create event loop groups
         webManager = new WebManager();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 0e063ca6c0..673611d69a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -79,7 +79,7 @@ public class QueryCancellationServletTest {
         final RequestReference requestReference = RequestReference.of("1", 
"node1", System.currentTimeMillis());
         RequestParameters requestParameters = new 
RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
-        ClientRequest request = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request = new ClientRequest(requestParameters);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -96,7 +96,7 @@ public class QueryCancellationServletTest {
         final RequestReference requestReference2 = RequestReference.of("2", 
"node1", System.currentTimeMillis());
         requestParameters = new RequestParameters(requestReference2, "select 
1", null, null, null, null, null, "2",
                 null, null, null, true);
-        ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
+        ClientRequest request2 = new ClientRequest(requestParameters);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 126edb157c..4e5d5319bf 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -53,6 +53,7 @@ import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -135,8 +136,9 @@ public class ActiveStatsTest {
         TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
         Action start = user.startActivity(eventsListener);
         startingSubscriber.sync();
-        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
-        activeJobNotificationHandler.notifyJobStart(jobId);
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+                IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
         try {
             eventsListener.refreshStats(1000);
         } catch (HyracksDataException e) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225156..883a0cb069 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.mockito.Mockito;
 
 public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@ public class TestClusterControllerActor extends Actor {
                 JobSpecification jobSpecification = 
Mockito.mock(JobSpecification.class);
                 
Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
                         .thenReturn(entityId);
-                handler.notifyJobCreation(jobId, jobSpecification);
-                handler.notifyJobStart(jobId);
+                handler.notifyJobCreation(jobId, jobSpecification, 
IJobCapacityController.JobSubmissionStatus.EXECUTE);
+                handler.notifyJobStart(jobId, null);
             }
         };
         add(startJob);
@@ -72,7 +73,7 @@ public class TestClusterControllerActor extends Actor {
         Action delivery = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws 
Exception {
-                handler.notifyJobFinish(jobId, jobStatus, exceptions);
+                handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
             }
         };
         add(delivery);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 3157c64d29..8d2c91aeaa 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,14 @@
  */
 package org.apache.asterix.common.api;
 
+import java.util.List;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -97,4 +103,30 @@ public interface IClientRequest {
      */
     ObjectNode asJson();
 
+    /**
+     * Called when the job is created.
+     *
+     * @param jobId the job id
+     * @param requiredClusterCapacity the required resources by the job
+     * @param status the status of the job; whether it will be executed or 
queued
+     */
+    void jobCreated(JobId jobId, IReadOnlyClusterCapacity 
requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status);
+
+    /**
+     * Called when the job starts running.
+     *
+     * @param jobId the job id
+     */
+    void jobStarted(JobId jobId);
+
+    /**
+     * Called when the job finishes.
+     *
+     * @param jobId the job id
+     * @param jobStatus the final job status
+     * @param exceptions exceptions encountered if any
+     */
+    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions);
+
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index f0b4944ce6..153eea9a69 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.api;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 
@@ -35,13 +34,11 @@ public interface IReceptionist {
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestParameters
-     * @param appCtx
+     * @param requestParameters the request parameters
      * @return the client request
-     * @throws HyracksDataException
+     * @throws HyracksDataException HyracksDataException
      */
-    IClientRequest requestReceived(ICommonRequestParameters requestParameters, 
ICcApplicationContext appCtx)
-            throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters) 
throws HyracksDataException;
 
     /**
      * Ensures a client's request can be executed before its job is started
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index b2dc3098ff..230eac3f50 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@ package org.apache.asterix.common.api;
 import java.util.Collection;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
 
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
 
     /**
      * Starts tracking {@code request}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a7b8..02f20f8e2f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -36,6 +36,7 @@ import 
org.apache.hyracks.api.constraints.expressions.ConstantExpression;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
@@ -44,17 +45,19 @@ public class NodeJobTracker implements INodeJobTracker {
     private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec) {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec,
+            IJobCapacityController.JobSubmissionStatus status) {
         
getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> 
jobsSet.add(jobId));
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification 
spec) {
         // nothing to do
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions) {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification 
spec, JobStatus jobStatus,
+            List<Exception> exceptions) {
         nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c16f825672..5771201c98 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,7 @@ package org.apache.asterix.runtime.utils;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +32,11 @@ import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 public class RequestTracker implements IRequestTracker {
 
@@ -132,4 +138,38 @@ public class RequestTracker implements IRequestTracker {
     public long getTotalNumberOfFailedRequests() {
         return numOfFailedRequests.get();
     }
+
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, 
IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobCreated(jobId, 
spec.getRequiredClusterCapacity(), status);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws 
HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobStarted(jobId);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus 
jobStatus, List<Exception> exceptions)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest clientRequest = runningRequests.get(requestId);
+            if (clientRequest != null) {
+                clientRequest.jobFinished(jobId, jobStatus, exceptions);
+            }
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecbdec..82d8cb771d 100644
--- 
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -50,18 +51,18 @@ public class NodeJobTrackerTest {
         jobSpec.getUserConstraints().add(new Constraint(lValueMock, 
unknownLocation));
 
         JobId jobId = new JobId(1);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, 
IJobCapacityController.JobSubmissionStatus.EXECUTE);
         // make sure nc1 has a pending job
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
-        nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+        nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, 
null);
         // make sure nc1 doesn't have pending jobs anymore
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
 
         // make sure node doesn't have pending jobs after failure
         jobId = new JobId(2);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, 
IJobCapacityController.JobSubmissionStatus.EXECUTE);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c3315d4..6773dde29d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.api.job;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * A listener for job related events
@@ -29,27 +30,33 @@ public interface IJobLifecycleListener {
     /**
      * Notify the listener that a job has been created
      *
-     * @param jobId
-     * @param spec
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param status the status of the job; whether it will be executed or queued
+     * @throws HyracksException HyracksException
      */
-    void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+    void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException;
 
     /**
      * Notify the listener that the job has started on the cluster controller
      *
-     * @param jobId
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     *
+     * @throws HyracksException HyracksException
      */
-    void notifyJobStart(JobId jobId) throws HyracksException;
+    void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
 
     /**
      * Notify the listener that the job has been terminated, passing exceptions in case of failure
      *
-     * @param jobId
-     * @param jobStatus
-     * @param exceptions
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param jobStatus the job status
+     * @param exceptions the job exceptions
+     * @throws HyracksException HyracksException
      */
-    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+    void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 2c51d3dbd2..2a11d79479 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -82,6 +82,8 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
 
     private long maxWarnings;
 
+    private String requestId;
+
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
     private IGlobalJobDataFactory globalJobDataFactory;
@@ -258,6 +260,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
     public void setFrameSize(int frameSize) {
         this.frameSize = frameSize;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd3df..555d37e089 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@ public class CCServiceContext extends ServiceContext implements ICCServiceContex
         jobLifecycleListeners.add(jobLifecycleListener);
     }
 
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobStart(jobId);
+            l.notifyJobStart(jobId, spec);
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId, jobStatus, exceptions);
+            l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, spec);
+            l.notifyJobCreation(jobId, spec, status);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acda5f..1b0f377119 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@ public class JobExecutor {
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
-        ccs.getContext().notifyJobStart(jobRun.getJobId());
+        ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
     }
 
     public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 703294ce57..9165953b0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -122,7 +122,7 @@ public class JobManager implements IJobManager {
         try {
             status = jobCapacityController.allocate(job);
             CCServiceContext serviceCtx = ccs.getContext();
-            serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+            serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
             switch (status) {
                 case QUEUE:
                     queueJob(jobRun);
@@ -164,7 +164,8 @@ public class JobManager implements IJobManager {
             CCServiceContext serviceCtx = ccs.getContext();
             if (serviceCtx != null) {
                 try {
-                    serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                    serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+                            exceptions);
                 } catch (Exception e) {
                     LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
                     throw HyracksDataException.create(e);
@@ -247,7 +248,8 @@ public class JobManager implements IJobManager {
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
         try {
-            serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+            serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+                    run.getPendingExceptions());
         } catch (Exception e) {
             LOGGER.error("Exception notifying job finish {}", jobId, e);
             caughtException = e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e24ea..a0c2ce4cfd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultStateRecord;
@@ -78,7 +79,8 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
         }
@@ -89,12 +91,13 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (exceptions == null || exceptions.isEmpty()) {
             final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
             if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29584..4d4635aadc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@ public class TestJobLifecycleListener implements IJobLifecycleListener {
     private final Set<JobId> finishWithoutStart = new HashSet<>();
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
             increment(doubleCreated, jobId);
@@ -62,7 +63,7 @@ public class TestJobLifecycleListener implements IJobLifecycleListener {
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         if (!created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
             startWithoutCreate.add(jobId);
@@ -75,7 +76,7 @@ public class TestJobLifecycleListener implements IJobLifecycleListener {
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
         if (!started.contains(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
             finishWithoutStart.add(jobId);


Reply via email to