This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit eaebacfe7e42f0b82b0f1501a84dead9fd626ccf
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue Jul 23 14:16:45 2024 +0300

    [ASTERIXDB-3467][HYR] ConcurrentModificationException when picking new jobs to run
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
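    Details:
    When picking new jobs from the job queue, jobs that cannot be
    picked (e.g. due to cluster state) are now collected first and
    failed via jobManager.prepareComplete() only after the iteration
    over the queue has finished, instead of being failed while the
    queue is still being iterated. Completing a job during that
    iteration could trigger another call to pickJobsToRun(), which
    concurrently modifies the job queue map and leads to a
    ConcurrentModificationException.
    
    In addition, FIFOJobQueue.pull() is now synchronized, and the
    JOB_REJECTED error now reports the job ID rather than the job
    specification.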
    
    Ext-ref: MB-62857
    Change-Id: I6ec0c6625d9d84cd0797964781256e93f5346a91
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18512
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Ali Alsuliman <[email protected]>
---
 .../runtime/job/resource/JobCapacityController.java  |  2 +-
 .../src/main/resources/errormsg/en.properties        |  2 +-
 .../hyracks/control/cc/scheduler/FIFOJobQueue.java   | 20 +++++++++++++++-----
 3 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 236056ca02..1296d04af5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -53,7 +53,7 @@ public class JobCapacityController implements 
IJobCapacityController {
     public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, 
Set<JobFlag> jobFlags)
             throws HyracksException {
         if (!ccApp.acceptingJobs(jobFlags)) {
-            throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+            throw HyracksDataException.create(ErrorCode.JOB_REJECTED, jobId);
         }
         IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
         long reqAggregatedMemoryByteSize = 
requiredCapacity.getAggregatedMemoryByteSize();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 7da6bbd040..e94c12e30c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,7 +146,7 @@
 126 = Illegal state. %1$s
 127 = Decoding error - %1$s
 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
-129 = Job %1$s not run. Cluster is not accepting jobs
+129 = Job %1$s failed to run. Cluster is not accepting jobs.
 130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. 
Used=%3$s, max=%4$s. Please increase the sort memory budget.
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index d0038535bb..ec9333cb4a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
@@ -81,9 +82,10 @@ public class FIFOJobQueue implements IJobQueue {
     }
 
     @Override
-    public List<JobRun> pull() {
+    public synchronized List<JobRun> pull() {
         List<JobRun> jobRuns = new ArrayList<>();
         Iterator<JobRun> runIterator = jobListMap.values().iterator();
+        List<Pair<JobRun, List<Exception>>> failingJobs = null;
         while (runIterator.hasNext()) {
             JobRun run = runIterator.next();
             JobSpecification job = run.getJobSpecification();
@@ -98,13 +100,21 @@ public class FIFOJobQueue implements IJobQueue {
                     runIterator.remove(); // Removes the selected job.
                 }
             } catch (HyracksException exception) {
-                // The required capacity exceeds maximum capacity.
-                List<Exception> exceptions = new ArrayList<>();
+                if (failingJobs == null) {
+                    failingJobs = new ArrayList<>();
+                }
+                // The required capacity exceeds maximum capacity or the job 
cannot be run at this time.
+                List<Exception> exceptions = new ArrayList<>(1);
                 exceptions.add(exception);
+                failingJobs.add(Pair.of(run, exceptions));
                 runIterator.remove(); // Removes the job from the queue.
+            }
+        }
+        if (failingJobs != null) {
+            for (int i = 0; i < failingJobs.size(); i++) {
                 try {
-                    // Fails the job.
-                    jobManager.prepareComplete(run, 
JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                    Pair<JobRun, List<Exception>> job = failingJobs.get(i);
+                    jobManager.prepareComplete(job.getLeft(), 
JobStatus.FAILURE_BEFORE_EXECUTION, job.getRight());
                 } catch (HyracksException e) {
                     LOGGER.log(Level.ERROR, e.getMessage(), e);
                 }

Reply via email to