This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 2aa55e000ce027c287c7ec94464cbe392814d5cf Author: Hussain Towaileb <[email protected]> AuthorDate: Thu May 2 13:42:08 2024 +0300 [NO ISSUE]: Track more requests + jobs counts Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../api/http/server/QueryServiceServlet.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 3 + .../apache/asterix/common/api/IRequestTracker.java | 10 +++ .../asterix/runtime/utils/RequestTracker.java | 12 +++ .../apache/hyracks/control/cc/job/IJobManager.java | 20 +++++ .../apache/hyracks/control/cc/job/JobManager.java | 99 +++++++++++++++++++--- 6 files changed, 131 insertions(+), 15 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index f84e45a063..da21769a99 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -166,7 +166,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { return resultStatus; } - HttpResponseStatus getHttpStatus() { + public HttpResponseStatus getHttpStatus() { return httpResponseStatus; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 1110cd3fea..3cac3d8791 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -529,6 +529,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen "Unexpected statement: " + kind); } } + } catch (Exception ex) { + this.appCtx.getRequestTracker().incrementFailedRequests(); + throw ex; } finally { // async queries are completed after their job completes if (ResultDelivery.ASYNC != resultDelivery) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index 0019015df8..b2dc3098ff 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -81,4 +81,14 @@ public interface IRequestTracker { * @return the total number of requests since cluster start/restart */ long getTotalNumberOfRequests(); + + /** + * increments total number of failed requests + */ + void incrementFailedRequests(); + + /** + * @return the total number of failed requests + */ + long getTotalNumberOfFailedRequests(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index c9425c6065..c16f825672 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -39,11 +39,13 @@ public class RequestTracker implements IRequestTracker { private final CircularFifoQueue<IClientRequest> completedRequests; private final ICcApplicationContext ccAppCtx; private final AtomicLong numRequests; + private final AtomicLong numOfFailedRequests; public RequestTracker(ICcApplicationContext ccAppCtx) { this.ccAppCtx = ccAppCtx; completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize()); numRequests = new AtomicLong(0); + numOfFailedRequests = new AtomicLong(0); } @Override @@ -120,4 +122,14 @@ public class RequestTracker implements IRequestTracker { public long getTotalNumberOfRequests() { return numRequests.get(); } + + @Override + public void incrementFailedRequests() { + numOfFailedRequests.incrementAndGet(); + } + + @Override + public long getTotalNumberOfFailedRequests() { + return numOfFailedRequests.get(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index b2795d4d5d..f65b261e3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -130,4 +130,24 @@ public interface IJobManager { * @return the maximum number of jobs to queue before rejecting new jobs */ int getJobQueueCapacity(); + + /** + * @return total successful jobs + */ + long getSuccessfulJobs(); + + /** + * @return total failed jobs + */ + long getTotalFailedJobs(); + + /** + * @return total cancelled jobs + */ + long getTotalCancelledJobs(); + + /** + * @return total rejected jobs + */ + long getTotalRejectedJobs(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 4882f4ac49..703294ce57 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -30,10 +30,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.exceptions.IError; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -66,11 +68,19 @@ public class JobManager implements IJobManager { private final Map<JobId, JobRun> runMapArchive; private final Map<JobId, List<Exception>> runMapHistory; private final IJobCapacityController jobCapacityController; + private final AtomicLong successfulJobs; + private final AtomicLong totalFailedJobs; + private final AtomicLong totalCancelledJobs; + private final AtomicLong totalRejectedJobs; private IJobQueue jobQueue; public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { this.ccs = ccs; this.jobCapacityController = jobCapacityController; + this.successfulJobs = new AtomicLong(); + this.totalFailedJobs = new AtomicLong(); + this.totalCancelledJobs = new AtomicLong(); + this.totalRejectedJobs = new AtomicLong(); try { Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass()) .getConstructor(IJobManager.class, IJobCapacityController.class); @@ -84,7 +94,7 @@ public class JobManager implements IJobManager { jobQueue = new FIFOJobQueue(this, jobCapacityController); } activeRunMap = new HashMap<>(); - runMapArchive = new LinkedHashMap<JobId, JobRun>() { + runMapArchive = new LinkedHashMap<>() { private static final long serialVersionUID = -1406441385508773629L; @Override @@ -92,7 +102,7 @@ public class JobManager implements IJobManager { return size() > ccConfig.getJobHistorySize(); } }; - runMapHistory = new LinkedHashMap<JobId, List<Exception>>() { + runMapHistory = new LinkedHashMap<>() { private static final long serialVersionUID = 7572062687032652986L; /** history size + 1 is for the case when history size = 0 */ private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1); @@ -108,18 +118,24 @@ public class JobManager implements IJobManager { public void add(JobRun jobRun) throws HyracksException { checkJob(jobRun); JobSpecification job = jobRun.getJobSpecification(); - IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); - CCServiceContext serviceCtx = ccs.getContext(); - serviceCtx.notifyJobCreation(jobRun.getJobId(), job); - switch (status) { - case QUEUE: - queueJob(jobRun); - break; - case EXECUTE: - executeJob(jobRun); - break; - default: - throw new IllegalStateException("unknown submission status: " + status); + IJobCapacityController.JobSubmissionStatus status; + try { + status = jobCapacityController.allocate(job); + CCServiceContext serviceCtx = ccs.getContext(); + serviceCtx.notifyJobCreation(jobRun.getJobId(), job); + switch (status) { + case QUEUE: + queueJob(jobRun); + break; + case EXECUTE: + executeJob(jobRun); + break; + default: + throw new IllegalStateException("unknown submission status: " + status); + } + } catch (HyracksDataException ex) { + handleException(ex); + throw ex; } } @@ -132,11 +148,13 @@ public class JobManager implements IJobManager { // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. // Therefore, we do not remove the job out of activeRunMap here. jobRun.getExecutor().cancelJob(callback); + incrementCancelledJobs(); return; } // Removes a pending job. JobRun jobRun = jobQueue.remove(jobId); if (jobRun != null) { + incrementCancelledJobs(); List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. @@ -219,6 +237,12 @@ public class JobManager implements IJobManager { @Override public void finalComplete(JobRun run) throws HyracksException { checkJob(run); + if (run.getPendingStatus() == JobStatus.FAILURE) { + incrementFailedJobs(); + } else if (run.getPendingStatus() == JobStatus.TERMINATED) { + incrementSuccessfulJobs(); + } + JobId jobId = run.getJobId(); Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); @@ -301,6 +325,26 @@ public class JobManager implements IJobManager { return ccs.getCCConfig().getJobQueueCapacity(); } + @Override + public long getSuccessfulJobs() { + return successfulJobs.get(); + } + + @Override + public long getTotalFailedJobs() { + return totalFailedJobs.get(); + } + + @Override + public long getTotalCancelledJobs() { + return totalCancelledJobs.get(); + } + + @Override + public long getTotalRejectedJobs() { + return totalRejectedJobs.get(); + } + private void pickJobsToRun() throws HyracksException { List<JobRun> selectedRuns = jobQueue.pull(); for (JobRun run : selectedRuns) { @@ -356,4 +400,31 @@ public class JobManager implements IJobManager { final JobSpecification job = jobRun.getJobSpecification(); jobCapacityController.release(job); } + + private void handleException(HyracksException ex) { + if (ex.getError().isPresent()) { + IError error = ex.getError().get(); + switch ((ErrorCode) error) { + case JOB_QUEUE_FULL: + case JOB_REQUIREMENTS_EXCEED_CAPACITY: + incrementRejectedJobs(); + } + } + } + + private void incrementSuccessfulJobs() { + successfulJobs.incrementAndGet(); + } + + private void incrementFailedJobs() { + totalFailedJobs.incrementAndGet(); + } + + private void incrementCancelledJobs() { + totalCancelledJobs.incrementAndGet(); + } + + private void incrementRejectedJobs() { + totalRejectedJobs.incrementAndGet(); + } }
