update airavata to use new events
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/dcc647eb Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/dcc647eb Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/dcc647eb Branch: refs/heads/master Commit: dcc647ebc683b9da454ba113765d5521acabd6a7 Parents: 785394d Author: Chathuri Wimalasena <[email protected]> Authored: Mon Aug 11 15:31:29 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Mon Aug 11 15:31:29 2014 -0400 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 2 +- .../lib/airavata/messagingEvents_types.cpp | 106 +++- .../lib/airavata/messagingEvents_types.h | 44 ++ .../Airavata/Model/Messaging/Event/Types.php | 125 +++++ .../messaging/event/TaskOutputChangeEvent.java | 551 +++++++++++++++++++ .../messagingEvents.thrift | 5 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 111 ++-- .../apache/airavata/gfac/core/cpi/GFacImpl.java | 35 +- .../core/monitor/AiravataTaskStatusUpdator.java | 27 +- .../gfac/core/monitor/ExperimentIdentity.java | 72 +-- .../airavata/gfac/core/monitor/JobIdentity.java | 78 +-- .../gfac/core/monitor/TaskIdentity.java | 76 +-- .../gfac/core/monitor/WorkflowNodeIdentity.java | 74 +-- .../state/TaskOutputDataChangedEvent.java | 48 +- .../gfac/local/provider/impl/LocalProvider.java | 44 +- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 21 +- .../monitor/impl/push/amqp/AMQPMonitor.java | 6 +- .../impl/push/amqp/UnRegisterWorker.java | 5 +- .../apache/airavata/job/AMQPMonitorTest.java | 4 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 6 +- .../engine/interpretor/WorkflowInterpreter.java | 12 +- 21 files changed, 1129 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index 903e630..c78390a 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -79,7 +79,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener break; } if (!updateExperimentStatus){ - ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentID())); + ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP); } updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state); http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp index 956def7..a96736f 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp @@ -449,6 +449,106 @@ void swap(TaskStatusChangeEvent &a, TaskStatusChangeEvent &b) { swap(a.taskIdentity, b.taskIdentity); } +const char* TaskOutputChangeEvent::ascii_fingerprint = "6488123A3A8B4CF758D069C9B693C7EB"; +const uint8_t TaskOutputChangeEvent::binary_fingerprint[16] = {0x64,0x88,0x12,0x3A,0x3A,0x8B,0x4C,0xF7,0x58,0xD0,0x69,0xC9,0xB6,0x93,0xC7,0xEB}; + +uint32_t TaskOutputChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot) { + + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_output = false; + bool isset_taskIdentity = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_LIST) { + { + this->output.clear(); + uint32_t _size3; + ::apache::thrift::protocol::TType _etype6; + xfer += iprot->readListBegin(_etype6, _size3); + this->output.resize(_size3); + uint32_t _i7; + for (_i7 = 0; _i7 < _size3; ++_i7) + { + xfer += this->output[_i7].read(iprot); + } + xfer += iprot->readListEnd(); + } + isset_output = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->taskIdentity.read(iprot); + isset_taskIdentity = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_output) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_taskIdentity) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t TaskOutputChangeEvent::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + xfer += oprot->writeStructBegin("TaskOutputChangeEvent"); + + xfer += oprot->writeFieldBegin("output", ::apache::thrift::protocol::T_LIST, 1); + { + xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRUCT, static_cast<uint32_t>(this->output.size())); + std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType> ::const_iterator _iter8; + for (_iter8 = this->output.begin(); _iter8 != this->output.end(); ++_iter8) + { + xfer += (*_iter8).write(oprot); + } + xfer += oprot->writeListEnd(); + } + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("taskIdentity", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->taskIdentity.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(TaskOutputChangeEvent &a, TaskOutputChangeEvent &b) { + using ::std::swap; + swap(a.output, b.output); + swap(a.taskIdentity, b.taskIdentity); +} + const char* JobIdentity::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2"; const uint8_t JobIdentity::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2}; @@ -588,9 +688,9 @@ uint32_t JobStatusChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast3; - xfer += iprot->readI32(ecast3); - this->state = ( ::apache::airavata::model::workspace::experiment::JobState::type)ecast3; + int32_t ecast9; + xfer += iprot->readI32(ecast9); + this->state = ( ::apache::airavata::model::workspace::experiment::JobState::type)ecast9; isset_state = true; } else { xfer += iprot->skip(ftype); http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h index c4c07f9..ad3d052 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h @@ -263,6 +263,50 @@ class TaskStatusChangeEvent { void swap(TaskStatusChangeEvent &a, TaskStatusChangeEvent &b); +class TaskOutputChangeEvent { + public: + + static const char* ascii_fingerprint; // = "6488123A3A8B4CF758D069C9B693C7EB"; + static const uint8_t binary_fingerprint[16]; // = {0x64,0x88,0x12,0x3A,0x3A,0x8B,0x4C,0xF7,0x58,0xD0,0x69,0xC9,0xB6,0x93,0xC7,0xEB}; + + TaskOutputChangeEvent() { + } + + virtual ~TaskOutputChangeEvent() throw() {} + + std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType> output; + TaskIdentity taskIdentity; + + void __set_output(const std::vector< ::apache::airavata::model::workspace::experiment::DataObjectType> & val) { + output = val; + } + + void __set_taskIdentity(const TaskIdentity& val) { + taskIdentity = val; + } + + bool operator == (const TaskOutputChangeEvent & rhs) const + { + if (!(output == rhs.output)) + return false; + if (!(taskIdentity == rhs.taskIdentity)) + return false; + return true; + } + bool operator != (const TaskOutputChangeEvent &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TaskOutputChangeEvent & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +void swap(TaskOutputChangeEvent &a, TaskOutputChangeEvent &b); + + class JobIdentity { public: http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index c72d900..7bcf528 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -507,6 +507,131 @@ class TaskStatusChangeEvent { } +class TaskOutputChangeEvent { + static $_TSPEC; + + public $output = null; + public $taskIdentity = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'output', + 'type' => TType::LST, + 'etype' => TType::STRUCT, + 'elem' => array( + 'type' => TType::STRUCT, + 'class' => '\Airavata\Model\Workspace\Experiment\DataObjectType', + ), + ), + 2 => array( + 'var' => 'taskIdentity', + 'type' => TType::STRUCT, + 'class' => '\Airavata\Model\Messaging\Event\TaskIdentity', + ), + ); + } + if (is_array($vals)) { + if (isset($vals['output'])) { + $this->output = $vals['output']; + } + if (isset($vals['taskIdentity'])) { + $this->taskIdentity = $vals['taskIdentity']; + } + } + } + + public function getName() { + return 'TaskOutputChangeEvent'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::LST) { + $this->output = array(); + $_size0 = 0; + $_etype3 = 0; + $xfer += $input->readListBegin($_etype3, $_size0); + for ($_i4 = 0; $_i4 < $_size0; ++$_i4) + { + $elem5 = null; + $elem5 = new \Airavata\Model\Workspace\Experiment\DataObjectType(); + $xfer += $elem5->read($input); + $this->output []= $elem5; + } + $xfer += $input->readListEnd(); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRUCT) { + $this->taskIdentity = new \Airavata\Model\Messaging\Event\TaskIdentity(); + $xfer += $this->taskIdentity->read($input); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('TaskOutputChangeEvent'); + if ($this->output !== null) { + if (!is_array($this->output)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('output', TType::LST, 1); + { + $output->writeListBegin(TType::STRUCT, count($this->output)); + { + foreach ($this->output as $iter6) + { + $xfer += $iter6->write($output); + } + } + $output->writeListEnd(); + } + $xfer += $output->writeFieldEnd(); + } + if ($this->taskIdentity !== null) { + if (!is_object($this->taskIdentity)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('taskIdentity', TType::STRUCT, 2); + $xfer += $this->taskIdentity->write($output); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + class JobIdentity { static $_TSPEC; http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java new file mode 100644 index 0000000..d73ab63 --- /dev/null +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskOutputChangeEvent.java @@ -0,0 +1,551 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.1) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.model.messaging.event; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") public class TaskOutputChangeEvent implements org.apache.thrift.TBase<TaskOutputChangeEvent, TaskOutputChangeEvent._Fields>, java.io.Serializable, Cloneable, Comparable<TaskOutputChangeEvent> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TaskOutputChangeEvent"); + + private static final org.apache.thrift.protocol.TField OUTPUT_FIELD_DESC = new org.apache.thrift.protocol.TField("output", org.apache.thrift.protocol.TType.LIST, (short)1); + private static final org.apache.thrift.protocol.TField TASK_IDENTITY_FIELD_DESC = new org.apache.thrift.protocol.TField("taskIdentity", org.apache.thrift.protocol.TType.STRUCT, (short)2); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new TaskOutputChangeEventStandardSchemeFactory()); + schemes.put(TupleScheme.class, new TaskOutputChangeEventTupleSchemeFactory()); + } + + private List<org.apache.airavata.model.workspace.experiment.DataObjectType> output; // required + private TaskIdentity taskIdentity; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { + OUTPUT((short)1, "output"), + TASK_IDENTITY((short)2, "taskIdentity"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // OUTPUT + return OUTPUT; + case 2: // TASK_IDENTITY + return TASK_IDENTITY; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.OUTPUT, new org.apache.thrift.meta_data.FieldMetaData("output", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.airavata.model.workspace.experiment.DataObjectType.class)))); + tmpMap.put(_Fields.TASK_IDENTITY, new org.apache.thrift.meta_data.FieldMetaData("taskIdentity", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TaskIdentity.class))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskOutputChangeEvent.class, metaDataMap); + } + + public TaskOutputChangeEvent() { + } + + public TaskOutputChangeEvent( + List<org.apache.airavata.model.workspace.experiment.DataObjectType> output, + TaskIdentity taskIdentity) + { + this(); + this.output = output; + this.taskIdentity = taskIdentity; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public TaskOutputChangeEvent(TaskOutputChangeEvent other) { + if (other.isSetOutput()) { + List<org.apache.airavata.model.workspace.experiment.DataObjectType> __this__output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(other.output.size()); + for (org.apache.airavata.model.workspace.experiment.DataObjectType other_element : other.output) { + __this__output.add(new org.apache.airavata.model.workspace.experiment.DataObjectType(other_element)); + } + this.output = __this__output; + } + if (other.isSetTaskIdentity()) { + this.taskIdentity = new TaskIdentity(other.taskIdentity); + } + } + + public TaskOutputChangeEvent deepCopy() { + return new TaskOutputChangeEvent(this); + } + + @Override + public void clear() { + this.output = null; + this.taskIdentity = null; + } + + public int getOutputSize() { + return (this.output == null) ? 0 : this.output.size(); + } + + public java.util.Iterator<org.apache.airavata.model.workspace.experiment.DataObjectType> getOutputIterator() { + return (this.output == null) ? null : this.output.iterator(); + } + + public void addToOutput(org.apache.airavata.model.workspace.experiment.DataObjectType elem) { + if (this.output == null) { + this.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(); + } + this.output.add(elem); + } + + public List<org.apache.airavata.model.workspace.experiment.DataObjectType> getOutput() { + return this.output; + } + + public void setOutput(List<org.apache.airavata.model.workspace.experiment.DataObjectType> output) { + this.output = output; + } + + public void unsetOutput() { + this.output = null; + } + + /** Returns true if field output is set (has been assigned a value) and false otherwise */ + public boolean isSetOutput() { + return this.output != null; + } + + public void setOutputIsSet(boolean value) { + if (!value) { + this.output = null; + } + } + + public TaskIdentity getTaskIdentity() { + return this.taskIdentity; + } + + public void setTaskIdentity(TaskIdentity taskIdentity) { + this.taskIdentity = taskIdentity; + } + + public void unsetTaskIdentity() { + this.taskIdentity = null; + } + + /** Returns true if field taskIdentity is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskIdentity() { + return this.taskIdentity != null; + } + + public void setTaskIdentityIsSet(boolean value) { + if (!value) { + this.taskIdentity = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case OUTPUT: + if (value == null) { + unsetOutput(); + } else { + setOutput((List<org.apache.airavata.model.workspace.experiment.DataObjectType>)value); + } + break; + + case TASK_IDENTITY: + if (value == null) { + unsetTaskIdentity(); + } else { + setTaskIdentity((TaskIdentity)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case OUTPUT: + return getOutput(); + + case TASK_IDENTITY: + return getTaskIdentity(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case OUTPUT: + return isSetOutput(); + case TASK_IDENTITY: + return isSetTaskIdentity(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof TaskOutputChangeEvent) + return this.equals((TaskOutputChangeEvent)that); + return false; + } + + public boolean equals(TaskOutputChangeEvent that) { + if (that == null) + return false; + + boolean this_present_output = true && this.isSetOutput(); + boolean that_present_output = true && that.isSetOutput(); + if (this_present_output || that_present_output) { + if (!(this_present_output && that_present_output)) + return false; + if (!this.output.equals(that.output)) + return false; + } + + boolean this_present_taskIdentity = true && this.isSetTaskIdentity(); + boolean that_present_taskIdentity = true && that.isSetTaskIdentity(); + if (this_present_taskIdentity || that_present_taskIdentity) { + if (!(this_present_taskIdentity && that_present_taskIdentity)) + return false; + if (!this.taskIdentity.equals(that.taskIdentity)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public int compareTo(TaskOutputChangeEvent other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetOutput()).compareTo(other.isSetOutput()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOutput()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.output, other.output); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetTaskIdentity()).compareTo(other.isSetTaskIdentity()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTaskIdentity()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskIdentity, other.taskIdentity); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("TaskOutputChangeEvent("); + boolean first = true; + + sb.append("output:"); + if (this.output == null) { + sb.append("null"); + } else { + sb.append(this.output); + } + first = false; + if (!first) sb.append(", "); + sb.append("taskIdentity:"); + if (this.taskIdentity == null) { + sb.append("null"); + } else { + sb.append(this.taskIdentity); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!isSetOutput()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'output' is unset! Struct:" + toString()); + } + + if (!isSetTaskIdentity()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskIdentity' is unset! Struct:" + toString()); + } + + // check for sub-struct validity + if (taskIdentity != null) { + taskIdentity.validate(); + } + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class TaskOutputChangeEventStandardSchemeFactory implements SchemeFactory { + public TaskOutputChangeEventStandardScheme getScheme() { + return new TaskOutputChangeEventStandardScheme(); + } + } + + private static class TaskOutputChangeEventStandardScheme extends StandardScheme<TaskOutputChangeEvent> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // OUTPUT + if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list0 = iprot.readListBegin(); + struct.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(_list0.size); + for (int _i1 = 0; _i1 < _list0.size; ++_i1) + { + org.apache.airavata.model.workspace.experiment.DataObjectType _elem2; + _elem2 = new org.apache.airavata.model.workspace.experiment.DataObjectType(); + _elem2.read(iprot); + struct.output.add(_elem2); + } + iprot.readListEnd(); + } + struct.setOutputIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // TASK_IDENTITY + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.taskIdentity = new TaskIdentity(); + struct.taskIdentity.read(iprot); + struct.setTaskIdentityIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.output != null) { + oprot.writeFieldBegin(OUTPUT_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.output.size())); + for (org.apache.airavata.model.workspace.experiment.DataObjectType _iter3 : struct.output) + { + _iter3.write(oprot); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + if (struct.taskIdentity != null) { + oprot.writeFieldBegin(TASK_IDENTITY_FIELD_DESC); + struct.taskIdentity.write(oprot); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class TaskOutputChangeEventTupleSchemeFactory implements SchemeFactory { + public TaskOutputChangeEventTupleScheme getScheme() { + return new TaskOutputChangeEventTupleScheme(); + } + } + + private static class TaskOutputChangeEventTupleScheme extends TupleScheme<TaskOutputChangeEvent> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + { + oprot.writeI32(struct.output.size()); + for (org.apache.airavata.model.workspace.experiment.DataObjectType _iter4 : struct.output) + { + _iter4.write(oprot); + } + } + struct.taskIdentity.write(oprot); + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, TaskOutputChangeEvent struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + { + org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.output = new ArrayList<org.apache.airavata.model.workspace.experiment.DataObjectType>(_list5.size); + for (int _i6 = 0; _i6 < _list5.size; ++_i6) + { + org.apache.airavata.model.workspace.experiment.DataObjectType _elem7; + _elem7 = new org.apache.airavata.model.workspace.experiment.DataObjectType(); + _elem7.read(iprot); + struct.output.add(_elem7); + } + } + struct.setOutputIsSet(true); + struct.taskIdentity = new TaskIdentity(); + struct.taskIdentity.read(iprot); + struct.setTaskIdentityIsSet(true); + } + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index ebe1c57..38c25a4 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -50,6 +50,11 @@ struct TaskStatusChangeEvent { 2: required TaskIdentity taskIdentity; } +struct TaskOutputChangeEvent { + 1: required list<experimentModel.DataObjectType> output; + 2: required TaskIdentity taskIdentity; +} + struct JobIdentity { 1: required string jobId; 2: required string taskId; http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index a68a302..e843e4d 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -20,16 +20,6 @@ */ package org.apache.airavata.gfac.core.cpi; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.xpath.XPathExpressionException; - import org.airavata.appcatalog.cpi.AppCatalog; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.client.api.AiravataAPI; @@ -48,20 +38,9 @@ import org.apache.airavata.gfac.Scheduler; import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.handler.GFacHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.handler.GFacRecoverableHandler; -import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; -import org.apache.airavata.gfac.core.monitor.JobIdentity; +import org.apache.airavata.gfac.core.handler.*; import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.TaskIdentity; -//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent; import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.core.notification.listeners.LoggingListener; import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener; @@ -75,36 +54,16 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; -import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand; -import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; -import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; -import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; -import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; -import org.apache.airavata.model.workspace.experiment.DataObjectType; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.ExperimentState; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.api.AiravataRegistry2; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryModelType; -import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +import org.apache.airavata.schemas.gfac.*; import org.apache.airavata.schemas.gfac.DataType; -import org.apache.airavata.schemas.gfac.GsisshHostType; -import org.apache.airavata.schemas.gfac.HostDescriptionType; -import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; -import org.apache.airavata.schemas.gfac.InputParameterType; -import org.apache.airavata.schemas.gfac.JobTypeType; -import org.apache.airavata.schemas.gfac.OutputParameterType; -import org.apache.airavata.schemas.gfac.ParameterType; -import org.apache.airavata.schemas.gfac.ProjectAccountType; -import org.apache.airavata.schemas.gfac.QueueType; -import org.apache.airavata.schemas.gfac.SSHHostType; -import org.apache.airavata.schemas.gfac.ServiceDescriptionType; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooKeeper; @@ -112,6 +71,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPathExpressionException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent; + /** * This is the GFac CPI class for external usage, this simply have a single method to submit a job to * the resource, required data for the job has to be stored in registry prior to invoke this object. @@ -582,17 +552,21 @@ public class BetterGfacImpl implements GFac { // jobExecutionContext.getTaskData().getTaskID()), // TaskState.FAILED // )); - monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext - .getJobDetails().getJobID()), JobState.FAILED)); + org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity( + jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); // monitorPublisher // .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext - .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); } jobExecutionContext.setProperty(ERROR_SENT, "true"); @@ -640,16 +614,19 @@ public class BetterGfacImpl implements GFac { // jobExecutionContext.getTaskData().getTaskID()), // TaskState.FAILED // )); - monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext - .getJobDetails().getJobID()), JobState.FAILED)); + org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity( + jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext - .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); } jobExecutionContext.setProperty(ERROR_SENT, "true"); @@ -824,11 +801,10 @@ public class BetterGfacImpl implements GFac { // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED - )); + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); } @@ -953,12 +929,11 @@ public class BetterGfacImpl implements GFac { // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - - monitorPublisher.publish(new TaskStatusChangedEvent( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED - )); + + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); } http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index d370924..4a58f68 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -50,14 +50,6 @@ import org.apache.airavata.gfac.core.handler.GFacHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; -import org.apache.airavata.gfac.core.monitor.JobIdentity; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.TaskIdentity; -//import org.apache.airavata.api.server.listener.ExperimentStatusChangedEvent; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.core.notification.listeners.LoggingListener; import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener; @@ -65,9 +57,12 @@ import org.apache.airavata.gfac.core.provider.GFacProvider; import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.gfac.core.states.GfacExperimentState; import org.apache.airavata.gfac.core.utils.GFacUtils; +import org.apache.airavata.model.messaging.event.JobIdentity; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentity; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.DataObjectType; import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; @@ -331,10 +326,11 @@ public class GFacImpl implements GFac { } } catch (Exception e) { try { - monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), - new JobIdentity(jobExecutionContext.getExperimentID(), + JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(), + jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED)); + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED,jobIdentity)); } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); @@ -342,8 +338,10 @@ public class GFacImpl implements GFac { // monitorPublisher // .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext - .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); + TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); } jobExecutionContext.setProperty(ERROR_SENT, "true"); @@ -460,11 +458,10 @@ public class GFacImpl implements GFac { // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED - )); + TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); } http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index 56fbb1c..e6ab5ef 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -24,8 +24,6 @@ import java.util.Calendar; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.listener.AbstractActivityListener; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.messaging.event.TaskIdentity; import org.apache.airavata.model.workspace.experiment.TaskDetails; @@ -51,17 +49,17 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { public void setAiravataRegistry(Registry airavataRegistry) { this.airavataRegistry = airavataRegistry; } - - @Subscribe - public void setupTaskStatus(TaskStatusChangeRequest taskStatus){ - try { - updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState()); - logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString()); - monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState())); - } catch (Exception e) { - logger.error("Error persisting data" + e.getLocalizedMessage(), e); - } - } +// +// @Subscribe +// public void setupTaskStatus(TaskStatusChangeRequest taskStatus){ +// try { +// updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState()); +// logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString()); +// monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState())); +// } catch (Exception e) { +// logger.error("Error persisting data" + e.getLocalizedMessage(), e); +// } +// } @Subscribe public void setupTaskStatus(JobStatusChangeEvent jobStatus){ @@ -95,7 +93,8 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { jobStatus.getJobIdentity().getWorkflowNodeId(), jobStatus.getJobIdentity().getExperimentId()); monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); - } catch (Exception e) { + + } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java index e8d22f7..d5f043f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java @@ -1,36 +1,36 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.gfac.core.monitor; -// -//public class ExperimentIdentity { -// private String experimentID; -// public ExperimentIdentity(String experimentId) { -// setExperimentID(experimentId); -// } -// public String getExperimentID() { -// return experimentID; -// } -// -// public void setExperimentID(String experimentID) { -// this.experimentID = experimentID; -// } -//} +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.airavata.gfac.core.monitor; + +public class ExperimentIdentity { + private String experimentID; + public ExperimentIdentity(String experimentId) { + setExperimentID(experimentId); + } + public String getExperimentID() { + return experimentID; + } + + public void setExperimentID(String experimentID) { + this.experimentID = experimentID; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java index 1773ff1..304e3eb 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java @@ -1,39 +1,39 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.gfac.core.monitor; -// -//public class JobIdentity extends TaskIdentity { -// private String jobId; -// -// public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) { -// super(experimentId,workflowNodeId,taskId); -// setJobId(jobId); -// } -// -// public String getJobId() { -// return jobId; -// } -// -// public void setJobId(String jobId) { -// this.jobId = jobId; -// } -//} +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.airavata.gfac.core.monitor; + +public class JobIdentity extends TaskIdentity { + private String jobId; + + public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) { + super(experimentId,workflowNodeId,taskId); + setJobId(jobId); + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java index 8448437..db03348 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java @@ -1,38 +1,38 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.gfac.core.monitor; -// -//public class TaskIdentity extends WorkflowNodeIdentity { -// private String taskId; -// -// public TaskIdentity(String experimentId, String workflowNodeId, String taskId) { -// super(experimentId,workflowNodeId); -// setTaskId(taskId); -// } -// public String getTaskId() { -// return taskId; -// } -// -// public void setTaskId(String taskId) { -// this.taskId = taskId; -// } -//} +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.airavata.gfac.core.monitor; + +public class TaskIdentity extends WorkflowNodeIdentity { + private String taskId; + + public TaskIdentity(String experimentId, String workflowNodeId, String taskId) { + super(experimentId,workflowNodeId); + setTaskId(taskId); + } + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java index ebdc372..e15f733 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java @@ -1,37 +1,37 @@ -///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * -// */ -// -//package org.apache.airavata.gfac.core.monitor; -// -//public class WorkflowNodeIdentity extends ExperimentIdentity { -// private String workflowNodeID; -// public WorkflowNodeIdentity(String experimentId, String workflowNodeId) { -// super(experimentId); -// setWorkflowNodeID(workflowNodeId); -// } -// public String getWorkflowNodeID() { -// return workflowNodeID; -// } -// -// public void setWorkflowNodeID(String workflowNodeID) { -// this.workflowNodeID = workflowNodeID; -// } -//} +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.airavata.gfac.core.monitor; + +public class WorkflowNodeIdentity extends ExperimentIdentity { + private String workflowNodeID; + public WorkflowNodeIdentity(String experimentId, String workflowNodeId) { + super(experimentId); + setWorkflowNodeID(workflowNodeId); + } + public String getWorkflowNodeID() { + return workflowNodeID; + } + + public void setWorkflowNodeID(String workflowNodeID) { + this.workflowNodeID = workflowNodeID; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java index cccca30..db7ee59 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskOutputDataChangedEvent.java @@ -1,22 +1,22 @@ ///* -// * -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, -// * software distributed under the License is distributed on an -// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// * KIND, either express or implied. See the License for the -// * specific language governing permissions and limitations -// * under the License. -// * +//* +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, +//* software distributed under the License is distributed on an +//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//* KIND, either express or implied. See the License for the +//* specific language governing permissions and limitations +//* under the License. +//* //*/ //package org.apache.airavata.gfac.core.monitor.state; // @@ -27,12 +27,12 @@ //import org.apache.airavata.model.workspace.experiment.DataObjectType; // ///** -// * This is the primary job state object used in -// * through out the monitor module. This use airavata-data-model JobState enum -// * Ideally after processing each event or monitoring message from remote system -// * Each monitoring implementation has to return this object with a state and -// * the monitoring ID -// */ +//* This is the primary job state object used in +//* through out the monitor module. This use airavata-data-model JobState enum +//* Ideally after processing each event or monitoring message from remote system +//* Each monitoring implementation has to return this object with a state and +//* the monitoring ID +//*/ //public class TaskOutputDataChangedEvent extends AbstractStateChangeRequest { // private List<DataObjectType> output; // private TaskIdentity identity; http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 514f901..5c86560 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -29,10 +29,6 @@ import java.util.Map; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.TaskIdentity; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskOutputDataChangedEvent; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; @@ -41,6 +37,10 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.core.utils.OutputUtils; import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; import org.apache.airavata.gfac.local.utils.InputUtils; +import org.apache.airavata.model.messaging.event.JobIdentity; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentity; +import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.apache.airavata.model.workspace.experiment.DataObjectType; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; @@ -173,10 +173,12 @@ public class LocalProvider extends AbstractProvider { log.info(buf.toString()); // updating the job status to complete because there's nothing to monitor in local jobs - MonitorID monitorID = createMonitorID(jobExecutionContext); - JobStatusChangeRequest jobStatusChangeRequest = new JobStatusChangeRequest(monitorID); - jobStatusChangeRequest.setState(JobState.COMPLETE); - this.getMonitorPublisher().publish(jobStatusChangeRequest); +// MonitorID monitorID = createMonitorID(jobExecutionContext); + JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(), + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + this.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); } catch (IOException io) { throw new GFacProviderException(io.getMessage(), io); } catch (InterruptedException e) { @@ -186,13 +188,13 @@ public class LocalProvider extends AbstractProvider { } } - private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) { - MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId, - jobExecutionContext.getTaskData().getTaskID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), - jobExecutionContext.getExperiment().getUserName(),jobId); - return monitorID; - } +// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) { +// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId, +// jobExecutionContext.getTaskData().getTaskID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), +// jobExecutionContext.getExperiment().getUserName(),jobId); +// return monitorID; +// } // private void saveApplicationJob(JobExecutionContext jobExecutionContext) // throws GFacProviderException { @@ -225,11 +227,15 @@ public class LocalProvider extends AbstractProvider { Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); - taskDetails.setApplicationOutputs(outputArray); - registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); + if (taskDetails != null){ + taskDetails.setApplicationOutputs(outputArray); + registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); + } registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - MonitorID monitorId = createMonitorID(jobExecutionContext); - getMonitorPublisher().publish(new TaskOutputDataChangedEvent(new TaskIdentity(monitorId.getExperimentID(), monitorId.getWorkflowNodeID(), monitorId.getTaskID()), outputArray)); + TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getExperimentID()); + getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); } catch (XmlException e) { throw new GFacProviderException("Cannot read output:" + e.getMessage(), e); } catch (IOException io) { http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 2fea154..591bca6 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -36,9 +36,6 @@ import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.TaskIdentity; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.core.PullMonitor; @@ -46,6 +43,9 @@ import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.model.messaging.event.JobIdentity; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; @@ -139,7 +139,7 @@ public class HPCPullMonitor extends PullMonitor { //todo this polling will not work with multiple usernames but with single user // and multiple hosts, currently monitoring will work UserMonitorData take = null; - JobStatusChangeRequest jobStatus = new JobStatusChangeRequest(); + JobStatusChangeEvent jobStatus = new JobStatusChangeEvent(); MonitorID currentMonitorID = null; HostDescription currentHostDescription = null; try { @@ -164,7 +164,8 @@ public class HPCPullMonitor extends PullMonitor { for (MonitorID iMonitorID : monitorID) { currentMonitorID = iMonitorID; iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic - jobStatus = new JobStatusChangeRequest(iMonitorID); + jobStatus.setJobIdentity(new JobIdentity(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID())); + jobStatus.setState(iMonitorID.getStatus()); // we have this JobStatus class to handle amqp monitoring publisher.publish(jobStatus); @@ -177,8 +178,9 @@ public class HPCPullMonitor extends PullMonitor { try { gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); } catch (GFacException e) { - publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(), - iMonitorID.getTaskID()), TaskState.FAILED)); + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), + iMonitorID.getExperimentID()); + publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status //should be done understanding whole workflow of job submission and data transfer // publisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(iMonitorID.getExperimentID()), @@ -193,8 +195,9 @@ public class HPCPullMonitor extends PullMonitor { logger.error("Launching outflow handlers to check output are genereated or not"); gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); } catch (GFacException e) { - publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(), - iMonitorID.getTaskID()), TaskState.FAILED)); + org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), + iMonitorID.getExperimentID()); + publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); logger.info(e.getLocalizedMessage(), e); } } else { http://git-wip-us.apache.org/repos/asf/airavata/blob/dcc647eb/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java index 010d3bc..72338fe 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -33,11 +33,11 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.monitor.JobIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.monitor.core.PushMonitor; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,7 +202,9 @@ public class AMQPMonitor extends PushMonitor { } } next.setStatus(monitorID.getStatus()); - publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus())); + org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity(next.getJobID(), + next.getTaskID(), next.getWorkflowNodeID(), next.getExperimentID()); + publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity)); return true; } @Override
