[NO ISSUE][CLUS] Ensure Active Jobs Capacity is Released Only Once

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
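- Ensure an active job's capacity is released only once: capacity is now
  released only if the job was still in the active run map when archived.
- Log a warning if, after a release, the cluster's current available capacity
  exceeds its maximum capacity.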

Change-Id: Ia53c6918a68f7050bd8af482dbe8e161d1315844
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2938
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/6a6394a3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/6a6394a3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/6a6394a3

Branch: refs/heads/master
Commit: 6a6394a30f448ecc12ad4728ede5c6c2c53059b4
Parents: a952e01
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Thu Aug 30 14:04:12 2018 +0300
Committer: Michael Blow <mb...@apache.org>
Committed: Thu Aug 30 09:45:55 2018 -0700

----------------------------------------------------------------------
 .../runtime/job/resource/JobCapacityController.java   | 13 +++++++++++++
 .../org/apache/hyracks/control/cc/job/JobManager.java | 14 ++++++++------
 2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6a6394a3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 8ea1fa7..b123a5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -26,12 +26,15 @@ import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 // To avoid the computation cost for checking the capacity constraint for each 
node,
 // currently the admit/allocation decisions are based on the aggregated 
resource information.
 // TODO(buyingyi): investigate partition-aware resource control.
 public class JobCapacityController implements IJobCapacityController {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final IResourceManager resourceManager;
 
     public JobCapacityController(IResourceManager resourceManager) {
@@ -71,6 +74,16 @@ public class JobCapacityController implements IJobCapacityController {
         int aggregatedNumCores = currentCapacity.getAggregatedCores();
         currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + 
reqAggregatedMemoryByteSize);
         currentCapacity.setAggregatedCores(aggregatedNumCores + 
reqAggregatedNumCores);
+        ensureMaxCapacity();
     }
 
+    private void ensureMaxCapacity() {
+        final IClusterCapacity currentCapacity = 
resourceManager.getCurrentCapacity();
+        final IReadOnlyClusterCapacity maximumCapacity = 
resourceManager.getMaximumCapacity();
+        if (currentCapacity.getAggregatedCores() > 
maximumCapacity.getAggregatedCores()
+                || currentCapacity.getAggregatedMemoryByteSize() > 
maximumCapacity.getAggregatedMemoryByteSize()) {
+            LOGGER.warn("Current cluster available capacity {} is more than 
its maximum capacity {}", currentCapacity,
+                    maximumCapacity);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6a6394a3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 7dc636c..7e1ca61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -229,8 +229,9 @@ public class JobManager implements IJobManager {
         }
         run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
         run.setEndTime(System.currentTimeMillis());
-        if (activeRunMap.remove(jobId) == null) {
-            LOGGER.warn("Job {} was not found running but is getting archived 
and capacity released", jobId);
+        if (activeRunMap.remove(jobId) != null) {
+            // non-active jobs have zero capacity
+            releaseJobCapacity(run);
         }
         runMapArchive.put(jobId, run);
         runMapHistory.put(jobId, run.getExceptions());
@@ -247,10 +248,6 @@ public class JobManager implements IJobManager {
             }
         }
 
-        // Releases cluster capacitys occupied by the job.
-        JobSpecification job = run.getJobSpecification();
-        jobCapacityController.release(job);
-
         // Picks the next job to execute.
         pickJobsToRun();
 
@@ -347,4 +344,9 @@ public class JobManager implements IJobManager {
             throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
         }
     }
+
+    private void releaseJobCapacity(JobRun jobRun) {
+        final JobSpecification job = jobRun.getJobSpecification();
+        jobCapacityController.release(job);
+    }
 }

Reply via email to