Get max job count from compute resource description of particular resource instead of global max value
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8bc79959 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8bc79959 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8bc79959 Branch: refs/heads/orchestratorJobThrottleFeature Commit: 8bc79959ff306f7236b233377c6acf94aa7078ab Parents: a78137b Author: shamrath <shameerai...@gmail.com> Authored: Mon Sep 22 10:22:00 2014 -0400 Committer: shamrath <shameerai...@gmail.com> Committed: Mon Sep 22 10:22:00 2014 -0400 ---------------------------------------------------------------------- .../handlers/GridPullMonitorHandler.java | 19 ++- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 23 ++- .../airavata/gfac/monitor/util/CommonUtils.java | 50 ++++++ modules/orchestrator/orchestrator-core/pom.xml | 2 +- .../core/validator/impl/JobCountValidator.java | 162 ++++++++----------- 5 files changed, 148 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 451466d..5188163 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -22,6 +22,7 @@ package org.apache.airavata.gfac.monitor.handlers; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.util.AuthenticationInfo; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; @@ -32,8 +33,9 @@ import org.apache.airavata.gfac.monitor.HPCMonitorID; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; import org.apache.airavata.gfac.monitor.util.CommonUtils; -import org.apache.airavata.credential.store.util.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -101,7 +103,20 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ } CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) { - CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper + try { + TaskDetails taskDetails = monitorID.getJobExecutionContext().getTaskData(); + + ComputeResourceDescription computeResourceDescription = + CommonUtils.getComputeResourceDescription(taskDetails); + if (computeResourceDescription.getBatchQueues().size() > 0 && + computeResourceDescription.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { + + CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper + } + } catch (Exception e) { + logger.error("Error reading max job count from Computer Resource Description," + + " zookeeper job count update process failed"); + } } } catch (AiravataMonitorException e) { logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID()); http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 25a1ab2..dcbe905 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -38,6 +38,7 @@ import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.credential.store.util.AuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; @@ -273,15 +274,21 @@ public class HPCPullMonitor extends PullMonitor { for (MonitorID completedJob : completedJobs) { CommonUtils.removeMonitorFromQueue(queue, completedJob); if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) { // is job restriction available? - if (zk == null) { - zk = completedJob.getJobExecutionContext().getZk(); - } - String key = CommonUtils.getJobCountUpdatePath(completedJob); - int i = 0; - if (jobRemoveCountMap.containsKey(key)) { - i = Integer.valueOf(jobRemoveCountMap.get(key)); + ComputeResourceDescription computeResourceDesc = CommonUtils.getComputeResourceDescription( + completedJob.getJobExecutionContext().getTaskData()); + if (computeResourceDesc.getBatchQueuesSize() > 0 && computeResourceDesc.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { + if (zk == null) { + zk = completedJob.getJobExecutionContext().getZk(); + } + String key = CommonUtils.getJobCountUpdatePath(completedJob); + int i = 0; + if (jobRemoveCountMap.containsKey(key)) { + i = Integer.valueOf(jobRemoveCountMap.get(key)); + } + jobRemoveCountMap.put(key, ++i); + } else { + // ignore } - jobRemoveCountMap.put(key, ++i); } } if (ServerSettings.getEnableJobRestrictionValidation().equals("true") && completedJobs.size() > 0) { http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index fb4d898..923eb78 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -20,18 +20,29 @@ */ package org.apache.airavata.gfac.monitor.util; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.ComputeResource; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.aiaravata.application.catalog.data.resources.AbstractResource; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.GFacHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -43,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -251,6 +263,7 @@ public class CommonUtils { for (String path : changeCountMap.keySet()) { if (isAdd) { CommonUtils.checkAndCreateZNode(zk, path); + logger.info("Recursively created znode : " + path); } byte[] byteData = zk.getData(path, null, null); String nodeData; @@ -330,4 +343,41 @@ public class CommonUtils { zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode } } + + public static ComputeResourceDescription getComputeResourceDescription(TaskDetails taskDetails) throws AiravataException { + try { + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface(). + getApplicationInterface(taskDetails.getApplicationId()); + + List<String> applicationModules = applicationInterface.getApplicationModules(); + String selectedModuleId = applicationModules.get(0); // get the first module + Map<String, String> moduleIdFilter = new HashMap<String, String>(); + moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId); + if (taskDetails.getTaskScheduling()!=null && taskDetails.getTaskScheduling().getResourceHostId() != null) { + moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID, + taskDetails.getTaskScheduling().getResourceHostId()); + } + List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment() + .getApplicationDeployements(moduleIdFilter); + Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = + new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>(); + ComputeResource computeResource = appCatalog.getComputeResource(); + for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { + deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()), + deploymentDescription); + } + List<ComputeResourceDescription> computeHostList = new ArrayList<ComputeResourceDescription>(); + computeHostList.addAll(deploymentMap.keySet()); + Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass( + HostScheduler.class); + HostScheduler hostScheduler = aClass.newInstance(); + ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); + ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription); + return appCatalog.getComputeResource().getComputeResource(applicationDeploymentDescription.getComputeHostId()); + } catch (Exception e) { + throw new AiravataException("Error while getting Compute Resource Description", e); + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/orchestrator/orchestrator-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml index 61a7efc..576690a 100644 --- a/modules/orchestrator/orchestrator-core/pom.xml +++ b/modules/orchestrator/orchestrator-core/pom.xml @@ -68,7 +68,7 @@ the License. --> <groupId>org.apache.airavata</groupId> <artifactId>airavata-gfac-hpc-monitor</artifactId> <version>${project.version}</version> - <scope>test</scope> + <!--<scope>test</scope>--> </dependency> <dependency> <groupId>org.apache.airavata</groupId> http://git-wip-us.apache.org/repos/asf/airavata/blob/8bc79959/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java index 2c66fa2..0fb98ac 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java @@ -23,17 +23,14 @@ package org.apache.airavata.orchestrator.core.validator.impl; import org.airavata.appcatalog.cpi.AppCatalog; import org.airavata.appcatalog.cpi.AppCatalogException; -import org.airavata.appcatalog.cpi.ComputeResource; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; -import org.apache.aiaravata.application.catalog.data.resources.AbstractResource; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.RequestData; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.scheduler.HostScheduler; +import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; -import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; @@ -47,9 +44,6 @@ import org.apache.airavata.persistance.registry.jpa.model.TaskDetail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -63,79 +57,50 @@ public class JobCountValidator implements JobMetadataValidator { String credStoreToken) { ValidatorResult result; try { - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); - ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface(). - getApplicationInterface(taskID.getApplicationId()); - - List<String> applicationModules = applicationInterface.getApplicationModules(); - String selectedModuleId = applicationModules.get(0); - Map<String, String> moduleIdFilter = new HashMap<String, String>(); - moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.APP_MODULE_ID, selectedModuleId); - if (taskID.getTaskScheduling()!=null && taskID.getTaskScheduling().getResourceHostId() != null) { - moduleIdFilter.put(AbstractResource.ApplicationDeploymentConstants.COMPUTE_HOST_ID, - taskID.getTaskScheduling().getResourceHostId()); - } - List<ApplicationDeploymentDescription> applicationDeployements = appCatalog.getApplicationDeployment() - .getApplicationDeployements(moduleIdFilter); - Map<ComputeResourceDescription, ApplicationDeploymentDescription> deploymentMap = - new HashMap<ComputeResourceDescription, ApplicationDeploymentDescription>(); - ComputeResource computeResource = appCatalog.getComputeResource(); - for (ApplicationDeploymentDescription deploymentDescription : applicationDeployements) { - deploymentMap.put(computeResource.getComputeResource(deploymentDescription.getComputeHostId()), - deploymentDescription); - } - List<ComputeResourceDescription> computeHostList = new ArrayList<ComputeResourceDescription>(); - computeHostList.addAll(deploymentMap.keySet()); - - Class<? extends HostScheduler> aClass = Class.forName( - ServerSettings.getHostScheduler()).asSubclass( - HostScheduler.class); - HostScheduler hostScheduler = aClass.newInstance(); - ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); - ApplicationDeploymentDescription applicationDeploymentDescription = deploymentMap.get(ComputeResourceDescription); - - ComputeResourceDescription computeResourceDescription = appCatalog.getComputeResource(). - getComputeResource(applicationDeploymentDescription.getComputeHostId()); - for (JobSubmissionInterface jobSubmissionInterface : computeResourceDescription.getJobSubmissionInterfaces()) { - switch (jobSubmissionInterface.getJobSubmissionProtocol()) { - case LOCAL: - // nothing to do - return new ValidatorResult(true); - case SSH: - SSHJobSubmission sshJobSubmission = - appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId()); - switch (sshJobSubmission.getSecurityProtocol()) { - case GSI: - // gsi - RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); - requestData.setTokenId(credStoreToken); - if (isJobSpaceAvailable(requestData.getMyProxyUserName(), computeHostList)) { - return new ValidatorResult(true); - } else { + ComputeResourceDescription computeResourceDes = CommonUtils.getComputeResourceDescription(taskID); + if (computeResourceDes.getBatchQueuesSize() > 0 && + computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { + int resourceMaxJobCount = computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue(); + for (JobSubmissionInterface jobSubmissionInterface : computeResourceDes.getJobSubmissionInterfaces()) { + switch (jobSubmissionInterface.getJobSubmissionProtocol()) { + case LOCAL: + // nothing to do + return new ValidatorResult(true); + case SSH: + SSHJobSubmission sshJobSubmission = + AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission( + jobSubmissionInterface.getJobSubmissionInterfaceId()); + switch (sshJobSubmission.getSecurityProtocol()) { + case GSI: + // gsi + RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); + requestData.setTokenId(credStoreToken); + return isJobSpaceAvailable(requestData.getMyProxyUserName(), + computeResourceDes.getHostName(), resourceMaxJobCount); + case SSH_KEYS: result = new ValidatorResult(false); - result.setErrorDetails("Please honour to the gobal max job count " + ServerSettings.getGlobalMaxJobCount()); + result.setErrorDetails("SSH_KEY base job count validation is not yet implemented"); return result; - } -// TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData); - case SSH_KEYS: - result = new ValidatorResult(false); - result.setErrorDetails("SSH_KEY base job count validation is not yet implemented"); - return result; // ssh - default: - result = new ValidatorResult(false); - result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() + " protocol yet"); - return result; - } - default: - result = new ValidatorResult(false); - result.setErrorDetails("Doesn't support " + jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet"); - return result; + default: + result = new ValidatorResult(false); + result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() + + " protocol yet"); + return result; + } + default: + result = new ValidatorResult(false); + result.setErrorDetails("Doesn't support " + + jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet"); + return result; + } } + result = new ValidatorResult(false); + result.setErrorDetails("No JobSubmission interface found"); + return result; + } else { + return new ValidatorResult(true); } - result = new ValidatorResult(false); - result.setErrorDetails("No JobSubmission interface found"); - return result; } catch (Exception e) { result = new ValidatorResult(false); result.setErrorDetails("Exception occur while running validation process "); @@ -144,29 +109,32 @@ public class JobCountValidator implements JobMetadataValidator { } - private boolean isJobSpaceAvailable(String communityUserName, List<ComputeResourceDescription> computeHostList) throws ApplicationSettingsException { - String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName).append("/").toString(); - for (ComputeResourceDescription computeResDesc : computeHostList) { - String key = keyPath + computeResDesc.getHostName() + "/" + Constants.JOB; - Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk()); - if (jobCountMap.containsKey(key)) { - int count = jobCountMap.get(key); - if (count < Integer.parseInt(ServerSettings.getGlobalMaxJobCount())) { - return true; - } - }else { - return true; + private ValidatorResult isJobSpaceAvailable(String communityUserName, String computeHostName, int resourceMaxJobCount) + throws ApplicationSettingsException { + if (communityUserName == null) { + throw new IllegalArgumentException("Community user name should not be null"); + } + if (computeHostName == null) { + throw new IllegalArgumentException("Compute resource should not be null"); + } + String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName) + .append("/").toString(); + String key = keyPath + computeHostName + "/" + Constants.JOB; + Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk()); + if (jobCountMap.containsKey(key)) { + int count = jobCountMap.get(key); + logger.info("Submitted job count = " + count + ", max job count = " + resourceMaxJobCount); + if (count < resourceMaxJobCount) { + return new ValidatorResult(true); } + } else { + logger.info("Job count map doesn't has key : " + key); + return new ValidatorResult(true); } - return false; - } - - private void getAppDeployment(String applicationId, TaskDetail taskData) throws AppCatalogException { - return; - - } - - private ApplicationDeploymentDescription getAppDeployment(AppCatalog appCatalog, TaskDetail taskData, String selectedModuleId) { - return null; + logger.info("Resource " + computeHostName + " doesn't has space to submit another job, " + + "Configured resource max job count is " + resourceMaxJobCount + "."); + ValidatorResult result = new ValidatorResult(false); + result.setErrorDetails("Please honour to the gobal max job count " + resourceMaxJobCount); + return result; } }