new message thrift object
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/88d8638f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/88d8638f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/88d8638f Branch: refs/heads/messaging_framework Commit: 88d8638f27b18e0b38c24985906142460ef92d47 Parents: 412f1e7 Author: chathuriw <[email protected]> Authored: Wed Sep 24 15:54:05 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Sep 24 15:54:05 2014 -0400 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 17 +- .../lib/airavata/messagingEvents_constants.cpp | 2 + .../lib/airavata/messagingEvents_constants.h | 1 + .../lib/airavata/messagingEvents_types.cpp | 157 ++++++++++++++++ .../lib/airavata/messagingEvents_types.h | 100 +++++++++++ .../Airavata/Model/Messaging/Event/Types.php | 180 +++++++++++++++++++ .../messagingEvents.thrift | 24 +++ .../airavata/common/utils/AiravataUtils.java | 6 + .../core/monitor/AiravataJobStatusUpdator.java | 16 +- .../core/monitor/AiravataTaskStatusUpdator.java | 14 +- .../AiravataWorkflowNodeStatusUpdator.java | 18 +- .../airavata/messaging/core/Publisher.java | 10 +- .../core/impl/AiravataRabbitMQPublisher.java | 19 +- 13 files changed, 529 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index aff0e07..9c8bded 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -20,14 +20,16 @@ */ package org.apache.airavata.api.server.listener; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.util.Calendar; import org.apache.airavata.api.server.util.DataModelUtils; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; -import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.util.ExecutionType; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.ExperimentState; @@ -87,7 +89,16 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state); logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString()); monitorPublisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); - publisher.publish(new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); + ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()); + Message message = new Message(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(experimentStatusChangeEvent); + message.setEvent(baos.toByteArray()); + message.setMessageType(MessageType.EXPERIMENT); + message.setMessageLevel(MessageLevel.INFO); + message.setMessageId(AiravataUtils.getId("EXP")); + publisher.publish(message); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp index a02179a..5086851 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.cpp @@ -28,6 +28,8 @@ namespace apache { namespace airavata { namespace model { namespace messaging { const messagingEventsConstants g_messagingEvents_constants; messagingEventsConstants::messagingEventsConstants() { + DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"; + } }}}}} // namespace http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h index c91fd31..b6d7dbc 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_constants.h @@ -32,6 +32,7 @@ class messagingEventsConstants { public: messagingEventsConstants(); + std::string DEFAULT_ID; }; extern const messagingEventsConstants g_messagingEvents_constants; http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp index a96736f..fae1f5d 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp @@ -27,6 +27,34 @@ namespace apache { namespace airavata { namespace model { namespace messaging { namespace event { +int _kMessageLevelValues[] = { + MessageLevel::INFO, + MessageLevel::DEBUG, + MessageLevel::ERROR, + MessageLevel::ACK +}; +const char* _kMessageLevelNames[] = { + "INFO", + "DEBUG", + "ERROR", + "ACK" +}; +const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageLevelValues, _kMessageLevelNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + +int _kMessageTypeValues[] = { + MessageType::EXPERIMENT, + MessageType::TASK, + MessageType::WORKFLOWNODE, + MessageType::JOB +}; +const char* _kMessageTypeNames[] = { + "EXPERIMENT", + "TASK", + "WORKFLOWNODE", + "JOB" +}; +const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + const char* ExperimentStatusChangeEvent::ascii_fingerprint = "19B5240589E680301A7E32DF3971EFBE"; const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x19,0xB5,0x24,0x05,0x89,0xE6,0x80,0x30,0x1A,0x7E,0x32,0xDF,0x39,0x71,0xEF,0xBE}; @@ -743,4 +771,133 @@ void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b) { swap(a.jobIdentity, b.jobIdentity); } +const char* Message::ascii_fingerprint = "6904C391426E568AF9DEAF69860C076A"; +const uint8_t Message::binary_fingerprint[16] = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A}; + +uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_event = false; + bool isset_messageId = false; + bool isset_messageType = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->event); + isset_event = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->messageId); + isset_messageId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast10; + xfer += iprot->readI32(ecast10); + this->messageType = (MessageType::type)ecast10; + isset_messageType = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->updatedTime); + this->__isset.updatedTime = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast11; + xfer += iprot->readI32(ecast11); + this->messageLevel = (MessageLevel::type)ecast11; + this->__isset.messageLevel = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_event) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_messageId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_messageType) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t Message::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("Message"); + + xfer += oprot->writeFieldBegin("event", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeBinary(this->event); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("messageId", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->messageId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("messageType", ::apache::thrift::protocol::T_I32, 3); + xfer += oprot->writeI32((int32_t)this->messageType); + xfer += oprot->writeFieldEnd(); + + if (this->__isset.updatedTime) { + xfer += oprot->writeFieldBegin("updatedTime", ::apache::thrift::protocol::T_I64, 4); + xfer += oprot->writeI64(this->updatedTime); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.messageLevel) { + xfer += oprot->writeFieldBegin("messageLevel", ::apache::thrift::protocol::T_I32, 5); + xfer += oprot->writeI32((int32_t)this->messageLevel); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(Message &a, Message &b) { + using ::std::swap; + swap(a.event, b.event); + swap(a.messageId, b.messageId); + swap(a.messageType, b.messageType); + swap(a.updatedTime, b.updatedTime); + swap(a.messageLevel, b.messageLevel); + swap(a.__isset, b.__isset); +} + }}}}} // namespace http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h index ad3d052..fe80785 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h @@ -35,6 +35,28 @@ namespace apache { namespace airavata { namespace model { namespace messaging { namespace event { +struct MessageLevel { + enum type { + INFO = 0, + DEBUG = 1, + ERROR = 2, + ACK = 3 + }; +}; + +extern const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES; + +struct MessageType { + enum type { + EXPERIMENT = 0, + TASK = 1, + WORKFLOWNODE = 2, + JOB = 3 + }; +}; + +extern const std::map<int, const char*> _MessageType_VALUES_TO_NAMES; + class ExperimentStatusChangeEvent { public: @@ -408,6 +430,84 @@ class JobStatusChangeEvent { void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b); +typedef struct _Message__isset { + _Message__isset() : updatedTime(false), messageLevel(false) {} + bool updatedTime; + bool messageLevel; +} _Message__isset; + +class Message { + public: + + static const char* ascii_fingerprint; // = "6904C391426E568AF9DEAF69860C076A"; + static const uint8_t binary_fingerprint[16]; // = {0x69,0x04,0xC3,0x91,0x42,0x6E,0x56,0x8A,0xF9,0xDE,0xAF,0x69,0x86,0x0C,0x07,0x6A}; + + Message() : event(), messageId("DO_NOT_SET_AT_CLIENTS"), messageType((MessageType::type)0), updatedTime(0), messageLevel((MessageLevel::type)0) { + } + + virtual ~Message() throw() {} + + std::string event; + std::string messageId; + MessageType::type messageType; + int64_t updatedTime; + MessageLevel::type messageLevel; + + _Message__isset __isset; + + void __set_event(const std::string& val) { + event = val; + } + + void __set_messageId(const std::string& val) { + messageId = val; + } + + void __set_messageType(const MessageType::type val) { + messageType = val; + } + + void __set_updatedTime(const int64_t val) { + updatedTime = val; + __isset.updatedTime = true; + } + + void __set_messageLevel(const MessageLevel::type val) { + messageLevel = val; + __isset.messageLevel = true; + } + + bool operator == (const Message & rhs) const + { + if (!(event == rhs.event)) + return false; + if (!(messageId == rhs.messageId)) + return false; + if (!(messageType == rhs.messageType)) + return false; + if (__isset.updatedTime != rhs.__isset.updatedTime) + return false; + else if (__isset.updatedTime && !(updatedTime == rhs.updatedTime)) + return false; + if (__isset.messageLevel != rhs.__isset.messageLevel) + return false; + else if (__isset.messageLevel && !(messageLevel == rhs.messageLevel)) + return false; + return true; + } + bool operator != (const Message &rhs) const { + return !(*this == rhs); + } + + bool operator < (const Message & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(Message &a, Message &b); + }}}}} // namespace #endif http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index 7bcf528..989e7b2 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -17,6 +17,32 @@ use Thrift\Protocol\TBinaryProtocolAccelerated; use Thrift\Exception\TApplicationException; +final class MessageLevel { + const INFO = 0; + const DEBUG = 1; + const ERROR = 2; + const ACK = 3; + static public $__names = array( + 0 => 'INFO', + 1 => 'DEBUG', + 2 => 'ERROR', + 3 => 'ACK', + ); +} + +final class MessageType { + const EXPERIMENT = 0; + const TASK = 1; + const WORKFLOWNODE = 2; + const JOB = 3; + static public $__names = array( + 0 => 'EXPERIMENT', + 1 => 'TASK', + 2 => 'WORKFLOWNODE', + 3 => 'JOB', + ); +} + class ExperimentStatusChangeEvent { static $_TSPEC; @@ -861,4 +887,158 @@ class JobStatusChangeEvent { } +class Message { + static $_TSPEC; + + public $event = null; + public $messageId = "DO_NOT_SET_AT_CLIENTS"; + public $messageType = null; + public $updatedTime = null; + public $messageLevel = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'event', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'messageId', + 'type' => TType::STRING, + ), + 3 => array( + 'var' => 'messageType', + 'type' => TType::I32, + ), + 4 => array( + 'var' => 'updatedTime', + 'type' => TType::I64, + ), + 5 => array( + 'var' => 'messageLevel', + 'type' => TType::I32, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['event'])) { + $this->event = $vals['event']; + } + if (isset($vals['messageId'])) { + $this->messageId = $vals['messageId']; + } + if (isset($vals['messageType'])) { + $this->messageType = $vals['messageType']; + } + if (isset($vals['updatedTime'])) { + $this->updatedTime = $vals['updatedTime']; + } + if (isset($vals['messageLevel'])) { + $this->messageLevel = $vals['messageLevel']; + } + } + } + + public function getName() { + return 'Message'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->event); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->messageId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 3: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->messageType); + } else { + $xfer += $input->skip($ftype); + } + break; + case 4: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->updatedTime); + } else { + $xfer += $input->skip($ftype); + } + break; + case 5: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->messageLevel); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('Message'); + if ($this->event !== null) { + $xfer += $output->writeFieldBegin('event', TType::STRING, 1); + $xfer += $output->writeString($this->event); + $xfer += $output->writeFieldEnd(); + } + if ($this->messageId !== null) { + $xfer += $output->writeFieldBegin('messageId', TType::STRING, 2); + $xfer += $output->writeString($this->messageId); + $xfer += $output->writeFieldEnd(); + } + if ($this->messageType !== null) { + $xfer += $output->writeFieldBegin('messageType', TType::I32, 3); + $xfer += $output->writeI32($this->messageType); + $xfer += $output->writeFieldEnd(); + } + if ($this->updatedTime !== null) { + $xfer += $output->writeFieldBegin('updatedTime', TType::I64, 4); + $xfer += $output->writeI64($this->updatedTime); + $xfer += $output->writeFieldEnd(); + } + if ($this->messageLevel !== null) { + $xfer += $output->writeFieldBegin('messageLevel', TType::I32, 5); + $xfer += $output->writeI32($this->messageLevel); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +$GLOBALS['messagingEvents_CONSTANTS']['DEFAULT_ID'] = "DO_NOT_SET_AT_CLIENTS"; + http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index 38c25a4..8eaddcb 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -24,6 +24,22 @@ namespace java org.apache.airavata.model.messaging.event namespace php Airavata.Model.Messaging.Event namespace cpp apache.airavata.model.messaging.event +const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS" + +enum MessageLevel { + INFO, + DEBUG, + ERROR, + ACK +} + +enum MessageType { + EXPERIMENT, + TASK, + WORKFLOWNODE, + JOB +} + struct ExperimentStatusChangeEvent { 1: required experimentModel.ExperimentState state; 2: required string experimentId; @@ -80,6 +96,14 @@ struct JobStatusChangeEvent { // 3: required JobMonitor jobMonitor; } +struct Message { + 1: required binary event; + 2: required string messageId = DEFAULT_ID; + 3: required MessageType messageType; + 4: optional i64 updatedTime; + 5: optional MessageLevel messageLevel; +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java index 6c29313..99b600f 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataUtils.java @@ -23,6 +23,7 @@ package org.apache.airavata.common.utils; import java.sql.Timestamp; import java.util.Calendar; +import java.util.UUID; public class AiravataUtils { public static final String EXECUTION_MODE="application.execution.mode"; @@ -66,4 +67,9 @@ public class AiravataUtils { } return new Timestamp(time); } + + public static String getId (String name){ + String id = name.replaceAll("\\s", ""); + return id + "_" + UUID.randomUUID(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index d142ceb..5f9e36f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -20,12 +20,15 @@ */ package org.apache.airavata.gfac.core.monitor; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.util.Calendar; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.registry.cpi.CompositeIdentifier; @@ -67,7 +70,16 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { updateJobStatus(taskID, jobID, state); logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString()); monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity())); - publisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity())); + JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()); + Message message = new Message(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(changeEvent); + message.setEvent(baos.toByteArray()); + message.setMessageType(MessageType.JOB); + message.setMessageLevel(MessageLevel.INFO); + message.setMessageId(AiravataUtils.getId("JOB")); + publisher.publish(message); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index f4e6241..dd91c61 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -20,8 +20,11 @@ */ package org.apache.airavata.gfac.core.monitor; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.util.Calendar; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.Publisher; @@ -93,7 +96,16 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { jobStatus.getJobIdentity().getWorkflowNodeId(), jobStatus.getJobIdentity().getExperimentId()); monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); - publisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); + TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity); + Message message = new Message(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(changeEvent); + message.setEvent(baos.toByteArray()); + message.setMessageType(MessageType.TASK); + message.setMessageLevel(MessageLevel.INFO); + message.setMessageId(AiravataUtils.getId("TASK")); + publisher.publish(message); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java index fe24bd0..5b9a5ed 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java @@ -20,14 +20,15 @@ */ package org.apache.airavata.gfac.core.monitor; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; import java.util.Calendar; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import org.apache.airavata.model.messaging.event.WorkflowIdentity; -import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus; @@ -80,7 +81,16 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString()); WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId()); monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity)); - publisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity)); + WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); + Message message = new Message(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(changeEvent); + message.setEvent(baos.toByteArray()); + message.setMessageType(MessageType.WORKFLOWNODE); + message.setMessageLevel(MessageLevel.INFO); + message.setMessageId(AiravataUtils.getId("NODE")); + publisher.publish(message); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java index 24cfe59..24c8e2a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java @@ -1,13 +1,7 @@ package org.apache.airavata.messaging.core; -import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; public interface Publisher { - public void publish(ExperimentStatusChangeEvent event); - public void publish(WorkflowNodeStatusChangeEvent event); - public void publish(TaskStatusChangeEvent event); - public void publish(JobStatusChangeEvent event); + public void publish(Message message); } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d8638f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java index eaa6158..0fda442 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java @@ -22,10 +22,7 @@ package org.apache.airavata.messaging.core.impl; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; public class AiravataRabbitMQPublisher implements Publisher { private String brokerUrl; @@ -39,19 +36,7 @@ public class AiravataRabbitMQPublisher implements Publisher { RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail); } - public void publish(ExperimentStatusChangeEvent event) { - - } - - public void publish(WorkflowNodeStatusChangeEvent event) { - - } - - public void publish(TaskStatusChangeEvent event) { - - } - - public void publish(JobStatusChangeEvent event) { + public void publish(Message message) { } }
