This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit eaebacfe7e42f0b82b0f1501a84dead9fd626ccf Author: Ali Alsuliman <[email protected]> AuthorDate: Tue Jul 23 14:16:45 2024 +0300 [ASTERIXDB-3467][HYR] ConcurrentModificationException when picking new jobs to run - user model changes: no - storage format changes: no - interface changes: no Details: When picking new jobs from the job queue, if a job cannot be picked (e.g. due to cluster state), collect such jobs first and fail them only after the iteration over the queue has finished, instead of calling jobManager.prepareComplete() on each one while still iterating. Completing a job in the middle of the iteration could lead to that job calling pickJobsToRun() again and modifying the job queue map while it is still being iterated. Ext-ref: MB-62857 Change-Id: I6ec0c6625d9d84cd0797964781256e93f5346a91 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18512 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ali Alsuliman <[email protected]> --- .../runtime/job/resource/JobCapacityController.java | 2 +- .../src/main/resources/errormsg/en.properties | 2 +- .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 20 +++++++++++++++----- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java index 236056ca02..1296d04af5 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -53,7 +53,7 @@ public class JobCapacityController implements IJobCapacityController { public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) throws HyracksException { if (!ccApp.acceptingJobs(jobFlags)) { - throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job); + throw 
HyracksDataException.create(ErrorCode.JOB_REJECTED, jobId); } IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity(); long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize(); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 7da6bbd040..e94c12e30c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -146,7 +146,7 @@ 126 = Illegal state. %1$s 127 = Decoding error - %1$s 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s -129 = Job %1$s not run. Cluster is not accepting jobs +129 = Job %1$s failed to run. Cluster is not accepting jobs. 130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget. 10000 = The given rule collection %1$s is not an instance of the List class. 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index d0038535bb..ec9333cb4a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; @@ -81,9 +82,10 @@ public class FIFOJobQueue implements IJobQueue { } @Override - public List<JobRun> pull() { + public synchronized List<JobRun> pull() { List<JobRun> jobRuns = new ArrayList<>(); Iterator<JobRun> runIterator = jobListMap.values().iterator(); + List<Pair<JobRun, List<Exception>>> failingJobs = null; while (runIterator.hasNext()) { JobRun run = runIterator.next(); JobSpecification job = run.getJobSpecification(); @@ -98,13 +100,21 @@ public class FIFOJobQueue implements IJobQueue { runIterator.remove(); // Removes the selected job. } } catch (HyracksException exception) { - // The required capacity exceeds maximum capacity. - List<Exception> exceptions = new ArrayList<>(); + if (failingJobs == null) { + failingJobs = new ArrayList<>(); + } + // The required capacity exceeds maximum capacity or the job cannot be run at this time. + List<Exception> exceptions = new ArrayList<>(1); exceptions.add(exception); + failingJobs.add(Pair.of(run, exceptions)); runIterator.remove(); // Removes the job from the queue. 
+ } + } + if (failingJobs != null) { + for (int i = 0; i < failingJobs.size(); i++) { try { - // Fails the job. - jobManager.prepareComplete(run, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); + Pair<JobRun, List<Exception>> job = failingJobs.get(i); + jobManager.prepareComplete(job.getLeft(), JobStatus.FAILURE_BEFORE_EXECUTION, job.getRight()); } catch (HyracksException e) { LOGGER.log(Level.ERROR, e.getMessage(), e); }
