This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/master by this push:
new b933d96 Bringing back publishing process status over message bus
b933d96 is described below
commit b933d963904e2dc3d56b5db6640b8d4cf4cea3dc
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu May 2 16:32:13 2019 -0400
Bringing back publishing process status over message bus
---
.../apache/airavata/helix/impl/task/AiravataTask.java | 19 +++++++++++++++----
1 file changed, 15 insertions(+), 4 deletions(-)
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 0d5cfe1..b3380c9 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,12 +189,12 @@ public abstract class AiravataTask extends AbstractTask {
status.setTimeOfStateChange(status.getTimeOfStateChange());
}
getRegistryServiceClient().addProcessStatus(status,
getProcessId());
- /*ProcessIdentifier identifier = new
ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+ ProcessIdentifier identifier = new
ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
ProcessStatusChangeEvent processStatusChangeEvent = new
ProcessStatusChangeEvent(status.getState(), identifier);
MessageContext msgCtx = new
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
AiravataUtils.getId(MessageType.PROCESS.name()),
getGatewayId());
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);*/
+ getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
logger.error("Failed to save process status of process " +
getProcessId(), e);
}
@@ -218,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
- /*JobIdentifier identifier = new JobIdentifier(jobId, taskId,
processId, experimentId, gateway);
+ JobIdentifier identifier = new JobIdentifier(jobId, taskId,
processId, experimentId, gateway);
JobStatusChangeEvent jobStatusChangeEvent = new
JobStatusChangeEvent(jobStatus.getJobState(), identifier);
MessageContext msgCtx = new MessageContext(jobStatusChangeEvent,
MessageType.JOB, AiravataUtils.getId
(MessageType.JOB.name()), gateway);
msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- getStatusPublisher().publish(msgCtx);*/
+ getStatusPublisher().publish(msgCtx);
} catch (Exception e) {
logger.error("Error persisting job status " +
e.getLocalizedMessage(), e);
@@ -298,6 +298,17 @@ public abstract class AiravataTask extends AbstractTask {
}
}
+ protected Publisher getStatusPublisher() throws AiravataException {
+ if (statusPublisher == null) {
+ synchronized (RabbitMQPublisher.class) {
+ if (statusPublisher == null) {
+ statusPublisher =
MessagingFactory.getPublisher(Type.STATUS);
+ }
+ }
+ }
+ return statusPublisher;
+ }
+
@Override
public TaskResult onRun(TaskHelper helper) {