Moved job restriction check logic to SimpleOrchestratorImpl

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b8cacbc3
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b8cacbc3
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b8cacbc3

Branch: refs/heads/orchestratorJobThrottleFeature
Commit: b8cacbc3decd71085d2f01b1a81367d33139ff84
Parents: 459b7cb
Author: shamrath <shameerai...@gmail.com>
Authored: Wed Sep 24 10:31:37 2014 -0400
Committer: shamrath <shameerai...@gmail.com>
Committed: Wed Sep 24 10:31:37 2014 -0400

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |   1 -
 .../main/resources/airavata-server.properties   |   1 +
 .../handlers/GridPullMonitorHandler.java        |  16 ---
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |   7 +-
 .../airavata/gfac/monitor/util/CommonUtils.java |  30 +++---
 .../core/utils/OrchestratorUtils.java           | 100 ++++++++++++++++++
 .../core/validator/impl/JobCountValidator.java  | 104 +++++++++++--------
 .../cpi/impl/SimpleOrchestratorImpl.java        |  49 ++++++++-
 8 files changed, 223 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 42b1bc8..ba938f2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -200,7 +200,6 @@ start.submitter=true
 embedded.mode=true
 enable.validation=true
 enable.job.restriction.validation=true
-global.max.job.count=50
 orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
 
 ###---------------------------API Server module Configurations---------------------------###

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
index 91a8ae6..bfc0504 100644
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
@@ -287,6 +287,7 @@ threadpool.size=10
 start.submitter=true
 embedded.mode=true
 enable.validation=true
+enable.job.restriction.validation=true
 orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer
 
 ###---------------------------API Server module Configurations---------------------------###

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 2376520..6caf553 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -102,22 +102,6 @@ public class GridPullMonitorHandler extends 
ThreadedHandler implements Watcher{
                 e.printStackTrace();
             }
             CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), 
monitorID);
-            if 
(ServerSettings.getEnableJobRestrictionValidation().equals("true")) {
-                try {
-                    TaskDetails taskDetails = 
monitorID.getJobExecutionContext().getTaskData();
-
-                    ComputeResourceDescription computeResourceDescription =
-                            
CommonUtils.getComputeResourceDescription(taskDetails);
-                    if (computeResourceDescription.getBatchQueues().size() > 0 
&&
-                            
computeResourceDescription.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
-
-                    CommonUtils.increaseZkJobCount(monitorID); // update 
change job count to zookeeper
-                    }
-                } catch (Exception e) {
-                    logger.error("Error reading max job count from Computer 
Resource Description," +
-                            " zookeeper job count update process failed");
-                }
-            }
         } catch (AiravataMonitorException e) {
             logger.error("Error adding monitorID object to the queue with 
experiment ", monitorID.getExperimentID());
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index e9098ac..022b0a6 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -42,6 +42,7 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.credential.store.util.AuthenticationInfo;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
@@ -266,9 +267,11 @@ public class HPCPullMonitor extends PullMonitor {
             for (MonitorID completedJob : completedJobs) {
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
                 if 
(ServerSettings.getEnableJobRestrictionValidation().equals("true")) { // is job 
restriction available?
+                    TaskDetails taskDetails = 
completedJob.getJobExecutionContext().getTaskData();
                     ComputeResourceDescription computeResourceDesc = 
CommonUtils.getComputeResourceDescription(
-                            
completedJob.getJobExecutionContext().getTaskData());
-                    if (computeResourceDesc.getBatchQueuesSize() > 0 && 
computeResourceDesc.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
+                            taskDetails);
+                    if (computeResourceDesc.getBatchQueuesSize() > 0 &&
+                            taskDetails.getTaskScheduling().getQueueName() != 
null) {
                         if (zk == null) {
                             zk = completedJob.getJobExecutionContext().getZk();
                         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index d4e29ca..abc6b69 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -41,6 +41,7 @@ import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.BatchQueue;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
@@ -243,23 +244,6 @@ public class CommonUtils {
     public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, 
Integer> changeCountMap, boolean isAdd) {
         StringBuilder changeZNodePaths = new StringBuilder();
         try {
-            if (zk == null || !zk.getState().isConnected()) {
-                try {
-                    final CountDownLatch countDownLatch = new 
CountDownLatch(1);
-                    zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), 6000, 
new Watcher() {
-                        @Override
-                        public void process(WatchedEvent event) {
-                            countDownLatch.countDown();
-                        }
-                    });
-                    countDownLatch.await();
-                } catch (ApplicationSettingsException e) {
-                    logger.error("Error while reading zookeeper hostport 
string");
-                } catch (IOException e) {
-                    logger.error("Error while reconnect attempt to zookeeper 
where zookeeper connection loss state");
-                }
-            }
-
             for (String path : changeCountMap.keySet()) {
                 if (isAdd) {
                     CommonUtils.checkAndCreateZNode(zk, path);
@@ -325,7 +309,8 @@ public class CommonUtils {
      */
     public static String getJobCountUpdatePath(MonitorID monitorID){
         return new 
StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName())
-                
.append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString();
+                
.append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).
+                        
append("/").append(monitorID.getJobExecutionContext().getTaskData().getTaskScheduling().getQueueName()).toString();
     }
 
     /**
@@ -380,4 +365,13 @@ public class CommonUtils {
         }
     }
 
+    public static BatchQueue getBatchQueueByName(List<BatchQueue> batchQueues, 
String queueName) {
+        for (BatchQueue bQueue : batchQueues) {
+            if (bQueue.getQueueName().equals(queueName)) {
+                return bQueue;
+            }
+        }
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
index 8a2d574..d51cce7 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java
@@ -22,13 +22,31 @@ package org.apache.airavata.orchestrator.core.utils;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.RequestData;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.credential.store.credential.AuditInfo;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialReaderFactory;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.BatchQueue;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.error.ValidatorResult;
 import 
org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -68,4 +86,86 @@ public class OrchestratorUtils {
 //            throw new OrchestratorException(e);
 //        }
 //    }
+
+    public  static String getCommunityUserName(Experiment experiment,
+                                               ComputeResourceDescription 
computeResourceDes,
+                                               TaskDetails taskID,
+                                               String credStoreToken) throws 
AiravataException {
+        ValidatorResult result;
+        try {
+            if (computeResourceDes.getBatchQueuesSize() > 0) {
+                BatchQueue batchQueue = 
CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(),
+                        taskID.getTaskScheduling().getQueueName());
+                if (batchQueue == null) {
+                    throw new IllegalArgumentException("Invalid queue name, 
There is no queue with name :" +
+                            taskID.getTaskScheduling().getQueueName());
+                }
+                int resourceMaxJobCount = batchQueue.getMaxJobsInQueue();
+                if (resourceMaxJobCount > 0) {
+                    for (JobSubmissionInterface jobSubmissionInterface : 
computeResourceDes.getJobSubmissionInterfaces()) {
+                        switch 
(jobSubmissionInterface.getJobSubmissionProtocol()) {
+                            case LOCAL:
+                                // nothing to do
+                                return null;
+                            case SSH:
+                                SSHJobSubmission sshJobSubmission =
+                                        
AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission(
+                                                
jobSubmissionInterface.getJobSubmissionInterfaceId());
+                                switch 
(sshJobSubmission.getSecurityProtocol()) {
+                                    case GSI:
+                                        // gsi
+                                        RequestData requestData = new 
RequestData(ServerSettings.getDefaultUserGateway());
+                                        requestData.setTokenId(credStoreToken);
+                                        return 
requestData.getMyProxyUserName();
+                                    case SSH_KEYS:
+                                        CredentialReader credentialReader = 
CredentialReaderFactory.createCredentialStoreReader();
+                                        AuditInfo auditInfo = 
credentialReader.getAuditInfo(experiment.getUserName(), credStoreToken);
+                                        return 
auditInfo.getCommunityUser().getUserName();
+                                    // ssh
+                                    default:
+                                        //nothing to do
+                                }
+                            default:
+                                //nothing to do
+                        }
+                    }
+                    return null;
+
+                }// end of inner if
+            }// end of outer if
+            return null;
+        } catch (Exception e) {
+            throw new AiravataException("Exception while getting community 
user name ", e);
+        }
+
+    }
+
+    public static boolean isJobSpaceAvailable(String communityUserName, String 
computeHostName, String queueName, int resourceMaxJobCount)
+            throws ApplicationSettingsException {
+        if (communityUserName == null) {
+            throw new IllegalArgumentException("Community user name should not 
be null");
+        }
+        if (computeHostName == null) {
+            throw new IllegalArgumentException("Compute resource should not be 
null");
+        }
+        String keyPath = new StringBuilder("/" + 
Constants.STAT).append("/").append(communityUserName)
+                .append("/").toString();
+        String key = keyPath + computeHostName + "/" + Constants.JOB + "/" + 
queueName;
+        Map<String, Integer> jobCountMap = 
AiravataUtils.getJobCountMap(OrchestratorContext.getZk());
+        if (jobCountMap.containsKey(key)) {
+            int count = jobCountMap.get(key);
+            logger.info("Submitted job count = " + count + ", max job count = 
" + resourceMaxJobCount);
+            if (count < resourceMaxJobCount) {
+                return true;
+            }
+        } else {
+            logger.info("Job count map doesn't has key : " + key);
+            return true;
+        }
+        logger.info("Resource " + computeHostName + " doesn't has space to 
submit another job, " +
+                "Configured resource max job count is " + resourceMaxJobCount 
+ ".");
+        return false;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
index 0fb98ac..f1cc5f9 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/validator/impl/JobCountValidator.java
@@ -29,8 +29,12 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.RequestData;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.credential.store.credential.AuditInfo;
+import org.apache.airavata.credential.store.store.CredentialReader;
+import org.apache.airavata.credential.store.store.CredentialReaderFactory;
 import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.appcatalog.computeresource.BatchQueue;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
@@ -44,6 +48,7 @@ import 
org.apache.airavata.persistance.registry.jpa.model.TaskDetail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -58,58 +63,67 @@ public class JobCountValidator implements 
JobMetadataValidator {
         ValidatorResult result;
         try {
             ComputeResourceDescription computeResourceDes = 
CommonUtils.getComputeResourceDescription(taskID);
-            if (computeResourceDes.getBatchQueuesSize() > 0 &&
-                    
computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue() > 0) {
-                int resourceMaxJobCount = 
computeResourceDes.getBatchQueues().get(0).getMaxJobsInQueue();
-                for (JobSubmissionInterface jobSubmissionInterface : 
computeResourceDes.getJobSubmissionInterfaces()) {
-                    switch (jobSubmissionInterface.getJobSubmissionProtocol()) 
{
-                        case LOCAL:
-                            // nothing to do
-                            return new ValidatorResult(true);
-                        case SSH:
-                            SSHJobSubmission sshJobSubmission =
-                                    
AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission(
-                                            
jobSubmissionInterface.getJobSubmissionInterfaceId());
-                            switch (sshJobSubmission.getSecurityProtocol()) {
-                                case GSI:
-                                    // gsi
-                                    RequestData requestData = new 
RequestData(ServerSettings.getDefaultUserGateway());
-                                    requestData.setTokenId(credStoreToken);
-                                    return 
isJobSpaceAvailable(requestData.getMyProxyUserName(),
-                                            computeResourceDes.getHostName(), 
resourceMaxJobCount);
-                                case SSH_KEYS:
-                                    result = new ValidatorResult(false);
-                                    result.setErrorDetails("SSH_KEY base job 
count validation is not yet implemented");
-                                    return result;
-                                // ssh
-                                default:
-                                    result = new ValidatorResult(false);
-                                    result.setErrorDetails("Doesn't support " 
+ sshJobSubmission.getSecurityProtocol() +
-                                            " protocol yet");
-                                    return result;
-                            }
-                        default:
-                            result = new ValidatorResult(false);
-                            result.setErrorDetails("Doesn't support " +
-                                    
jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet");
-                            return result;
-                    }
+            if (computeResourceDes.getBatchQueuesSize() > 0) {
+                BatchQueue batchQueue = 
CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(),
+                        taskID.getTaskScheduling().getQueueName());
+                if (batchQueue == null) {
+                    throw new IllegalArgumentException("Invalid queue name, 
There is no queue with name :" +
+                            taskID.getTaskScheduling().getQueueName());
                 }
-                result = new ValidatorResult(false);
-                result.setErrorDetails("No JobSubmission interface found");
-                return result;
-            } else {
-                return new ValidatorResult(true);
-            }
+                int resourceMaxJobCount = batchQueue.getMaxJobsInQueue();
+                if (resourceMaxJobCount > 0) {
+                    for (JobSubmissionInterface jobSubmissionInterface : 
computeResourceDes.getJobSubmissionInterfaces()) {
+                        switch 
(jobSubmissionInterface.getJobSubmissionProtocol()) {
+                            case LOCAL:
+                                // nothing to do
+                                return new ValidatorResult(true);
+                            case SSH:
+                                SSHJobSubmission sshJobSubmission =
+                                        
AppCatalogFactory.getAppCatalog().getComputeResource().getSSHJobSubmission(
+                                                
jobSubmissionInterface.getJobSubmissionInterfaceId());
+                                switch 
(sshJobSubmission.getSecurityProtocol()) {
+                                    case GSI:
+                                        // gsi
+                                        RequestData requestData = new 
RequestData(ServerSettings.getDefaultUserGateway());
+                                        requestData.setTokenId(credStoreToken);
+                                        return 
isJobSpaceAvailable(requestData.getMyProxyUserName(),
+                                                
computeResourceDes.getHostName(), batchQueue.getQueueName(), 
resourceMaxJobCount);
+                                    case SSH_KEYS:
+                                        CredentialReader credentialReader = 
CredentialReaderFactory.createCredentialStoreReader();
+                                        AuditInfo auditInfo = 
credentialReader.getAuditInfo(experiment.getUserName(), credStoreToken);
+                                        return 
isJobSpaceAvailable(auditInfo.getCommunityUser().getUserName(),
+                                                
computeResourceDes.getHostName(), batchQueue.getQueueName(), 
resourceMaxJobCount);
+                                    // ssh
+                                    default:
+                                        result = new ValidatorResult(false);
+                                        result.setErrorDetails("Doesn't 
support " + sshJobSubmission.getSecurityProtocol() +
+                                                " protocol yet");
+                                        return result;
+                                }
+                            default:
+                                result = new ValidatorResult(false);
+                                result.setErrorDetails("Doesn't support " +
+                                        
jobSubmissionInterface.getJobSubmissionProtocol() + " protocol yet");
+                                return result;
+                        }
+                    }
+                    result = new ValidatorResult(false);
+                    result.setErrorDetails("No JobSubmission interface found");
+                    return result;
+
+                }// end of inner if
+            }// end of outer if
+            return new ValidatorResult(true);
         } catch (Exception e) {
+            logger.error("Exception occur while running job count validation 
process ", e);
             result = new ValidatorResult(false);
-            result.setErrorDetails("Exception occur while running validation 
process ");
+            result.setErrorDetails("Exception occur while running job count 
validation process ");
             return result;
         }
 
     }
 
-    private ValidatorResult isJobSpaceAvailable(String communityUserName, 
String computeHostName, int resourceMaxJobCount)
+    private ValidatorResult isJobSpaceAvailable(String communityUserName, 
String computeHostName, String queueName, int resourceMaxJobCount)
             throws ApplicationSettingsException {
         if (communityUserName == null) {
             throw new IllegalArgumentException("Community user name should not 
be null");
@@ -119,7 +133,7 @@ public class JobCountValidator implements 
JobMetadataValidator {
         }
         String keyPath = new StringBuilder("/" + 
Constants.STAT).append("/").append(communityUserName)
                 .append("/").toString();
-        String key = keyPath + computeHostName + "/" + Constants.JOB;
+        String key = keyPath + computeHostName + "/" + Constants.JOB + "/" + 
queueName;
         Map<String, Integer> jobCountMap = 
AiravataUtils.getJobCountMap(OrchestratorContext.getZk());
         if (jobCountMap.containsKey(key)) {
             int count = jobCountMap.get(key);

http://git-wip-us.apache.org/repos/asf/airavata/blob/b8cacbc3/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index a950469..c0760dd 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -20,14 +20,23 @@
 */
 package org.apache.airavata.orchestrator.cpi.impl;
 
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.monitor.util.CommonUtils;
+import org.apache.airavata.model.appcatalog.computeresource.BatchQueue;
+import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.error.ValidationResults;
 import org.apache.airavata.model.error.ValidatorResult;
 import org.apache.airavata.model.util.ExperimentModelUtil;
 import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
+import org.apache.airavata.orchestrator.core.utils.OrchestratorUtils;
 import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator;
+import org.apache.airavata.orchestrator.core.validator.impl.JobCountValidator;
 import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryModelType;
@@ -35,7 +44,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 public class SimpleOrchestratorImpl extends AbstractOrchestrator{
@@ -91,18 +102,50 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         }
     }
 
-    public boolean launchExperiment(Experiment experiment, WorkflowNodeDetails 
workflowNode, TaskDetails task,String tokenId) throws OrchestratorException {
+    public boolean launchExperiment(Experiment experiment, WorkflowNodeDetails 
workflowNode, TaskDetails task,
+                                    String tokenId) throws 
OrchestratorException {
         // we give higher priority to userExperimentID
         String experimentId = experiment.getExperimentID();
         String taskId = task.getTaskID();
         // creating monitorID to register with monitoring queue
         // this is a special case because amqp has to be in place before 
submitting the job
         try {
-            return jobSubmitter.submit(experimentId, taskId,tokenId);
+            if 
(ServerSettings.getEnableJobRestrictionValidation().equals("true") &&
+                    task.getTaskScheduling().getQueueName() != null) {
+                ComputeResourceDescription computeResourceDes = 
CommonUtils.getComputeResourceDescription(task);
+                String communityUserName = 
OrchestratorUtils.getCommunityUserName(experiment, computeResourceDes, task,
+                        tokenId);
+                BatchQueue batchQueue = 
CommonUtils.getBatchQueueByName(computeResourceDes.getBatchQueues(),
+                        task.getTaskScheduling().getQueueName());
+
+                synchronized (this) {
+                    boolean spaceAvaialble = 
OrchestratorUtils.isJobSpaceAvailable(communityUserName,
+                            computeResourceDes.getHostName(), 
batchQueue.getQueueName(), batchQueue.getMaxJobsInQueue());
+                    if (spaceAvaialble) {
+                        if (jobSubmitter.submit(experimentId, taskId, 
tokenId)) {
+                            logger.info("Job submitted, experiment Id : " + 
experimentId + " , task Id : " + taskId);
+                            Map<String, Integer> jobUpdateMap = new 
HashMap<String, Integer>();
+                            StringBuilder sb = new 
StringBuilder("/").append(Constants.STAT).append("/")
+                                    
.append(communityUserName).append("/").append(computeResourceDes.getHostName())
+                                    
.append("/").append(Constants.JOB).append("/").append(batchQueue.getQueueName());
+                            jobUpdateMap.put(sb.toString(), 1);
+                            
CommonUtils.updateZkWithJobCount(OrchestratorContext.getZk(), jobUpdateMap, 
true); // update change job count to zookeeper
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else {
+                        throw new AiravataException("Please honour to the max 
job submission restriction policy," +
+                                " max count is " + 
batchQueue.getMaxJobsInQueue());
+                    }
+                }// end of synchronized block
+            } else {
+                logger.info("Ignored job throttling");
+                return jobSubmitter.submit(experimentId, taskId, tokenId);
+            }
         } catch (Exception e) {
             throw new OrchestratorException("Error launching the job", e);
         }
-
     }
 
     /**

Reply via email to