Moved job restriction check logic to SimpleOrchestratorImpl
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b8cacbc3 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b8cacbc3 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b8cacbc3 Branch: refs/heads/orchestratorJobThrottleFeature Commit: b8cacbc3decd71085d2f01b1a81367d33139ff84 Parents: 459b7cb Author: shamrath <shameerai...@gmail.com> Authored: Wed Sep 24 10:31:37 2014 -0400 Committer: shamrath <shameerai...@gmail.com> Committed: Wed Sep 24 10:31:37 2014 -0400 ---------------------------------------------------------------------- .../main/resources/airavata-server.properties | 1 - .../main/resources/airavata-server.properties | 1 + .../handlers/GridPullMonitorHandler.java | 16 --- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 7 +- .../airavata/gfac/monitor/util/CommonUtils.java | 30 +++--- .../core/utils/OrchestratorUtils.java | 100 ++++++++++++++++++ .../core/validator/impl/JobCountValidator.java | 104 +++++++++++-------- .../cpi/impl/SimpleOrchestratorImpl.java | 49 ++++++++- 8 files changed, 223 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 42b1bc8..ba938f2 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -200,7 +200,6 @@ start.submitter=true embedded.mode=true enable.validation=true enable.job.restriction.validation=true -global.max.job.count=50 orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer ###---------------------------API Server module Configurations---------------------------### http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties index 91a8ae6..bfc0504 100644 --- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties +++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties @@ -287,6 +287,7 @@ threadpool.size=10 start.submitter=true embedded.mode=true enable.validation=true +enable.job.restriction.validation=true orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer ###---------------------------API Server module Configurations---------------------------### http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 2376520..6caf553 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -102,22 +102,6 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ e.printStackTrace(); } CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); - if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) { - try { - TaskDetails taskDetails = monitorID.getJobExecutionContext().getTaskData(); - - ComputeResourceDescription computeResourceDescription = - CommonUtils.getComputeResourceDescription(taskDetails); - if (computeResourceDescription.getBatchQueues().size() > 0 && - computeResourceDescription.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { - - CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper - } - } catch (Exception e) { - logger.error("Error reading max job count from Computer Resource Description," + - " zookeeper job count update process failed"); - } - } } catch (AiravataMonitorException e) { logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index e9098ac..022b0a6 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -42,6 +42,7 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.credential.store.util.AuthenticationInfo; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.airavata.schemas.gfac.SSHHostType; @@ -266,9 +267,11 @@ public class HPCPullMonitor extends PullMonitor { for (MonitorID completedJob : completedJobs) { CommonUtils.removeMonitorFromQueue(queue, completedJob); if (ServerSettings.getEnableJobRestrictionValidation().equals("true")) { // is job restriction available? + TaskDetails taskDetails = completedJob.getJobExecutionContext().getTaskData(); ComputeResourceDescription computeResourceDesc = CommonUtils.getComputeResourceDescription( - completedJob.getJobExecutionContext().getTaskData()); - if (computeResourceDesc.getBatchQueuesSize() > 0 && computeResourceDesc.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { + taskDetails); + if (computeResourceDesc.getBatchQueuesSize() > 0 && + taskDetails.getTaskScheduling().getQueueName() != null) { if (zk == null) { zk = completedJob.getJobExecutionContext().getZk(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index d4e29ca..abc6b69 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -41,6 +41,7 @@ import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; +import org.apache.airavata.model.appcatalog.computeresource.BatchQueue; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.schemas.gfac.GsisshHostType; @@ -243,23 +244,6 @@ public class CommonUtils { public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) { StringBuilder changeZNodePaths = new StringBuilder(); try { - if (zk == null || !zk.getState().isConnected()) { - try { - final CountDownLatch countDownLatch = new CountDownLatch(1); - zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, new Watcher() { - @Override - public void process(WatchedEvent event) { - countDownLatch.countDown(); - } - }); - countDownLatch.await(); - } catch (ApplicationSettingsException e) { - logger.error("Error while reading zookeeper hostport string"); - } catch (IOException e) { - logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state"); - } - } - for (String path : changeCountMap.keySet()) { if (isAdd) { CommonUtils.checkAndCreateZNode(zk, path); @@ -325,7 +309,8 @@ public class CommonUtils { */ public static String getJobCountUpdatePath(MonitorID monitorID){ return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName()) - .append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString(); + .append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB). + append("/").append(monitorID.getJobExecutionContext().getTaskData().getTaskScheduling().getQueueName()).toString(); } /** @@ -380,4 +365,13 @@ public class CommonUtils { } } + public static BatchQueue getBatchQueueByName(List<BatchQueue> batchQueues, String queueName) { + for (BatchQueue bQueue : batchQueues) { + if (bQueue.getQueueName().equals(queueName)) { + return bQueue; + } + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java index 8a2d574..d51cce7 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java @@ -22,13 +22,31 @@ package org.apache.airavata.orchestrator.core.utils; import java.io.IOException; import java.util.Arrays; +import java.util.Map; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.RequestData; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.credential.store.credential.AuditInfo; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.credential.store.store.CredentialReaderFactory; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.appcatalog.computeresource.BatchQueue; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.error.ValidatorResult; import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; +import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.orchestrator.core.OrchestratorConfiguration; +import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter; import org.apache.airavata.orchestrator.core.job.JobSubmitter; @@ -68,4 +86,86 @@ public class OrchestratorUtils { // throw new OrchestratorException(e); // } // } + + public static String getCommunityUserName(Experiment experiment, + ComputeResourceDescription computeResourceDes, + TaskDetails taskID, + String credStoreToken) throws AiravataException { + ValidatorResult result; + try { + if (computeResourceDes.getBatchQueuesSize() > 0) { + BatchQueue batchQueue = CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(), + taskID.getTaskScheduling().getQueueName()); + if (batchQueue == null) { + throw new IllegalArgumentException("Invalid queue name, There is no queue with name :" + + taskID.getTaskScheduling().getQueueName()); + } + int resourceMaxJobCount = batchQueue.getMaxJobsInQueue(); + if (resourceMaxJobCount > 0) { + for (JobSubmissionInterface jobSubmissionInterface : computeResourceDes.getJobSubmissionInterfaces()) { + switch (jobSubmissionInterface.getJobSubmissionProtocol()) { + case LOCAL: + // nothing to do + return null; + case SSH: + SSHJobSubmission sshJobSubmission = + AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission( + jobSubmissionInterface.getJobSubmissionInterfaceId()); + switch (sshJobSubmission.getSecurityProtocol()) { + case GSI: + // gsi + RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); + requestData.setTokenId(credStoreToken); + return requestData.getMyProxyUserName(); + case SSH_KEYS: + CredentialReader credentialReader = CredentialReaderFactory.createCredentialStoreReader(); + AuditInfo auditInfo = credentialReader.getAuditInfo(experiment.getUserName(), credStoreToken); + return auditInfo.getCommunityUser().getUserName(); + // ssh + default: + //nothing to do + } + default: + //nothing to do + } + } + return null; + + }// end of inner if + }// end of outer if + return null; + } catch (Exception e) { + throw new AiravataException("Exception while getting community user name ", e); + } + + } + + public static boolean isJobSpaceAvailable(String communityUserName, String computeHostName, String queueName, int resourceMaxJobCount) + throws ApplicationSettingsException { + if (communityUserName == null) { + throw new IllegalArgumentException("Community user name should not be null"); + } + if (computeHostName == null) { + throw new IllegalArgumentException("Compute resource should not be null"); + } + String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName) + .append("/").toString(); + String key = keyPath + computeHostName + "/" + Constants.JOB + "/" + queueName; + Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk()); + if (jobCountMap.containsKey(key)) { + int count = jobCountMap.get(key); + logger.info("Submitted job count = " + count + ", max job count = " + resourceMaxJobCount); + if (count < resourceMaxJobCount) { + return true; + } + } else { + logger.info("Job count map doesn't has key : " + key); + return true; + } + logger.info("Resource " + computeHostName + " doesn't has space to submit another job, " + + "Configured resource max job count is " + resourceMaxJobCount + "."); + return false; + } + + } http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java index 0fb98ac..f1cc5f9 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java @@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.RequestData; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.credential.store.credential.AuditInfo; +import org.apache.airavata.credential.store.store.CredentialReader; +import org.apache.airavata.credential.store.store.CredentialReaderFactory; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.computeresource.BatchQueue; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; @@ -44,6 +48,7 @@ import org.apache.airavata.persistance.registry.jpa.model.TaskDetail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Map; /** @@ -58,58 +63,67 @@ public class JobCountValidator implements JobMetadataValidator { ValidatorResult result; try { ComputeResourceDescription computeResourceDes = CommonUtils.getComputeResourceDescription(taskID); - if (computeResourceDes.getBatchQueuesSize() > 0 && - computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue() > 0) { - int resourceMaxJobCount = computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue(); - for (JobSubmissionInterface jobSubmissionInterface : computeResourceDes.getJobSubmissionInterfaces()) { - switch (jobSubmissionInterface.getJobSubmissionProtocol()) { - case LOCAL: - // nothing to do - return new ValidatorResult(true); - case SSH: - SSHJobSubmission sshJobSubmission = - AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission( - jobSubmissionInterface.getJobSubmissionInterfaceId()); - switch (sshJobSubmission.getSecurityProtocol()) { - case GSI: - // gsi - RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); - requestData.setTokenId(credStoreToken); - return isJobSpaceAvailable(requestData.getMyProxyUserName(), - computeResourceDes.getHostName(), resourceMaxJobCount); - case SSH_KEYS: - result = new ValidatorResult(false); - result.setErrorDetails("SSH_KEY base job count validation is not yet implemented"); - return result; - // ssh - default: - result = new ValidatorResult(false); - result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() + - " protocol yet"); - return result; - } - default: - result = new ValidatorResult(false); - result.setErrorDetails("Doesn't support " + - jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet"); - return result; - } + if (computeResourceDes.getBatchQueuesSize() > 0) { + BatchQueue batchQueue = CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(), + taskID.getTaskScheduling().getQueueName()); + if (batchQueue == null) { + throw new IllegalArgumentException("Invalid queue name, There is no queue with name :" + + taskID.getTaskScheduling().getQueueName()); } - result = new ValidatorResult(false); - result.setErrorDetails("No JobSubmission interface found"); - return result; - } else { - return new ValidatorResult(true); - } + int resourceMaxJobCount = batchQueue.getMaxJobsInQueue(); + if (resourceMaxJobCount > 0) { + for (JobSubmissionInterface jobSubmissionInterface : computeResourceDes.getJobSubmissionInterfaces()) { + switch (jobSubmissionInterface.getJobSubmissionProtocol()) { + case LOCAL: + // nothing to do + return new ValidatorResult(true); + case SSH: + SSHJobSubmission sshJobSubmission = + AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission( + jobSubmissionInterface.getJobSubmissionInterfaceId()); + switch (sshJobSubmission.getSecurityProtocol()) { + case GSI: + // gsi + RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); + requestData.setTokenId(credStoreToken); + return isJobSpaceAvailable(requestData.getMyProxyUserName(), + computeResourceDes.getHostName(), batchQueue.getQueueName(), resourceMaxJobCount); + case SSH_KEYS: + CredentialReader credentialReader = CredentialReaderFactory.createCredentialStoreReader(); + AuditInfo auditInfo = credentialReader.getAuditInfo(experiment.getUserName(), credStoreToken); + return isJobSpaceAvailable(auditInfo.getCommunityUser().getUserName(), + computeResourceDes.getHostName(), batchQueue.getQueueName(), resourceMaxJobCount); + // ssh + default: + result = new ValidatorResult(false); + result.setErrorDetails("Doesn't support " + sshJobSubmission.getSecurityProtocol() + + " protocol yet"); + return result; + } + default: + result = new ValidatorResult(false); + result.setErrorDetails("Doesn't support " + + jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet"); + return result; + } + } + result = new ValidatorResult(false); + result.setErrorDetails("No JobSubmission interface found"); + return result; + + }// end of inner if + }// end of outer if + return new ValidatorResult(true); } catch (Exception e) { + logger.error("Exception occur while running job count validation process ", e); result = new ValidatorResult(false); - result.setErrorDetails("Exception occur while running validation process "); + result.setErrorDetails("Exception occur while running job count validation process "); return result; } } - private ValidatorResult isJobSpaceAvailable(String communityUserName, String computeHostName, int resourceMaxJobCount) + private ValidatorResult isJobSpaceAvailable(String communityUserName, String computeHostName, String queueName, int resourceMaxJobCount) throws ApplicationSettingsException { if (communityUserName == null) { throw new IllegalArgumentException("Community user name should not be null"); @@ -119,7 +133,7 @@ public class JobCountValidator implements JobMetadataValidator { } String keyPath = new StringBuilder("/" + Constants.STAT).append("/").append(communityUserName) .append("/").toString(); - String key = keyPath + computeHostName + "/" + Constants.JOB; + String key = keyPath + computeHostName + "/" + Constants.JOB + "/" + queueName; Map<String, Integer> jobCountMap = AiravataUtils.getJobCountMap(OrchestratorContext.getZk()); if (jobCountMap.containsKey(key)) { int count = jobCountMap.get(key); http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index a950469..c0760dd 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -20,14 +20,23 @@ */ package org.apache.airavata.orchestrator.cpi.impl; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.appcatalog.computeresource.BatchQueue; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.error.LaunchValidationException; import org.apache.airavata.model.error.ValidationResults; import org.apache.airavata.model.error.ValidatorResult; import org.apache.airavata.model.util.ExperimentModelUtil; import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.job.JobSubmitter; +import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils; import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator; +import org.apache.airavata.orchestrator.core.validator.impl.JobCountValidator; import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryModelType; @@ -35,7 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; public class SimpleOrchestratorImpl extends AbstractOrchestrator{ @@ -91,18 +102,50 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } - public boolean launchExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task,String tokenId) throws OrchestratorException { + public boolean launchExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task, + String tokenId) throws OrchestratorException { // we give higher priority to userExperimentID String experimentId = experiment.getExperimentID(); String taskId = task.getTaskID(); // creating monitorID to register with monitoring queue // this is a special case because amqp has to be in place before submitting the job try { - return jobSubmitter.submit(experimentId, taskId,tokenId); + if (ServerSettings.getEnableJobRestrictionValidation().equals("true") && + task.getTaskScheduling().getQueueName() != null) { + ComputeResourceDescription computeResourceDes = CommonUtils.getComputeResourceDescription(task); + String communityUserName = OrchestratorUtils.getCommunityUserName(experiment, computeResourceDes, task, + tokenId); + BatchQueue batchQueue = CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(), + task.getTaskScheduling().getQueueName()); + + synchronized (this) { + boolean spaceAvaialble = OrchestratorUtils.isJobSpaceAvailable(communityUserName, + computeResourceDes.getHostName(), batchQueue.getQueueName(), batchQueue.getMaxJobsInQueue()); + if (spaceAvaialble) { + if (jobSubmitter.submit(experimentId, taskId, tokenId)) { + logger.info("Job submitted, experiment Id : " + experimentId + " , task Id : " + taskId); + Map<String, Integer> jobUpdateMap = new HashMap<String, Integer>(); + StringBuilder sb = new StringBuilder("/").append(Constants.STAT).append("/") + .append(communityUserName).append("/").append(computeResourceDes.getHostName()) + .append("/").append(Constants.JOB).append("/").append(batchQueue.getQueueName()); + jobUpdateMap.put(sb.toString(), 1); + CommonUtils.updateZkWithJobCount(OrchestratorContext.getZk(), jobUpdateMap, true); // update change job count to zookeeper + return true; + } else { + return false; + } + } else { + throw new AiravataException("Please honour to the max job submission restriction policy," + + " max count is " + batchQueue.getMaxJobsInQueue()); + } + }// end of synchronized block + } else { + logger.info("Ignored job throttling"); + return jobSubmitter.submit(experimentId, taskId, tokenId); + } } catch (Exception e) { throw new OrchestratorException("Error launching the job", e); } - } /**