This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push:
new fba23ee247 return resource to pool
new 29b3934272 Merge pull request #414 from isururanawaka/metaschedular
fba23ee247 is described below
commit fba23ee247a13bbd58a0d1ab9323bcf286d0faad
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Mon Apr 10 09:59:32 2023 -0400
return resource to pool
---
.../airavata/metascheduler/core/utils/Utils.java | 63 +++++++++++++++-------
.../ComputationalResourceMonitoringService.java | 5 +-
2 files changed, 47 insertions(+), 21 deletions(-)
diff --git
a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 466abae27d..90ff631869 100644
---
a/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++
b/modules/airavata-metascheduler/metascheduler-core/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -71,33 +71,56 @@ public class Utils {
public static void saveAndPublishProcessStatus(ProcessState processState,
String processId,
String experimentId, String
gatewayId)
throws RegistryServiceException, TException, AiravataException {
+ RegistryService.Client registryClient = null;
+ try {
+ registryClient = registryClientPool.getResource();
+ ProcessStatus processStatus = new ProcessStatus(processState);
+
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- ProcessStatus processStatus = new ProcessStatus(processState);
-
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
- registryClientPool.getResource().addProcessStatus(processStatus,
processId);
- ProcessIdentifier identifier = new ProcessIdentifier(processId,
experimentId, gatewayId);
- ProcessStatusChangeEvent processStatusChangeEvent = new
ProcessStatusChangeEvent(processState, identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent,
MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+ registryClientPool.getResource().addProcessStatus(processStatus,
processId);
+ ProcessIdentifier identifier = new ProcessIdentifier(processId,
experimentId, gatewayId);
+ ProcessStatusChangeEvent processStatusChangeEvent = new
ProcessStatusChangeEvent(processState, identifier);
+ MessageContext msgCtx = new
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()),
gatewayId);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception ex){
+ if (registryClient != null) {
+ registryClientPool.returnBrokenResource(registryClient);
+ registryClient = null;
+ }
+ } finally {
+ if (registryClient != null) {
+ registryClientPool.returnResource(registryClient);
+ }
+ }
}
public static void updateProcessStatusAndPublishStatus(ProcessState
processState, String processId,
String experimentId, String
gatewayId)
throws RegistryServiceException, TException, AiravataException {
+ RegistryService.Client registryClient = null;
+ try {
+ ProcessStatus processStatus = new ProcessStatus(processState);
+
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- ProcessStatus processStatus = new ProcessStatus(processState);
-
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-
- registryClientPool.getResource().updateProcessStatus(processStatus,
processId);
- ProcessIdentifier identifier = new ProcessIdentifier(processId,
experimentId, gatewayId);
- ProcessStatusChangeEvent processStatusChangeEvent = new
ProcessStatusChangeEvent(processState, identifier);
- MessageContext msgCtx = new MessageContext(processStatusChangeEvent,
MessageType.PROCESS,
- AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
- msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);
+
registryClientPool.getResource().updateProcessStatus(processStatus, processId);
+ ProcessIdentifier identifier = new ProcessIdentifier(processId,
experimentId, gatewayId);
+ ProcessStatusChangeEvent processStatusChangeEvent = new
ProcessStatusChangeEvent(processState, identifier);
+ MessageContext msgCtx = new
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
+ AiravataUtils.getId(MessageType.PROCESS.name()),
gatewayId);
+ msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ getStatusPublisher().publish(msgCtx);
+ } catch (Exception ex) {
+ if (registryClient != null) {
+ registryClientPool.returnBrokenResource(registryClient);
+ registryClient = null;
+ }
+ } finally {
+ if (registryClient != null) {
+ registryClientPool.returnResource(registryClient);
+ }
+ }
}
public static synchronized Publisher getStatusPublisher() throws
AiravataException {
diff --git
a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
index 015c3b729d..631542f9d4 100644
---
a/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
+++
b/modules/computer-resource-monitoring-service/src/main/java/org/apache/airavata/compute/resource/monitoring/ComputationalResourceMonitoringService.java
@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Computational Resource Monitoring Service
@@ -89,7 +90,9 @@ public class ComputationalResourceMonitoringService
implements IServer {
@Override
public void stop() throws Exception {
- scheduler.unscheduleJobs(new ArrayList(jobTriggerMap.values()));
+ scheduler.unscheduleJobs(jobTriggerMap.values().stream().map(trigger
-> {
+ return trigger.getKey();
+ }).collect(Collectors.toList()));
}
@Override