This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ec0e93da6 Fix monitoring issues
     new 229e943e0a Merge pull request #339 from isururanawaka/metaschedular
3ec0e93da6 is described below

commit 3ec0e93da6ae3f512ea18a203b2ae209919154cd
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Mon Nov 21 14:11:08 2022 -0500

    Fix monitoring issues
---
 .../org/apache/airavata/agents/api/AgentUtils.java |   7 +-
 .../airavata/metascheduler/core/utils/Utils.java   |   8 +-
 .../resource/monitoring/job/MonitoringJob.java     | 190 +++++++++++----------
 .../monitoring/job/output/OutputParser.java        |   4 +-
 .../monitoring/job/output/OutputParserImpl.java    |  31 +++-
 .../resource/monitoring/utils/Constants.java       |   1 +
 6 files changed, 141 insertions(+), 100 deletions(-)

diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
index fa6c4481b9..cb8b716f69 100644
--- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
+++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/AgentUtils.java
@@ -16,16 +16,19 @@ public class AgentUtils {
         try {
             final int serverPort = 
Integer.parseInt(ServerSettings.getRegistryServerPort());
             final String serverHost = ServerSettings.getRegistryServerHost();
-            return 
RegistryServiceClientFactory.createRegistryClient(serverHost, serverPort);
+
+            return 
RegistryServiceClientFactory.createRegistryClient(serverHost,
+                    serverPort);
         } catch (RegistryServiceException | ApplicationSettingsException e) {
             throw new AgentException("Unable to create registry client...", e);
         }
+
     }
 
     public static CredentialStoreService.Client getCredentialClient() throws 
AgentException {
         try {
             final int serverPort = 
Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
-            final String serverHost 
=ServerSettings.getCredentialStoreServerHost();
+            final String serverHost = 
ServerSettings.getCredentialStoreServerHost();
             return 
CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
         } catch (CredentialStoreException | ApplicationSettingsException e) {
             throw new AgentException("Unable to create credential client...", 
e);
diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 493aa482e6..0e9c787b86 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -25,13 +25,13 @@ public class Utils {
             return registryClientPool;
         }
         try {
-            final int serverPort = 
Integer.parseInt(ServerSettings.getRegistryServerPort());
-            final String serverHost = ServerSettings.getRegistryServerHost();
+//            final int serverPort = 
Integer.parseInt(ServerSettings.getRegistryServerPort());
+//            final String serverHost = ServerSettings.getRegistryServerHost();
             registryClientPool = new ThriftClientPool<>(
                     tProtocol -> new RegistryService.Client(tProtocol),
                     
Utils.<RegistryService.Client>createGenericObjectPoolConfig(),
-                    serverHost,
-                    serverPort);
+                    "149.165.153.112",
+                    8970);
             return registryClientPool;
         } catch (Exception e) {
             throw new RuntimeException("Unable to create registry client...", 
e);
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
index 235fb2263c..463594c129 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/MonitoringJob.java
@@ -9,7 +9,7 @@ import org.apache.airavata.helix.core.support.adaptor.AdaptorSupportImpl;
 import org.apache.airavata.helix.impl.task.submission.config.JobFactory;
 import org.apache.airavata.helix.task.api.support.AdaptorSupport;
 import org.apache.airavata.model.appcatalog.computeresource.*;
-import 
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupComputeResourcePreference;
+import 
org.apache.airavata.model.appcatalog.groupresourceprofile.ComputeResourcePolicy;
 import 
org.apache.airavata.model.appcatalog.groupresourceprofile.GroupResourceProfile;
 import org.apache.airavata.model.status.QueueStatusModel;
 import org.apache.airavata.registry.api.RegistryService;
@@ -37,11 +37,10 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
     public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
         RegistryService.Client client = null;
         try {
-            LOGGER.info("Executing ComputeResources ....... ");
+            LOGGER.debug("Executing ComputeResources Monitoring Job....... ");
 
             client = this.registryClientPool.getResource();
 
-            AdaptorSupportImpl adaptorSupport = 
AdaptorSupportImpl.getInstance();
 
             JobDataMap jobDataMap = 
jobExecutionContext.getJobDetail().getJobDataMap();
             String metaSchedulerGateway = 
jobDataMap.getString(Constants.METASCHEDULER_GATEWAY);
@@ -50,29 +49,12 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
             int jobId = 
jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOB_ID);
             int parallelJobs = 
jobDataMap.getInt(Constants.METASCHEDULER_SCANNING_JOBS);
 
+            LOGGER.debug("Main Gateway:"+metaSchedulerGateway+" Group Resource 
Profile: "
+                    +metaSchedulerGRP+" username: "+username+" jobId: 
"+jobId+" parallellJobs: "+parallelJobs);
 
-            GroupResourceProfile groupResourceProfile = 
getGroupResourceProfile(metaSchedulerGRP);
-            List<GroupComputeResourcePreference> computeResourcePreferenceList 
= groupResourceProfile.getComputePreferences();
+            executeComputeResourceMonitoring(client, metaSchedulerGateway, 
username, metaSchedulerGRP, parallelJobs, jobId);
 
-            int size = computeResourcePreferenceList.size();
 
-            int chunkSize = size / parallelJobs;
-
-            int startIndex = jobId * chunkSize;
-
-            int endIndex = (jobId + 1) * chunkSize;
-
-            if (jobId == parallelJobs - 1) {
-                endIndex = size;
-            }
-
-            List<GroupComputeResourcePreference> computeResourcePreferences = 
computeResourcePreferenceList
-                    .subList(startIndex, endIndex);
-
-            for (GroupComputeResourcePreference computeResourcePreference : 
computeResourcePreferences) {
-                LOGGER.info("updating GRP########### 
PRID:"+computeResourcePreference.getComputeResourceId());
-                updateComputeResource(client, adaptorSupport, 
metaSchedulerGateway, username, metaSchedulerGRP, computeResourcePreference);
-            }
         } catch (Exception ex) {
             String msg = "Error occurred while executing job" + 
ex.getMessage();
             LOGGER.error(msg, ex);
@@ -85,13 +67,39 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
 
     }
 
+    private void executeComputeResourceMonitoring(RegistryService.Client 
client, String metaSchedulerGateway, String username,
+                                                  String metaSchedulerGRP, int 
parallelJobs, int jobId) throws Exception {
+        AdaptorSupportImpl adaptorSupport = AdaptorSupportImpl.getInstance();
+        GroupResourceProfile groupResourceProfile = 
getGroupResourceProfile(metaSchedulerGRP);
+//        List<GroupComputeResourcePreference> computeResourcePreferenceList = 
groupResourceProfile.getComputePreferences();
+
+
+        int size = groupResourceProfile.getComputeResourcePoliciesSize();
+
+        int chunkSize = size / parallelJobs;
+
+        int startIndex = jobId * chunkSize;
+
+        int endIndex = (jobId + 1) * chunkSize;
+
+        if (jobId == parallelJobs - 1) {
+            endIndex = size;
+        }
+
+        List<ComputeResourcePolicy> computeResourcePolicyList = 
groupResourceProfile.getComputeResourcePolicies().
+                subList(startIndex, endIndex);
+
+        for (ComputeResourcePolicy computeResourcePolicy : 
computeResourcePolicyList) {
+            updateComputeResource(client, adaptorSupport, 
metaSchedulerGateway, username, computeResourcePolicy);
+        }
+    }
+
 
     private void updateComputeResource(RegistryService.Client client, 
AdaptorSupport adaptorSupport,
                                        String gatewayId,
                                        String username,
-                                       String groupResourceProfileId,
-                                       GroupComputeResourcePreference 
computeResourcePreference) throws Exception {
-        String computeResourceId = 
computeResourcePreference.getComputeResourceId();
+                                       ComputeResourcePolicy 
computeResourcePolicy) throws Exception {
+        String computeResourceId = 
computeResourcePolicy.getComputeResourceId();
         ComputeResourceDescription comResourceDes = 
client.getComputeResource(computeResourceId);
         List<JobSubmissionInterface> jobSubmissionInterfaces = 
comResourceDes.getJobSubmissionInterfaces();
         Collections.sort(jobSubmissionInterfaces, 
Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder));
@@ -100,9 +108,8 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
 
         ResourceJobManager resourceJobManager = 
JobFactory.getResourceJobManager(client, jobSubmissionProtocol, 
jobSubmissionInterface);
 
-        LOGGER.info(" type "+ resourceJobManager.getResourceJobManagerType()+" 
jobSubmissionProtocol "+jobSubmissionProtocol);
         //TODO: intial phase we are only supporting SLURM
-        if (resourceJobManager.getResourceJobManagerType().equals("SLURM")) {
+        if 
(resourceJobManager.getResourceJobManagerType().name().equals("SLURM")) {
             String baseCommand = "sinfo";
 
             if 
(resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_CLUSTER_INFO))
 {
@@ -110,75 +117,84 @@ public class MonitoringJob extends ComputeResourceMonitor implements Job {
             }
 
 
+            List<String> allowedBatchQueues = 
computeResourcePolicy.getAllowedBatchQueues();
+            List<QueueStatusModel> queueStatusModels = new ArrayList<>();
+            for (String queue : allowedBatchQueues) {
 
-            String finalCommand = baseCommand + "-p" + 
computeResourcePreference.getPreferredBatchQueue();
-
-            String computeResourceToken = getComputeResourceCredentialToken(
-                    gatewayId,
-                    username,
-                    computeResourceId,
-                    false,
-                    true,
-                    groupResourceProfileId);
-
-            String loginUsername = getComputeResourceLoginUserName(gatewayId,
-                    username,
-                    computeResourceId,
-                    false,
-                    true,
-                    groupResourceProfileId,
-                    null);
-
-            AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
-                    computeResourceId,
-                    jobSubmissionProtocol,
-                    computeResourceToken,
-                    loginUsername);
-
-            CommandOutput commandOutput = adaptor.executeCommand(finalCommand, 
null);
-
-            OutputParser outputParser = new OutputParserImpl();
-            boolean queueStatus = false;
-            int runningJobs = 0;
-            int pendingJobs = 0;
-            LOGGER.info("command output"+commandOutput.getStdOut()+" error 
"+commandOutput.getStdError()+" exist code "+commandOutput.getExitCode());
-            if (outputParser.isComputeResourceAvailable(commandOutput)) {
-                queueStatus = true;
-
-                String runningJobCommand = "squeue";
-                String pendingJobCommand = "squeue";
-                if 
(resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS))
 {
-                    runningJobCommand = 
resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
-                }
+                String finalCommand = baseCommand + " -p " + queue;
 
-                if 
(resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS))
 {
-                    pendingJobCommand = 
resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
-                }
+                String computeResourceToken = 
getComputeResourceCredentialToken(
+                        gatewayId,
+                        username,
+                        computeResourceId,
+                        false,
+                        true,
+                        computeResourcePolicy.getGroupResourceProfileId());
 
-                String runningJobsCommand = runningJobCommand + "-h -t running 
-r | wc -l";
-                String pendingJobsCommand = pendingJobCommand + "-h -t pending 
-r | wc -l";
+                String loginUsername = 
getComputeResourceLoginUserName(gatewayId,
+                        username,
+                        computeResourceId,
+                        false,
+                        true,
+                        computeResourcePolicy.getGroupResourceProfileId(),
+                        null);
 
-                CommandOutput runningJobsCommandOutput = 
adaptor.executeCommand(runningJobsCommand, null);
+                AgentAdaptor adaptor = adaptorSupport.fetchAdaptor(gatewayId,
+                        computeResourceId,
+                        jobSubmissionProtocol,
+                        computeResourceToken,
+                        loginUsername);
 
-                CommandOutput pendingJobsCommandOutput = 
adaptor.executeCommand(pendingJobsCommand, null);
+                CommandOutput commandOutput = 
adaptor.executeCommand(finalCommand, null);
 
-                runningJobs = 
outputParser.getNumberofJobs(runningJobsCommandOutput);
-                pendingJobs = 
outputParser.getNumberofJobs(pendingJobsCommandOutput);
+                OutputParser outputParser = new OutputParserImpl();
+                boolean queueStatus = false;
+                int runningJobs = 0;
+                int pendingJobs = 0;
 
-            }
+                if 
(outputParser.isComputeResourceAvailable(commandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM))
 {
+                    queueStatus = true;
 
-            QueueStatusModel queueStatusModel = new QueueStatusModel();
-            queueStatusModel.setHostName(comResourceDes.getHostName());
-            
queueStatusModel.setQueueName(computeResourcePreference.getPreferredBatchQueue());
-            LOGGER.info("Storing hostname "+comResourceDes.getHostName()+" 
batch queue "+computeResourcePreference.getPreferredBatchQueue());
-            queueStatusModel.setQueueUp(queueStatus);
-            queueStatusModel.setRunningJobs(runningJobs);
-            queueStatusModel.setQueuedJobs(pendingJobs);
-            List<QueueStatusModel> queueStatusModels = new ArrayList<>();
-            queueStatusModels.add(queueStatusModel);
+                    String runningJobCommand = "squeue";
+                    String pendingJobCommand = "squeue";
+                    if 
(resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS))
 {
+                        runningJobCommand = 
resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_RUNNING_JOBS);
+                    }
 
-            client.registerQueueStatuses(queueStatusModels);
+                    if 
(resourceJobManager.getJobManagerCommands().containsKey(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS))
 {
+                        pendingJobCommand = 
resourceJobManager.getJobManagerCommands().get(JobManagerCommand.SHOW_NO_OF_PENDING_JOBS);
+                    }
+
+                    String runningJobsCommand = runningJobCommand + "-h -t 
running -r | wc -l";
+                    String pendingJobsCommand = pendingJobCommand + "-h -t 
pending -r | wc -l";
+
+                    CommandOutput runningJobsCommandOutput = 
adaptor.executeCommand(runningJobsCommand, null);
+
+                    CommandOutput pendingJobsCommandOutput = 
adaptor.executeCommand(pendingJobsCommand, null);
 
+                    runningJobs = 
outputParser.getNumberofJobs(runningJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+                    pendingJobs = 
outputParser.getNumberofJobs(pendingJobsCommandOutput,Constants.JOB_SUBMISSION_PROTOCOL_SLURM);
+
+                }
+
+                QueueStatusModel queueStatusModel = new QueueStatusModel();
+                queueStatusModel.setHostName(comResourceDes.getHostName());
+                queueStatusModel.setQueueName(queue);
+                queueStatusModel.setQueueUp(queueStatus);
+                queueStatusModel.setRunningJobs(runningJobs);
+                queueStatusModel.setQueuedJobs(pendingJobs);
+                queueStatusModels.add(queueStatusModel);
+                queueStatusModel.setTime(System.currentTimeMillis());
+            }
+            client.registerQueueStatuses(queueStatusModels);
         }
     }
+
+    public static void main(String[] args) throws Exception {
+        MonitoringJob monitoringJob = new MonitoringJob();
+        
monitoringJob.executeComputeResourceMonitoring(monitoringJob.registryClientPool.getResource(),
 "seagrid", "metascheacc",
+                "a2076a5a-0fbf-44f4-9d47-060153bc578b", 1, 0);
+    }
 }
+
+
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
index a06b9b5e02..b984940886 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParser.java
@@ -8,9 +8,9 @@ import org.apache.airavata.agents.api.CommandOutput;
 public interface OutputParser {
 
 
-    boolean isComputeResourceAvailable(CommandOutput commandOutput);
+    boolean isComputeResourceAvailable(CommandOutput commandOutput, String 
type);
 
-    int getNumberofJobs(CommandOutput commandOutput);
+    int getNumberofJobs(CommandOutput commandOutput, String type);
 
 
 
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
index e8a11ba274..37e51d95a0 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/job/output/OutputParserImpl.java
@@ -1,6 +1,9 @@
 package org.apache.airavata.compute.resource.monitoring.job.output;
 
 import org.apache.airavata.agents.api.CommandOutput;
+import org.apache.airavata.compute.resource.monitoring.utils.Constants;
+
+import java.util.Scanner;
 
 /**
  * This is parser output implementation
@@ -9,17 +12,35 @@ public class OutputParserImpl implements OutputParser {
 
 
     @Override
-    public boolean isComputeResourceAvailable(CommandOutput commandOutput) {
-        if (commandOutput.getExitCode() == 0) {
-            return true;
+    public boolean isComputeResourceAvailable(CommandOutput commandOutput, 
String type) {
+        if (commandOutput.getStdOut() != null && 
!commandOutput.getStdOut().isEmpty()) {
+            if (type.equals(Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+                Scanner scanner = new Scanner(commandOutput.getStdOut());
+                if (scanner.hasNextLine()) {
+                    String firstLine = scanner.nextLine();
+                }
+                while (scanner.hasNextLine()) {
+                    String line = scanner.nextLine();
+                    String[] splittedString = line.split(" ");
+                    for (String splitted : splittedString) {
+                        if (splitted.trim().equals("up")) {
+                            return true;
+                        }
+                    }
+                }
+
+            }
+
         }
         return false;
     }
 
     @Override
-    public int getNumberofJobs(CommandOutput commandOutput) {
+    public int getNumberofJobs(CommandOutput commandOutput, String type) {
         if (commandOutput.getStdOut() != null && 
!commandOutput.getStdOut().isEmpty()) {
-            return Integer.parseInt(commandOutput.getStdOut());
+            if (type.equals(Constants.JOB_SUBMISSION_PROTOCOL_SLURM)) {
+                return Integer.parseInt(commandOutput.getStdOut().trim());
+            }
         }
         return 0;
     }
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
index 61ae5f87f7..76d7d0afaf 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/utils/Constants.java
@@ -11,4 +11,5 @@ public class Constants {
     public static final String COMPUTE_RESOURCE_SCANNER_GROUP = 
"compute.resource.scanner.group";
     public static final String COMPUTE_RESOURCE_SCANNER_TRIGGER = 
"compute.resource.scanner.trigger";
     public static  final String COMPUTE_RESOURCE_SCANNER_JOB = 
"compute.resource.scanner.job";
+    public static  final String JOB_SUBMISSION_PROTOCOL_SLURM = "SLURM";
 }

Reply via email to