[NO ISSUE][CLUS] Ensure Active Jobs Capacity is Released Only Once

- user model changes: no
- storage format changes: no
- interface changes: no

Details: - Ensure active jobs capacity is released only once. - Warn if the cluster maximum capacity is exceeded. Change-Id: Ia53c6918a68f7050bd8af482dbe8e161d1315844 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2938 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/6a6394a3 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/6a6394a3 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/6a6394a3 Branch: refs/heads/master Commit: 6a6394a30f448ecc12ad4728ede5c6c2c53059b4 Parents: a952e01 Author: Murtadha Hubail <mhub...@apache.org> Authored: Thu Aug 30 14:04:12 2018 +0300 Committer: Michael Blow <mb...@apache.org> Committed: Thu Aug 30 09:45:55 2018 -0700 ---------------------------------------------------------------------- .../runtime/job/resource/JobCapacityController.java | 13 +++++++++++++ .../org/apache/hyracks/control/cc/job/JobManager.java | 14 ++++++++------ 2 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6a6394a3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java index 8ea1fa7..b123a5e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -26,12 +26,15 @@ import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.control.cc.scheduler.IResourceManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; // To avoid the computation cost for checking the capacity constraint for each node, // currently the admit/allocation decisions are based on the aggregated resource information. // TODO(buyingyi): investigate partition-aware resource control. public class JobCapacityController implements IJobCapacityController { + private static final Logger LOGGER = LogManager.getLogger(); private final IResourceManager resourceManager; public JobCapacityController(IResourceManager resourceManager) { @@ -71,6 +74,16 @@ public class JobCapacityController implements IJobCapacityController { int aggregatedNumCores = currentCapacity.getAggregatedCores(); currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize); currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores); + ensureMaxCapacity(); } + private void ensureMaxCapacity() { + final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity(); + final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity(); + if (currentCapacity.getAggregatedCores() > maximumCapacity.getAggregatedCores() + || currentCapacity.getAggregatedMemoryByteSize() > maximumCapacity.getAggregatedMemoryByteSize()) { + LOGGER.warn("Current cluster available capacity {} is more than its maximum capacity {}", currentCapacity, + maximumCapacity); + } + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6a6394a3/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 7dc636c..7e1ca61 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -229,8 +229,9 @@ public class JobManager implements IJobManager { } run.setStatus(run.getPendingStatus(), run.getPendingExceptions()); run.setEndTime(System.currentTimeMillis()); - if (activeRunMap.remove(jobId) == null) { - LOGGER.warn("Job {} was not found running but is getting archived and capacity released", jobId); + if (activeRunMap.remove(jobId) != null) { + // non-active jobs have zero capacity + releaseJobCapacity(run); } runMapArchive.put(jobId, run); runMapHistory.put(jobId, run.getExceptions()); @@ -247,10 +248,6 @@ public class JobManager implements IJobManager { } } - // Releases cluster capacitys occupied by the job. - JobSpecification job = run.getJobSpecification(); - jobCapacityController.release(job); - // Picks the next job to execute. pickJobsToRun(); @@ -347,4 +344,9 @@ public class JobManager implements IJobManager { throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER); } } + + private void releaseJobCapacity(JobRun jobRun) { + final JobSpecification job = jobRun.getJobSpecification(); + jobCapacityController.release(job); + } }