This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 2aa55e000ce027c287c7ec94464cbe392814d5cf
Author: Hussain Towaileb <[email protected]>
AuthorDate: Thu May 2 13:42:08 2024 +0300

    [NO ISSUE]: Track more requests + jobs counts
    
    Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../api/http/server/QueryServiceServlet.java       |  2 +-
 .../asterix/app/translator/QueryTranslator.java    |  3 +
 .../apache/asterix/common/api/IRequestTracker.java | 10 +++
 .../asterix/runtime/utils/RequestTracker.java      | 12 +++
 .../apache/hyracks/control/cc/job/IJobManager.java | 20 +++++
 .../apache/hyracks/control/cc/job/JobManager.java  | 99 +++++++++++++++++++---
 6 files changed, 131 insertions(+), 15 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index f84e45a063..da21769a99 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -166,7 +166,7 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
             return resultStatus;
         }
 
-        HttpResponseStatus getHttpStatus() {
+        public HttpResponseStatus getHttpStatus() {
             return httpResponseStatus;
         }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1110cd3fea..3cac3d8791 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -529,6 +529,9 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                                 "Unexpected statement: " + kind);
                 }
             }
+        } catch (Exception ex) {
+            this.appCtx.getRequestTracker().incrementFailedRequests();
+            throw ex;
         } finally {
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015df8..b2dc3098ff 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -81,4 +81,14 @@ public interface IRequestTracker {
      * @return the total number of requests since cluster start/restart
      */
     long getTotalNumberOfRequests();
+
+    /**
+     * increments total number of failed requests
+     */
+    void incrementFailedRequests();
+
+    /**
+     * @return the total number of failed requests
+     */
+    long getTotalNumberOfFailedRequests();
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6065..c16f825672 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -39,11 +39,13 @@ public class RequestTracker implements IRequestTracker {
     private final CircularFifoQueue<IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
+    private final AtomicLong numOfFailedRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
         completedRequests = new 
CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
         numRequests = new AtomicLong(0);
+        numOfFailedRequests = new AtomicLong(0);
     }
 
     @Override
@@ -120,4 +122,14 @@ public class RequestTracker implements IRequestTracker {
     public long getTotalNumberOfRequests() {
         return numRequests.get();
     }
+
+    @Override
+    public void incrementFailedRequests() {
+        numOfFailedRequests.incrementAndGet();
+    }
+
+    @Override
+    public long getTotalNumberOfFailedRequests() {
+        return numOfFailedRequests.get();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index b2795d4d5d..f65b261e3d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -130,4 +130,24 @@ public interface IJobManager {
      * @return the maximum number of jobs to queue before rejecting new jobs
      */
     int getJobQueueCapacity();
+
+    /**
+     * @return total successful jobs
+     */
+    long getSuccessfulJobs();
+
+    /**
+     * @return total failed jobs
+     */
+    long getTotalFailedJobs();
+
+    /**
+     * @return total cancelled jobs
+     */
+    long getTotalCancelledJobs();
+
+    /**
+     * @return total rejected jobs
+     */
+    long getTotalRejectedJobs();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4ac49..703294ce57 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -30,10 +30,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.IError;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -66,11 +68,19 @@ public class JobManager implements IJobManager {
     private final Map<JobId, JobRun> runMapArchive;
     private final Map<JobId, List<Exception>> runMapHistory;
     private final IJobCapacityController jobCapacityController;
+    private final AtomicLong successfulJobs;
+    private final AtomicLong totalFailedJobs;
+    private final AtomicLong totalCancelledJobs;
+    private final AtomicLong totalRejectedJobs;
     private IJobQueue jobQueue;
 
     public JobManager(CCConfig ccConfig, ClusterControllerService ccs, 
IJobCapacityController jobCapacityController) {
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
+        this.successfulJobs = new AtomicLong();
+        this.totalFailedJobs = new AtomicLong();
+        this.totalCancelledJobs = new AtomicLong();
+        this.totalRejectedJobs = new AtomicLong();
         try {
             Constructor<?> jobQueueConstructor = 
this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, 
IJobCapacityController.class);
@@ -84,7 +94,7 @@ public class JobManager implements IJobManager {
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
         }
         activeRunMap = new HashMap<>();
-        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+        runMapArchive = new LinkedHashMap<>() {
             private static final long serialVersionUID = -1406441385508773629L;
 
             @Override
@@ -92,7 +102,7 @@ public class JobManager implements IJobManager {
                 return size() > ccConfig.getJobHistorySize();
             }
         };
-        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+        runMapHistory = new LinkedHashMap<>() {
             private static final long serialVersionUID = 7572062687032652986L;
             /** history size + 1 is for the case when history size = 0 */
             private final int allowedSize = 100 * 
(ccConfig.getJobHistorySize() + 1);
@@ -108,18 +118,24 @@ public class JobManager implements IJobManager {
     public void add(JobRun jobRun) throws HyracksException {
         checkJob(jobRun);
         JobSpecification job = jobRun.getJobSpecification();
-        IJobCapacityController.JobSubmissionStatus status = 
jobCapacityController.allocate(job);
-        CCServiceContext serviceCtx = ccs.getContext();
-        serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
-        switch (status) {
-            case QUEUE:
-                queueJob(jobRun);
-                break;
-            case EXECUTE:
-                executeJob(jobRun);
-                break;
-            default:
-                throw new IllegalStateException("unknown submission status: " 
+ status);
+        IJobCapacityController.JobSubmissionStatus status;
+        try {
+            status = jobCapacityController.allocate(job);
+            CCServiceContext serviceCtx = ccs.getContext();
+            serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+            switch (status) {
+                case QUEUE:
+                    queueJob(jobRun);
+                    break;
+                case EXECUTE:
+                    executeJob(jobRun);
+                    break;
+                default:
+                    throw new IllegalStateException("unknown submission 
status: " + status);
+            }
+        } catch (HyracksDataException ex) {
+            handleException(ex);
+            throw ex;
         }
     }
 
@@ -132,11 +148,13 @@ public class JobManager implements IJobManager {
             // trigger JobCleanupWork and JobCleanupNotificationWork which 
will update the lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
+            incrementCancelledJobs();
             return;
         }
         // Removes a pending job.
         JobRun jobRun = jobQueue.remove(jobId);
         if (jobRun != null) {
+            incrementCancelledJobs();
             List<Exception> exceptions =
                     
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, 
jobId));
             // Since the job has not been executed, we only need to update its 
status and lifecyle here.
@@ -219,6 +237,12 @@ public class JobManager implements IJobManager {
     @Override
     public void finalComplete(JobRun run) throws HyracksException {
         checkJob(run);
+        if (run.getPendingStatus() == JobStatus.FAILURE) {
+            incrementFailedJobs();
+        } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
+            incrementSuccessfulJobs();
+        }
+
         JobId jobId = run.getJobId();
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
@@ -301,6 +325,26 @@ public class JobManager implements IJobManager {
         return ccs.getCCConfig().getJobQueueCapacity();
     }
 
+    @Override
+    public long getSuccessfulJobs() {
+        return successfulJobs.get();
+    }
+
+    @Override
+    public long getTotalFailedJobs() {
+        return totalFailedJobs.get();
+    }
+
+    @Override
+    public long getTotalCancelledJobs() {
+        return totalCancelledJobs.get();
+    }
+
+    @Override
+    public long getTotalRejectedJobs() {
+        return totalRejectedJobs.get();
+    }
+
     private void pickJobsToRun() throws HyracksException {
         List<JobRun> selectedRuns = jobQueue.pull();
         for (JobRun run : selectedRuns) {
@@ -356,4 +400,31 @@ public class JobManager implements IJobManager {
         final JobSpecification job = jobRun.getJobSpecification();
         jobCapacityController.release(job);
     }
+
+    private void handleException(HyracksException ex) {
+        if (ex.getError().isPresent()) {
+            IError error = ex.getError().get();
+            switch ((ErrorCode) error) {
+                case JOB_QUEUE_FULL:
+                case JOB_REQUIREMENTS_EXCEED_CAPACITY:
+                    incrementRejectedJobs();
+            }
+        }
+    }
+
+    private void incrementSuccessfulJobs() {
+        successfulJobs.incrementAndGet();
+    }
+
+    private void incrementFailedJobs() {
+        totalFailedJobs.incrementAndGet();
+    }
+
+    private void incrementCancelledJobs() {
+        totalCancelledJobs.incrementAndGet();
+    }
+
+    private void incrementRejectedJobs() {
+        totalRejectedJobs.incrementAndGet();
+    }
 }

Reply via email to