publishing experiment statuses
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20fe7b44 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20fe7b44 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20fe7b44 Branch: refs/heads/master Commit: 20fe7b44a1353b9b5ee060bcf0820b1265951e13 Parents: 70358df Author: Chathuri Wimalasena <[email protected]> Authored: Tue Jan 19 16:43:57 2016 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Tue Jan 19 16:43:57 2016 -0500 ---------------------------------------------------------------------- .../server/OrchestratorServerHandler.java | 19 +++++------------- .../orchestrator/util/OrchestratorUtils.java | 21 ++++++++++++++++---- 2 files changed, 22 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 66861dd..f04fdae 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -27,7 +27,6 @@ import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.ZkConstants; -import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.messaging.core.MessageContext; @@ -44,7 +43,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil import org.apache.airavata.model.error.LaunchValidationException; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.experiment.ExperimentType; -import org.apache.airavata.model.experiment.UserConfigurationDataModel; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.ProcessIdentifier; @@ -180,15 +178,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); status.setReason("submitted all processes"); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updageExperimentStatus(experimentId, status); + OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); log.info("expId: {}, Launched experiment ", experimentId); - ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, - experimentId, - gatewayId); - String messageId = AiravataUtils.getId("EXPERIMENT"); - MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - publisher.publish(messageContext); OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token, gatewayId)); } else if (executionType == ExperimentType.WORKFLOW) { //its a workflow execution experiment @@ -368,7 +359,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING); status.setReason("Experiment cancel request processed"); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updageExperimentStatus(experimentId, status); + OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState()); return true; } @@ -423,12 +414,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { // ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); // status.setReason("submitted all processes"); // status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); -// OrchestratorUtils.updageExperimentStatus(experimentId, status); +// OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status); // log.info("expId: {}, Launched experiment ", experimentId); } catch (Exception e) { ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); status.setReason("Error while updating task status"); - OrchestratorUtils.updageExperimentStatus(experimentId, status); + OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId); log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " + ExperimentState.FAILED, e); ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.FAILED, @@ -547,7 +538,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { } if (status.getState() != null) { status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - OrchestratorUtils.updageExperimentStatus(processIdentity.getExperimentId(), status); + OrchestratorUtils.updageAndPublishExperimentStatus(processIdentity.getExperimentId(), status, publisher, gatewayName); log.info("expId : " + processIdentity.getExperimentId() + " :- Experiment status updated to " + status.getState()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java index 834d3b6..0a9617d 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java @@ -20,8 +20,12 @@ */ package org.apache.airavata.orchestrator.util; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.model.status.ExperimentState; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.status.ExperimentStatus; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; @@ -32,14 +36,23 @@ import org.slf4j.LoggerFactory; public class OrchestratorUtils { private static final Logger log = LoggerFactory.getLogger(OrchestratorUtils.class); - public static void updageExperimentStatus(String experimentId, ExperimentStatus status) { + public static void updageAndPublishExperimentStatus(String experimentId, ExperimentStatus status, Publisher publisher, String gatewayId) { try { RegistryFactory.getDefaultExpCatalog().update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, experimentId); + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(status.getState(), + experimentId, + gatewayId); + String messageId = AiravataUtils.getId("EXPERIMENT"); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); } catch (RegistryException e) { log.error("expId : " + experimentId + " Error while updating experiment status to " + status.toString(), e); - } - } + } catch (AiravataException e) { + log.error("expId : " + experimentId + " Error while publishing experiment status to " + status.toString(), e); + } + } public static ExperimentStatus getExperimentStatus(String experimentId) throws RegistryException { return ((ExperimentStatus) RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType
