This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/master by this push:
     new b933d96  Bringing back publishing process status over message bus
b933d96 is described below

commit b933d963904e2dc3d56b5db6640b8d4cf4cea3dc
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu May 2 16:32:13 2019 -0400

    Bringing back publishing process status over message bus
---
 .../apache/airavata/helix/impl/task/AiravataTask.java | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 0d5cfe1..b3380c9 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -189,12 +189,12 @@ public abstract class AiravataTask extends AbstractTask {
                 status.setTimeOfStateChange(status.getTimeOfStateChange());
             }
             getRegistryServiceClient().addProcessStatus(status, getProcessId());
-            /*ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
+            ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId());
             ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier);
             MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS,
                     AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId());
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);*/
+            getStatusPublisher().publish(msgCtx);
         } catch (Exception e) {
             logger.error("Failed to save process status of process " + getProcessId(), e);
         }
@@ -218,13 +218,13 @@ public abstract class AiravataTask extends AbstractTask {
 
             getRegistryServiceClient().addJobStatus(jobStatus, taskId, jobId);
 
-            /*JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
+            JobIdentifier identifier = new JobIdentifier(jobId, taskId, processId, experimentId, gateway);
 
             JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier);
             MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId
                     (MessageType.JOB.name()), gateway);
             msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-            getStatusPublisher().publish(msgCtx);*/
+            getStatusPublisher().publish(msgCtx);
 
         } catch (Exception e) {
             logger.error("Error persisting job status " + e.getLocalizedMessage(), e);
@@ -298,6 +298,17 @@ public abstract class AiravataTask extends AbstractTask {
         }
     }
 
+    protected Publisher getStatusPublisher() throws AiravataException {
+        if (statusPublisher == null) {
+            synchronized (RabbitMQPublisher.class) {
+                if (statusPublisher == null) {
+                    statusPublisher = MessagingFactory.getPublisher(Type.STATUS);
+                }
+            }
+        }
+        return statusPublisher;
+    }
+
     @Override
     public TaskResult onRun(TaskHelper helper) {
 

Reply via email to