implementing passive gfac submitter using rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/88d27d95 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/88d27d95 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/88d27d95 Branch: refs/heads/master Commit: 88d27d9574f9b077d334eabde147a0787b186899 Parents: 30aefc4 Author: Lahiru Gunathilake <[email protected]> Authored: Tue Feb 10 14:04:13 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Tue Feb 10 14:04:13 2015 -0500 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 2 +- .../lib/airavata/messagingEvents_types.cpp | 186 +++++- .../lib/airavata/messagingEvents_types.h | 99 +++- .../Airavata/Model/Messaging/Event/Types.php | 208 +++++++ .../model/messaging/event/MessageType.java | 8 +- .../model/messaging/event/TaskSubmitEvent.java | 588 +++++++++++++++++++ .../messaging/event/TaskTerminateEvent.java | 492 ++++++++++++++++ airavata-api/generate-thrift-files.sh | 22 +- .../messagingEvents.thrift | 15 +- .../airavata/common/utils/ServerSettings.java | 5 + .../main/resources/airavata-server.properties | 4 +- .../main/resources/airavata-server.properties | 4 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 2 +- .../messaging/core/PublisherFactory.java | 21 +- .../messaging/core/impl/RabbitMQPublisher.java | 99 ---- .../core/impl/RabbitMQStatusPublisher.java | 99 ++++ .../core/impl/RabbitMQTaskLaunchPublisher.java | 88 +++ .../server/OrchestratorServerHandler.java | 3 +- modules/orchestrator/orchestrator-core/pom.xml | 14 +- .../core/context/OrchestratorContext.java | 11 + .../core/impl/GFACPassiveJobSubmitter.java | 247 ++++++++ .../core/impl/GFACRPCJobSubmitter.java | 212 +++++++ .../core/impl/GFACServiceJobSubmitter.java | 212 ------- .../workflow/engine/WorkflowEngineImpl.java | 4 +- pom.xml | 7 + 25 files changed, 2314 insertions(+), 338 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 8e2ca17..8d1cd75 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -118,7 +118,7 @@ public class AiravataServerHandler implements Airavata.Iface { public AiravataServerHandler() { try { if (ServerSettings.isRabbitMqPublishEnabled()) { - publisher = PublisherFactory.createPublisher(); + publisher = PublisherFactory.createActivityPublisher(); } } catch (ApplicationSettingsException e) { logger.error("Error occured while reading airavata-server properties..", e); http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp index 1f839f3..a2e72f5 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp @@ -45,15 +45,19 @@ int 
_kMessageTypeValues[] = { MessageType::EXPERIMENT, MessageType::TASK, MessageType::WORKFLOWNODE, - MessageType::JOB + MessageType::JOB, + MessageType::LAUNCHTASK, + MessageType::TERMINATETASK }; const char* _kMessageTypeNames[] = { "EXPERIMENT", "TASK", "WORKFLOWNODE", - "JOB" + "JOB", + "LAUNCHTASK", + "TERMINATETASK" }; -const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(4, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); const char* ExperimentStatusChangeEvent::ascii_fingerprint = "38C252E94E93B69D04EB3A6EE2F9EDFB"; const uint8_t ExperimentStatusChangeEvent::binary_fingerprint[16] = {0x38,0xC2,0x52,0xE9,0x4E,0x93,0xB6,0x9D,0x04,0xEB,0x3A,0x6E,0xE2,0xF9,0xED,0xFB}; @@ -835,6 +839,182 @@ void swap(JobIdentifier &a, JobIdentifier &b) { swap(a.gatewayId, b.gatewayId); } +const char* TaskSubmitEvent::ascii_fingerprint = "AB879940BD15B6B25691265F7384B271"; +const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71}; + +uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_experimentId = false; + bool isset_taskId = false; + bool isset_gatewayId = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->experimentId); + isset_experimentId = true; + } else { + xfer += 
iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->taskId); + isset_taskId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->gatewayId); + isset_gatewayId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_experimentId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_taskId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_gatewayId) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t TaskSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("TaskSubmitEvent"); + + xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->experimentId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->taskId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("gatewayId", ::apache::thrift::protocol::T_STRING, 3); + xfer += oprot->writeString(this->gatewayId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(TaskSubmitEvent &a, TaskSubmitEvent &b) { + using ::std::swap; + swap(a.experimentId, b.experimentId); + swap(a.taskId, b.taskId); + swap(a.gatewayId, b.gatewayId); +} + +const char* TaskTerminateEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972"; +const uint8_t TaskTerminateEvent::binary_fingerprint[16] = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72}; + 
+uint32_t TaskTerminateEvent::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_experimentId = false; + bool isset_taskId = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->experimentId); + isset_experimentId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->taskId); + isset_taskId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_experimentId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_taskId) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t TaskTerminateEvent::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("TaskTerminateEvent"); + + xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->experimentId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("taskId", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->taskId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(TaskTerminateEvent &a, TaskTerminateEvent &b) { + using ::std::swap; + swap(a.experimentId, b.experimentId); + swap(a.taskId, b.taskId); +} + const char* 
JobStatusChangeEvent::ascii_fingerprint = "062775D589B60D1687103FD465B0F5E8"; const uint8_t JobStatusChangeEvent::binary_fingerprint[16] = {0x06,0x27,0x75,0xD5,0x89,0xB6,0x0D,0x16,0x87,0x10,0x3F,0xD4,0x65,0xB0,0xF5,0xE8}; http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h index 572a8bd..f063fc2 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h @@ -52,7 +52,9 @@ struct MessageType { EXPERIMENT = 0, TASK = 1, WORKFLOWNODE = 2, - JOB = 3 + JOB = 3, + LAUNCHTASK = 4, + TERMINATETASK = 5 }; }; @@ -460,6 +462,101 @@ class JobIdentifier { void swap(JobIdentifier &a, JobIdentifier &b); +class TaskSubmitEvent { + public: + + static const char* ascii_fingerprint; // = "AB879940BD15B6B25691265F7384B271"; + static const uint8_t binary_fingerprint[16]; // = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71}; + + TaskSubmitEvent() : experimentId(), taskId(), gatewayId() { + } + + virtual ~TaskSubmitEvent() throw() {} + + std::string experimentId; + std::string taskId; + std::string gatewayId; + + void __set_experimentId(const std::string& val) { + experimentId = val; + } + + void __set_taskId(const std::string& val) { + taskId = val; + } + + void __set_gatewayId(const std::string& val) { + gatewayId = val; + } + + bool operator == (const TaskSubmitEvent & rhs) const + { + if (!(experimentId == rhs.experimentId)) + return false; + if (!(taskId == rhs.taskId)) 
+ return false; + if (!(gatewayId == rhs.gatewayId)) + return false; + return true; + } + bool operator != (const TaskSubmitEvent &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TaskSubmitEvent & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(TaskSubmitEvent &a, TaskSubmitEvent &b); + + +class TaskTerminateEvent { + public: + + static const char* ascii_fingerprint; // = "07A9615F837F7D0A952B595DD3020972"; + static const uint8_t binary_fingerprint[16]; // = {0x07,0xA9,0x61,0x5F,0x83,0x7F,0x7D,0x0A,0x95,0x2B,0x59,0x5D,0xD3,0x02,0x09,0x72}; + + TaskTerminateEvent() : experimentId(), taskId() { + } + + virtual ~TaskTerminateEvent() throw() {} + + std::string experimentId; + std::string taskId; + + void __set_experimentId(const std::string& val) { + experimentId = val; + } + + void __set_taskId(const std::string& val) { + taskId = val; + } + + bool operator == (const TaskTerminateEvent & rhs) const + { + if (!(experimentId == rhs.experimentId)) + return false; + if (!(taskId == rhs.taskId)) + return false; + return true; + } + bool operator != (const TaskTerminateEvent &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TaskTerminateEvent & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(TaskTerminateEvent &a, TaskTerminateEvent &b); + + class JobStatusChangeEvent { public: http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php 
b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index d20392a..40810d3 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -35,11 +35,15 @@ final class MessageType { const TASK = 1; const WORKFLOWNODE = 2; const JOB = 3; + const LAUNCHTASK = 4; + const TERMINATETASK = 5; static public $__names = array( 0 => 'EXPERIMENT', 1 => 'TASK', 2 => 'WORKFLOWNODE', 3 => 'JOB', + 4 => 'LAUNCHTASK', + 5 => 'TERMINATETASK', ); } @@ -967,6 +971,210 @@ class JobIdentifier { } +class TaskSubmitEvent { + static $_TSPEC; + + public $experimentId = null; + public $taskId = null; + public $gatewayId = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'experimentId', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'taskId', + 'type' => TType::STRING, + ), + 3 => array( + 'var' => 'gatewayId', + 'type' => TType::STRING, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['experimentId'])) { + $this->experimentId = $vals['experimentId']; + } + if (isset($vals['taskId'])) { + $this->taskId = $vals['taskId']; + } + if (isset($vals['gatewayId'])) { + $this->gatewayId = $vals['gatewayId']; + } + } + } + + public function getName() { + return 'TaskSubmitEvent'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->experimentId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + 
$xfer += $input->readString($this->taskId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 3: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->gatewayId); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('TaskSubmitEvent'); + if ($this->experimentId !== null) { + $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1); + $xfer += $output->writeString($this->experimentId); + $xfer += $output->writeFieldEnd(); + } + if ($this->taskId !== null) { + $xfer += $output->writeFieldBegin('taskId', TType::STRING, 2); + $xfer += $output->writeString($this->taskId); + $xfer += $output->writeFieldEnd(); + } + if ($this->gatewayId !== null) { + $xfer += $output->writeFieldBegin('gatewayId', TType::STRING, 3); + $xfer += $output->writeString($this->gatewayId); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + +class TaskTerminateEvent { + static $_TSPEC; + + public $experimentId = null; + public $taskId = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'experimentId', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'taskId', + 'type' => TType::STRING, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['experimentId'])) { + $this->experimentId = $vals['experimentId']; + } + if (isset($vals['taskId'])) { + $this->taskId = $vals['taskId']; + } + } + } + + public function getName() { + return 'TaskTerminateEvent'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += 
$input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->experimentId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->taskId); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('TaskTerminateEvent'); + if ($this->experimentId !== null) { + $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1); + $xfer += $output->writeString($this->experimentId); + $xfer += $output->writeFieldEnd(); + } + if ($this->taskId !== null) { + $xfer += $output->writeFieldBegin('taskId', TType::STRING, 2); + $xfer += $output->writeString($this->taskId); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + class JobStatusChangeEvent { static $_TSPEC; http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java index d00f404..230b87b 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java @@ -32,7 +32,9 @@ import org.apache.thrift.TEnum; EXPERIMENT(0), 
TASK(1), WORKFLOWNODE(2), - JOB(3); + JOB(3), + LAUNCHTASK(4), + TERMINATETASK(5); private final int value; @@ -61,6 +63,10 @@ import org.apache.thrift.TEnum; return WORKFLOWNODE; case 3: return JOB; + case 4: + return LAUNCHTASK; + case 5: + return TERMINATETASK; default: return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java new file mode 100644 index 0000000..c813c76 --- /dev/null +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java @@ -0,0 +1,588 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.model.messaging.event; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class TaskSubmitEvent implements org.apache.thrift.TBase<TaskSubmitEvent, TaskSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskSubmitEvent> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskSubmitEvent"); + + private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3); + + private static final Map<Class<? 
extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new TaskSubmitEventStandardSchemeFactory()); + schemes.put(TupleScheme.class, new TaskSubmitEventTupleSchemeFactory()); + } + + private String experimentId; // required + private String taskId; // required + private String gatewayId; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { + EXPERIMENT_ID((short)1, "experimentId"), + TASK_ID((short)2, "taskId"), + GATEWAY_ID((short)3, "gatewayId"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // EXPERIMENT_ID + return EXPERIMENT_ID; + case 2: // TASK_ID + return TASK_ID; + case 3: // GATEWAY_ID + return GATEWAY_ID; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskSubmitEvent.class, metaDataMap); + } + + public TaskSubmitEvent() { + } + + public TaskSubmitEvent( + String experimentId, + String taskId, + String gatewayId) + { + this(); + this.experimentId = experimentId; + this.taskId = taskId; + this.gatewayId = gatewayId; + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public TaskSubmitEvent(TaskSubmitEvent other) { + if (other.isSetExperimentId()) { + this.experimentId = other.experimentId; + } + if (other.isSetTaskId()) { + this.taskId = other.taskId; + } + if (other.isSetGatewayId()) { + this.gatewayId = other.gatewayId; + } + } + + public TaskSubmitEvent deepCopy() { + return new TaskSubmitEvent(this); + } + + @Override + public void clear() { + this.experimentId = null; + this.taskId = null; + this.gatewayId = null; + } + + public String getExperimentId() { + return this.experimentId; + } + + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } + + public void unsetExperimentId() { + this.experimentId = null; + } + + /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */ + public boolean isSetExperimentId() { + return this.experimentId != null; + } + + public void setExperimentIdIsSet(boolean value) { + if (!value) { + this.experimentId = null; + } + } + + public String getTaskId() { + return this.taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void unsetTaskId() { + this.taskId = null; + } + + /** Returns true if field taskId is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskId() { + return this.taskId != null; + } + + public void setTaskIdIsSet(boolean value) { + if (!value) { + this.taskId = null; + } + } + + public String getGatewayId() { + return this.gatewayId; + } + + public void setGatewayId(String gatewayId) { + this.gatewayId = gatewayId; + } + + public void unsetGatewayId() { + this.gatewayId = null; + } + + /** Returns true if field gatewayId is set (has been assigned a value) and false otherwise */ + public boolean isSetGatewayId() { + return this.gatewayId != null; + } + + public void setGatewayIdIsSet(boolean value) { + if (!value) { + this.gatewayId = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + 
case EXPERIMENT_ID: + if (value == null) { + unsetExperimentId(); + } else { + setExperimentId((String)value); + } + break; + + case TASK_ID: + if (value == null) { + unsetTaskId(); + } else { + setTaskId((String)value); + } + break; + + case GATEWAY_ID: + if (value == null) { + unsetGatewayId(); + } else { + setGatewayId((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case EXPERIMENT_ID: + return getExperimentId(); + + case TASK_ID: + return getTaskId(); + + case GATEWAY_ID: + return getGatewayId(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case EXPERIMENT_ID: + return isSetExperimentId(); + case TASK_ID: + return isSetTaskId(); + case GATEWAY_ID: + return isSetGatewayId(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof TaskSubmitEvent) + return this.equals((TaskSubmitEvent)that); + return false; + } + + public boolean equals(TaskSubmitEvent that) { + if (that == null) + return false; + + boolean this_present_experimentId = true && this.isSetExperimentId(); + boolean that_present_experimentId = true && that.isSetExperimentId(); + if (this_present_experimentId || that_present_experimentId) { + if (!(this_present_experimentId && that_present_experimentId)) + return false; + if (!this.experimentId.equals(that.experimentId)) + return false; + } + + boolean this_present_taskId = true && this.isSetTaskId(); + boolean that_present_taskId = true && that.isSetTaskId(); + if (this_present_taskId || that_present_taskId) { + if (!(this_present_taskId && that_present_taskId)) + return false; + if (!this.taskId.equals(that.taskId)) + return false; + } + + boolean 
this_present_gatewayId = true && this.isSetGatewayId(); + boolean that_present_gatewayId = true && that.isSetGatewayId(); + if (this_present_gatewayId || that_present_gatewayId) { + if (!(this_present_gatewayId && that_present_gatewayId)) + return false; + if (!this.gatewayId.equals(that.gatewayId)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(TaskSubmitEvent other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetExperimentId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTaskId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetGatewayId()).compareTo(other.isSetGatewayId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetGatewayId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gatewayId, other.gatewayId); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + 
schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("TaskSubmitEvent("); + boolean first = true; + + sb.append("experimentId:"); + if (this.experimentId == null) { + sb.append("null"); + } else { + sb.append(this.experimentId); + } + first = false; + if (!first) sb.append(", "); + sb.append("taskId:"); + if (this.taskId == null) { + sb.append("null"); + } else { + sb.append(this.taskId); + } + first = false; + if (!first) sb.append(", "); + sb.append("gatewayId:"); + if (this.gatewayId == null) { + sb.append("null"); + } else { + sb.append(this.gatewayId); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!isSetExperimentId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString()); + } + + if (!isSetTaskId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! Struct:" + toString()); + } + + if (!isSetGatewayId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! 
Struct:" + toString()); + } + + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class TaskSubmitEventStandardSchemeFactory implements SchemeFactory { + public TaskSubmitEventStandardScheme getScheme() { + return new TaskSubmitEventStandardScheme(); + } + } + + private static class TaskSubmitEventStandardScheme extends StandardScheme<TaskSubmitEvent> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, TaskSubmitEvent struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // EXPERIMENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // TASK_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // GATEWAY_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + 
struct.gatewayId = iprot.readString(); + struct.setGatewayIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, TaskSubmitEvent struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.experimentId != null) { + oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC); + oprot.writeString(struct.experimentId); + oprot.writeFieldEnd(); + } + if (struct.taskId != null) { + oprot.writeFieldBegin(TASK_ID_FIELD_DESC); + oprot.writeString(struct.taskId); + oprot.writeFieldEnd(); + } + if (struct.gatewayId != null) { + oprot.writeFieldBegin(GATEWAY_ID_FIELD_DESC); + oprot.writeString(struct.gatewayId); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class TaskSubmitEventTupleSchemeFactory implements SchemeFactory { + public TaskSubmitEventTupleScheme getScheme() { + return new TaskSubmitEventTupleScheme(); + } + } + + private static class TaskSubmitEventTupleScheme extends TupleScheme<TaskSubmitEvent> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, TaskSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + oprot.writeString(struct.experimentId); + oprot.writeString(struct.taskId); + oprot.writeString(struct.gatewayId); + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, TaskSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + struct.gatewayId = 
iprot.readString(); + struct.setGatewayIdIsSet(true); + } + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java new file mode 100644 index 0000000..59b9f85 --- /dev/null +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskTerminateEvent.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.model.messaging.event; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class TaskTerminateEvent implements org.apache.thrift.TBase<TaskTerminateEvent, TaskTerminateEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskTerminateEvent> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskTerminateEvent"); + + private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? 
extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new TaskTerminateEventStandardSchemeFactory()); + schemes.put(TupleScheme.class, new TaskTerminateEventTupleSchemeFactory()); + } + + private String experimentId; // required + private String taskId; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { + EXPERIMENT_ID((short)1, "experimentId"), + TASK_ID((short)2, "taskId"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // EXPERIMENT_ID + return EXPERIMENT_ID; + case 2: // TASK_ID + return TASK_ID; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskTerminateEvent.class, metaDataMap); + } + + public TaskTerminateEvent() { + } + + public TaskTerminateEvent( + String experimentId, + String taskId) + { + this(); + this.experimentId = experimentId; + this.taskId = taskId; + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public TaskTerminateEvent(TaskTerminateEvent other) { + if (other.isSetExperimentId()) { + this.experimentId = other.experimentId; + } + if (other.isSetTaskId()) { + this.taskId = other.taskId; + } + } + + public TaskTerminateEvent deepCopy() { + return new TaskTerminateEvent(this); + } + + @Override + public void clear() { + this.experimentId = null; + this.taskId = null; + } + + public String getExperimentId() { + return this.experimentId; + } + + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } + + public void unsetExperimentId() { + this.experimentId = null; + } + + /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */ + public boolean isSetExperimentId() { + return this.experimentId != null; + } + + public void setExperimentIdIsSet(boolean value) { + if (!value) { + this.experimentId = null; + } + } + + public String getTaskId() { + return this.taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void unsetTaskId() { + this.taskId = null; + } + + /** Returns true if field taskId is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskId() { + return this.taskId != null; + } + + public void setTaskIdIsSet(boolean value) { + if (!value) { + this.taskId = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case EXPERIMENT_ID: + if (value == null) { + unsetExperimentId(); + } else { + setExperimentId((String)value); + } + break; + + case TASK_ID: + if (value == null) { + unsetTaskId(); + } else { + setTaskId((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case EXPERIMENT_ID: + return getExperimentId(); + + case TASK_ID: + return getTaskId(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + 
public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case EXPERIMENT_ID: + return isSetExperimentId(); + case TASK_ID: + return isSetTaskId(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof TaskTerminateEvent) + return this.equals((TaskTerminateEvent)that); + return false; + } + + public boolean equals(TaskTerminateEvent that) { + if (that == null) + return false; + + boolean this_present_experimentId = true && this.isSetExperimentId(); + boolean that_present_experimentId = true && that.isSetExperimentId(); + if (this_present_experimentId || that_present_experimentId) { + if (!(this_present_experimentId && that_present_experimentId)) + return false; + if (!this.experimentId.equals(that.experimentId)) + return false; + } + + boolean this_present_taskId = true && this.isSetTaskId(); + boolean that_present_taskId = true && that.isSetTaskId(); + if (this_present_taskId || that_present_taskId) { + if (!(this_present_taskId && that_present_taskId)) + return false; + if (!this.taskId.equals(that.taskId)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(TaskTerminateEvent other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetExperimentId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetTaskId()).compareTo(other.isSetTaskId()); + if (lastComparison != 0) { + return lastComparison; + } + if 
(isSetTaskId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskId, other.taskId); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("TaskTerminateEvent("); + boolean first = true; + + sb.append("experimentId:"); + if (this.experimentId == null) { + sb.append("null"); + } else { + sb.append(this.experimentId); + } + first = false; + if (!first) sb.append(", "); + sb.append("taskId:"); + if (this.taskId == null) { + sb.append("null"); + } else { + sb.append(this.taskId); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!isSetExperimentId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString()); + } + + if (!isSetTaskId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' is unset! 
Struct:" + toString()); + } + + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class TaskTerminateEventStandardSchemeFactory implements SchemeFactory { + public TaskTerminateEventStandardScheme getScheme() { + return new TaskTerminateEventStandardScheme(); + } + } + + private static class TaskTerminateEventStandardScheme extends StandardScheme<TaskTerminateEvent> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, TaskTerminateEvent struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // EXPERIMENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // TASK_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + 
iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, TaskTerminateEvent struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.experimentId != null) { + oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC); + oprot.writeString(struct.experimentId); + oprot.writeFieldEnd(); + } + if (struct.taskId != null) { + oprot.writeFieldBegin(TASK_ID_FIELD_DESC); + oprot.writeString(struct.taskId); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class TaskTerminateEventTupleSchemeFactory implements SchemeFactory { + public TaskTerminateEventTupleScheme getScheme() { + return new TaskTerminateEventTupleScheme(); + } + } + + private static class TaskTerminateEventTupleScheme extends TupleScheme<TaskTerminateEvent> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, TaskTerminateEvent struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + oprot.writeString(struct.experimentId); + oprot.writeString(struct.taskId); + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, TaskTerminateEvent struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + struct.taskId = iprot.readString(); + struct.setTaskIdIsSet(true); + } + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/generate-thrift-files.sh ---------------------------------------------------------------------- diff --git a/airavata-api/generate-thrift-files.sh b/airavata-api/generate-thrift-files.sh index 4cd3288..bd823e4 100755 --- a/airavata-api/generate-thrift-files.sh +++ b/airavata-api/generate-thrift-files.sh @@ -27,7 +27,7 @@ 
DATAMODEL_SRC_DIR='airavata-data-models/src/main/java' JAVA_API_SDK_DIR='airavata-api-stubs/src/main/java' CPP_SDK_DIR='airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/' PHP_SDK_DIR='airavata-client-sdks/airavata-php-sdk/src/main/resources/lib' - +THRIFT_EXEC=thrift # The Function fail prints error messages on failure and quits the script. fail() { echo $@ @@ -96,7 +96,7 @@ copy_changed_files() { # Generation of thrift files will require installing Apache Thrift. Please add thrift to your path. # Verify is thrift is installed, is in the path is at a specified version. -VERSION=$(thrift -version 2>/dev/null | grep -F "${REQUIRED_THRIFT_VERSION}" | wc -l) +VERSION=$($THRIFT_EXEC -version 2>/dev/null | grep -F "${REQUIRED_THRIFT_VERSION}" | wc -l) if [ "$VERSION" -ne 1 ] ; then echo "****************************************************" echo "*** thrift is not installed or is not in the path" @@ -125,11 +125,11 @@ rm -rf ${JAVA_BEAN_GEN_DIR} # Generate the Airavata Data Model using thrift Java Beans generator. This will take generate the classes in bean style # with members being private and setters returning voids. # The airavataDataModel.thrift includes rest of data models. 
-thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/airavataDataModel.thrift || fail unable to generate java bean thrift classes on base data model +$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/airavataDataModel.thrift || fail unable to generate java bean thrift classes on base data model -thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/appCatalogModels.thrift || fail unable to generate java bean thrift classes on app catalog data models +$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/appCatalogModels.thrift || fail unable to generate java bean thrift classes on app catalog data models -thrift ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/workflowDataModel.thrift || fail unable to generate java bean thrift classes on app workflow data models +$THRIFT_EXEC ${THRIFT_ARGS} --gen java:beans ${THRIFT_IDL_DIR}/workflowDataModel.thrift || fail unable to generate java bean thrift classes on app workflow data models # For the generated java beans add the ASF V2 License header add_license_header $JAVA_BEAN_GEN_DIR @@ -150,9 +150,9 @@ rm -rf ${JAVA_GEN_DIR} # Using thrift Java generator, generate the java classes based on Airavata API. This # The airavataAPI.thrift includes rest of data models. 
-thrift ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate java thrift classes on AiravataAPI +$THRIFT_EXEC ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate java thrift classes on AiravataAPI -#thrift ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate java thrift classes on WorkflowAPI +#$THRIFT_EXEC ${THRIFT_ARGS} --gen java ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate java thrift classes on WorkflowAPI # For the generated java classes add the ASF V2 License header add_license_header $JAVA_GEN_DIR @@ -173,9 +173,9 @@ rm -rf ${CPP_GEN_DIR} # Using thrift Java generator, generate the java classes based on Airavata API. This # The airavataAPI.thrift includes rest of data models. -thrift ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate C++ thrift classes +$THRIFT_EXEC ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate C++ thrift classes -#thrift ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate C++ thrift classes for WorkflowAPI +#$THRIFT_EXEC ${THRIFT_ARGS} --gen cpp ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate C++ thrift classes for WorkflowAPI # For the generated CPP classes add the ASF V2 License header add_license_header $CPP_GEN_DIR @@ -195,9 +195,9 @@ rm -rf ${PHP_GEN_DIR} # Using thrift Java generator, generate the java classes based on Airavata API. This # The airavataAPI.thrift includes rest of data models. 
-thrift ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate PHP thrift classes +$THRIFT_EXEC ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/airavataAPI.thrift || fail unable to generate PHP thrift classes -#thrift ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate PHP thrift classes for WorkflowAPI +#$THRIFT_EXEC ${THRIFT_ARGS} --gen php:autoload ${THRIFT_IDL_DIR}/workflowAPI.thrift || fail unable to generate PHP thrift classes for WorkflowAPI # For the generated java classes add the ASF V2 License header ## TODO Write PHP license parser http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index c9f3808..d736701 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -38,7 +38,9 @@ enum MessageType { EXPERIMENT, TASK, WORKFLOWNODE, - JOB + JOB, + LAUNCHTASK, + TERMINATETASK } struct ExperimentStatusChangeEvent { @@ -100,6 +102,17 @@ struct JobIdentifier { // //8: // } +struct TaskSubmitEvent{ + 1: required string experimentId, + 2: required string taskId, + 3: required string gatewayId +} + +struct TaskTerminateEvent{ + 1: required string experimentId, + 2: required string taskId, +} + struct JobStatusChangeEvent { 1: required experimentModel.JobState state; 2: required JobIdentifier jobIdentity; http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git 
a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 988ad3d..4ea0b44 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -52,6 +52,7 @@ public class ServerSettings extends ApplicationSettings { private static final String MY_PROXY_PASSWORD = "myproxy.password"; private static final String MY_PROXY_LIFETIME = "myproxy.life"; private static final String ACTIVITY_PUBLISHER = "activity.publisher"; + private static final String TASK_LAUNCH_PUBLISHER = "task.launch.publisher"; private static final String ACTIVITY_LISTENERS = "activity.listeners"; public static final String PUBLISH_RABBITMQ = "publish.rabbitmq"; public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable"; @@ -154,6 +155,10 @@ public class ServerSettings extends ApplicationSettings { return getSetting(ACTIVITY_PUBLISHER); } + public static String getTaskLaunchPublisher() throws ApplicationSettingsException{ + return getSetting(TASK_LAUNCH_PUBLISHER); + } + public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{ String setting = getSetting(PUBLISH_RABBITMQ); return Boolean.parseBoolean(setting); http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index c73e61a..c90fab1 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -216,6 +216,7 @@ 
connection.name=xsede activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false -activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher +activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher +task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher rabbitmq.broker.url=amqp://localhost:5672 rabbitmq.exchange.name=airavata_rabbitmq_exchange @@ -224,7 +225,8 @@ rabbitmq.exchange.name=airavata_rabbitmq_exchange ########################################################################### #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter -job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter +#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter +job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator submitter.interval=10000 threadpool.size=10 http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties index fb02901..d6be51a 100644 --- 
a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties +++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties @@ -201,6 +201,7 @@ connection.name=xsede activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false -activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher +activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher +task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher rabbitmq.broker.url=amqp://localhost:5672 rabbitmq.exchange.name=airavata_rabbitmq_exchange @@ -209,7 +210,8 @@ rabbitmq.exchange.name=airavata_rabbitmq_exchange ########################################################################### #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter -job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter +#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACPassiveJobSubmitter +job.submitter=org.apache.airavata.orchestrator.core.impl.GFACRPCJobSubmitter job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator submitter.interval=10000 threadpool.size=10 http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 00d313c..bb612a6 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -116,7 +116,7 @@ public class BetterGfacImpl implements GFac,Watcher { String[] listenerClassList = ServerSettings.getActivityListeners(); Publisher rabbitMQPublisher = null; if (ServerSettings.isRabbitMqPublishEnabled()){ - rabbitMQPublisher = PublisherFactory.createPublisher(); + rabbitMQPublisher = PublisherFactory.createActivityPublisher(); } for (String listenerClass : listenerClassList) { Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java index 2080cc6..59cdbdf 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class PublisherFactory { private static Logger log = LoggerFactory.getLogger(PublisherFactory.class); - public static Publisher createPublisher() throws AiravataException { + public static Publisher createActivityPublisher() throws AiravataException { String activityPublisher = ServerSettings.getActivityPublisher(); if (activityPublisher == null) { @@ -47,4 +47,23 @@ public class PublisherFactory { throw new AiravataException(msg, e); } 
} + + public static Publisher createTaskLaunchPublisher() throws AiravataException { + String taskLaunchPublisher = ServerSettings.getTaskLaunchPublisher(); + + if (taskLaunchPublisher == null) { + String s = "Task launch publisher is not specified"; + log.error(s); + throw new AiravataException(s); + } + + try { + Class<? extends Publisher> aPublisher = Class.forName(taskLaunchPublisher).asSubclass(Publisher.class); + return aPublisher.newInstance(); + } catch (Exception e) { + String msg = "Failed to load the publisher from the publisher class property: " + taskLaunchPublisher; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java deleted file mode 100644 index ff14a8c..0000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.core.impl; - -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.*; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RabbitMQPublisher implements Publisher { - - private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class); - - private RabbitMQProducer rabbitMQProducer; - - - public RabbitMQPublisher() throws Exception { - String brokerUrl; - String exchangeName; - try { - brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); - } catch (ApplicationSettingsException e) { - String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; - log.error(message, e); - throw new AiravataException(message, e); - } - rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); - rabbitMQProducer.open(); - } - - public void publish(MessageContext msgCtx) throws AiravataException { - try { - log.info("Publishing status to rabbitmq..."); - byte[] body = 
ThriftUtils.serializeThriftObject(msgCtx.getEvent()); - Message message = new Message(); - message.setEvent(body); - message.setMessageId(msgCtx.getMessageId()); - message.setMessageType(msgCtx.getType()); - message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); - String routingKey = null; - if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ - ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getExperimentId(); - } else if (msgCtx.getType().equals(MessageType.TASK)) { - TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getTaskIdentity().getExperimentId() + "." + - event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); - }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ - WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); - WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); - routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); - }else if (msgCtx.getType().equals(MessageType.JOB)){ - JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); - JobIdentifier identity = event.getJobIdentity(); - routingKey = identity.getExperimentId() + "." + - identity.getWorkflowNodeId() + "." + - identity.getTaskId() + "." 
+ - identity.getJobId(); - } - byte[] messageBody = ThriftUtils.serializeThriftObject(message); - rabbitMQProducer.send(messageBody, routingKey); - } catch (TException e) { - String msg = "Error while deserializing the object"; - log.error(msg, e); - throw new AiravataException(msg, e); - } catch (Exception e) { - String msg = "Error while sending to rabbitmq"; - log.error(msg, e); - throw new AiravataException(msg, e); - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/88d27d95/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java new file mode 100644 index 0000000..a4b4d1a --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.messaging.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RabbitMQStatusPublisher implements Publisher { + + private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); + + private RabbitMQProducer rabbitMQProducer; + + + public RabbitMQStatusPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); + } + + public void publish(MessageContext msgCtx) throws AiravataException { + try { + log.info("Publishing status to rabbitmq..."); + byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(msgCtx.getMessageId()); + message.setMessageType(msgCtx.getType()); + message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); + String routingKey = null; + if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ + ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) 
msgCtx.getEvent(); + routingKey = event.getExperimentId(); + } else if (msgCtx.getType().equals(MessageType.TASK)) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + routingKey = event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); + }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ + WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); + WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); + routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + }else if (msgCtx.getType().equals(MessageType.JOB)){ + JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); + JobIdentifier identity = event.getJobIdentity(); + routingKey = identity.getExperimentId() + "." + + identity.getWorkflowNodeId() + "." + + identity.getTaskId() + "." + + identity.getJobId(); + } + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + rabbitMQProducer.send(messageBody, routingKey); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +}
