This is an automated email from the ASF dual-hosted git repository.

isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new fba23ee247 return resource to pool
     new 29b3934272 Merge pull request #414 from isururanawaka/metaschedular
fba23ee247 is described below

commit fba23ee247a13bbd58a0d1ab9323bcf286d0faad
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Mon Apr 10 09:59:32 2023 -0400

    return resource to pool
---
 .../airavata/metascheduler/core/utils/Utils.java   | 63 +++++++++++++++-------
 .../ComputationalResourceMonitoringService.java    |  5 +-
 2 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 466abae27d..90ff631869 100644
--- a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++ b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -71,33 +71,56 @@ public class Utils {
     public static void saveAndPublishProcessStatus(ProcessState processState, 
String processId,
                                                    String experimentId, String 
gatewayId)
             throws RegistryServiceException, TException, AiravataException {
+         RegistryService.Client registryClient = null;
+        try {
+            registryClient = registryClientPool.getResource();
+            ProcessStatus processStatus = new ProcessStatus(processState);
+            
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 
-        ProcessStatus processStatus = new ProcessStatus(processState);
-        
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
-        registryClientPool.getResource().addProcessStatus(processStatus, 
processId);
-        ProcessIdentifier identifier = new ProcessIdentifier(processId, 
experimentId, gatewayId);
-        ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(processState, identifier);
-        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, 
MessageType.PROCESS,
-                AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
-        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        getStatusPublisher().publish(msgCtx);
+            registryClientPool.getResource().addProcessStatus(processStatus, 
processId);
+            ProcessIdentifier identifier = new ProcessIdentifier(processId, 
experimentId, gatewayId);
+            ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(processState, identifier);
+            MessageContext msgCtx = new 
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), 
gatewayId);
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception ex){
+            if (registryClient != null) {
+                registryClientPool.returnBrokenResource(registryClient);
+                registryClient = null;
+            }
+        } finally {
+            if (registryClient != null) {
+                registryClientPool.returnResource(registryClient);
+            }
+        }
     }
 
     public static void updateProcessStatusAndPublishStatus(ProcessState 
processState, String processId,
                                                    String experimentId, String 
gatewayId)
             throws RegistryServiceException, TException, AiravataException {
+        RegistryService.Client registryClient = null;
+        try {
+            ProcessStatus processStatus = new ProcessStatus(processState);
+            
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
 
-        ProcessStatus processStatus = new ProcessStatus(processState);
-        
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
-        registryClientPool.getResource().updateProcessStatus(processStatus, 
processId);
-        ProcessIdentifier identifier = new ProcessIdentifier(processId, 
experimentId, gatewayId);
-        ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(processState, identifier);
-        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, 
MessageType.PROCESS,
-                AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
-        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        getStatusPublisher().publish(msgCtx);
+            
registryClientPool.getResource().updateProcessStatus(processStatus, processId);
+            ProcessIdentifier identifier = new ProcessIdentifier(processId, 
experimentId, gatewayId);
+            ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(processState, identifier);
+            MessageContext msgCtx = new 
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+                    AiravataUtils.getId(MessageType.PROCESS.name()), 
gatewayId);
+            msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            getStatusPublisher().publish(msgCtx);
+        } catch (Exception ex) {
+            if (registryClient != null) {
+                registryClientPool.returnBrokenResource(registryClient);
+                registryClient = null;
+            }
+        } finally {
+            if (registryClient != null) {
+                registryClientPool.returnResource(registryClient);
+            }
+        }
     }
 
     public static synchronized Publisher getStatusPublisher() throws 
AiravataException {
diff --git a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
index 015c3b729d..631542f9d4 100644
--- a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
+++ b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Computational Resource Monitoring Service
@@ -89,7 +90,9 @@ public class ComputationalResourceMonitoringService implements IServer {
 
     @Override
     public void stop() throws Exception {
-        scheduler.unscheduleJobs(new ArrayList(jobTriggerMap.values()));
+        scheduler.unscheduleJobs(jobTriggerMap.values().stream().map(trigger 
-> {
+           return trigger.getKey();
+        }).collect(Collectors.toList()));
     }
 
     @Override

Reply via email to