http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowIdentity.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowIdentity.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowIdentity.java deleted file mode 100644 index 7c57a56..0000000 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowIdentity.java +++ /dev/null @@ -1,492 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Autogenerated by Thrift Compiler (0.9.1) - * - * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING - * @generated - */ -package org.apache.airavata.model.messaging.event; - -import org.apache.thrift.scheme.IScheme; -import org.apache.thrift.scheme.SchemeFactory; -import org.apache.thrift.scheme.StandardScheme; - -import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; -import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("all") public class WorkflowIdentity implements org.apache.thrift.TBase<WorkflowIdentity, WorkflowIdentity._Fields>, java.io.Serializable, Cloneable, Comparable<WorkflowIdentity> { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("WorkflowIdentity"); - - private static final org.apache.thrift.protocol.TField WORKFLOW_NODE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("workflowNodeId", org.apache.thrift.protocol.TType.STRING, (short)1); - private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)2); - - private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); - static { - schemes.put(StandardScheme.class, new WorkflowIdentityStandardSchemeFactory()); - schemes.put(TupleScheme.class, new WorkflowIdentityTupleSchemeFactory()); - } - - private String workflowNodeId; // required - private String experimentId; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { - WORKFLOW_NODE_ID((short)1, "workflowNodeId"), - EXPERIMENT_ID((short)2, "experimentId"); - - private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); - - static { - for (_Fields field : EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // WORKFLOW_NODE_ID - return WORKFLOW_NODE_ID; - case 2: // EXPERIMENT_ID - return EXPERIMENT_ID; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - public static _Fields findByName(String name) { - return byName.get(name); - } - - private final short _thriftId; - private final String _fieldName; - - _Fields(short thriftId, String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - public short getThriftFieldId() { - return _thriftId; - } - - public String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.WORKFLOW_NODE_ID, new org.apache.thrift.meta_data.FieldMetaData("workflowNodeId", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - metaDataMap = Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WorkflowIdentity.class, metaDataMap); - } - - public WorkflowIdentity() { - } - - public WorkflowIdentity( - String workflowNodeId, - String experimentId) - { - this(); - this.workflowNodeId = workflowNodeId; - this.experimentId = experimentId; - } - - /** - * Performs a deep copy on <i>other</i>. - */ - public WorkflowIdentity(WorkflowIdentity other) { - if (other.isSetWorkflowNodeId()) { - this.workflowNodeId = other.workflowNodeId; - } - if (other.isSetExperimentId()) { - this.experimentId = other.experimentId; - } - } - - public WorkflowIdentity deepCopy() { - return new WorkflowIdentity(this); - } - - @Override - public void clear() { - this.workflowNodeId = null; - this.experimentId = null; - } - - public String getWorkflowNodeId() { - return this.workflowNodeId; - } - - public void setWorkflowNodeId(String workflowNodeId) { - this.workflowNodeId = workflowNodeId; - } - - public void unsetWorkflowNodeId() { - this.workflowNodeId = null; - } - - /** Returns true if field workflowNodeId is set (has been assigned a value) and false otherwise */ - public boolean isSetWorkflowNodeId() { - return this.workflowNodeId != null; - } - - public void setWorkflowNodeIdIsSet(boolean value) { - if (!value) { - this.workflowNodeId = null; - } - } - - public String getExperimentId() { - return this.experimentId; - } - - public void setExperimentId(String experimentId) { - this.experimentId = experimentId; - } - - public void unsetExperimentId() { - this.experimentId = null; - } - - /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */ - public boolean isSetExperimentId() { - return this.experimentId != null; - } - - public void setExperimentIdIsSet(boolean value) { - if (!value) { - this.experimentId = null; - } - } - - public void setFieldValue(_Fields field, Object value) { - switch (field) { - case WORKFLOW_NODE_ID: - if (value == null) { - unsetWorkflowNodeId(); - } else { - setWorkflowNodeId((String)value); - } - break; - - case EXPERIMENT_ID: - if (value == null) { - unsetExperimentId(); - } else { - setExperimentId((String)value); - } - break; - - } - } - - public Object getFieldValue(_Fields field) { - switch (field) { - case WORKFLOW_NODE_ID: - return getWorkflowNodeId(); - - case EXPERIMENT_ID: - return getExperimentId(); - - } - throw new IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - public boolean isSet(_Fields field) { - if (field == null) { - throw new IllegalArgumentException(); - } - - switch (field) { - case WORKFLOW_NODE_ID: - return isSetWorkflowNodeId(); - case EXPERIMENT_ID: - return isSetExperimentId(); - } - throw new IllegalStateException(); - } - - @Override - public boolean equals(Object that) { - if (that == null) - return false; - if (that instanceof WorkflowIdentity) - return this.equals((WorkflowIdentity)that); - return false; - } - - public boolean equals(WorkflowIdentity that) { - if (that == null) - return false; - - boolean this_present_workflowNodeId = true && this.isSetWorkflowNodeId(); - boolean that_present_workflowNodeId = true && that.isSetWorkflowNodeId(); - if (this_present_workflowNodeId || that_present_workflowNodeId) { - if (!(this_present_workflowNodeId && that_present_workflowNodeId)) - return false; - if (!this.workflowNodeId.equals(that.workflowNodeId)) - return false; - } - - boolean this_present_experimentId = true && this.isSetExperimentId(); - boolean that_present_experimentId = true && that.isSetExperimentId(); - if (this_present_experimentId || that_present_experimentId) { - if (!(this_present_experimentId && that_present_experimentId)) - return false; - if (!this.experimentId.equals(that.experimentId)) - return false; - } - - return true; - } - - @Override - public int hashCode() { - return 0; - } - - @Override - public int compareTo(WorkflowIdentity other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = Boolean.valueOf(isSetWorkflowNodeId()).compareTo(other.isSetWorkflowNodeId()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetWorkflowNodeId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.workflowNodeId, other.workflowNodeId); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetExperimentId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - schemes.get(iprot.getScheme()).getScheme().read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - schemes.get(oprot.getScheme()).getScheme().write(oprot, this); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("WorkflowIdentity("); - boolean first = true; - - sb.append("workflowNodeId:"); - if (this.workflowNodeId == null) { - sb.append("null"); - } else { - sb.append(this.workflowNodeId); - } - first = false; - if (!first) sb.append(", "); - sb.append("experimentId:"); - if (this.experimentId == null) { - sb.append("null"); - } else { - sb.append(this.experimentId); - } - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - if (!isSetWorkflowNodeId()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'workflowNodeId' is unset! Struct:" + toString()); - } - - if (!isSetExperimentId()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString()); - } - - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { - try { - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class WorkflowIdentityStandardSchemeFactory implements SchemeFactory { - public WorkflowIdentityStandardScheme getScheme() { - return new WorkflowIdentityStandardScheme(); - } - } - - private static class WorkflowIdentityStandardScheme extends StandardScheme<WorkflowIdentity> { - - public void read(org.apache.thrift.protocol.TProtocol iprot, WorkflowIdentity struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // WORKFLOW_NODE_ID - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.workflowNodeId = iprot.readString(); - struct.setWorkflowNodeIdIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 2: // EXPERIMENT_ID - if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { - struct.experimentId = iprot.readString(); - struct.setExperimentIdIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - struct.validate(); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot, WorkflowIdentity struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.workflowNodeId != null) { - oprot.writeFieldBegin(WORKFLOW_NODE_ID_FIELD_DESC); - oprot.writeString(struct.workflowNodeId); - oprot.writeFieldEnd(); - } - if (struct.experimentId != null) { - oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC); - oprot.writeString(struct.experimentId); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class WorkflowIdentityTupleSchemeFactory implements SchemeFactory { - public WorkflowIdentityTupleScheme getScheme() { - return new WorkflowIdentityTupleScheme(); - } - } - - private static class WorkflowIdentityTupleScheme extends TupleScheme<WorkflowIdentity> { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, WorkflowIdentity struct) throws org.apache.thrift.TException { - TTupleProtocol oprot = (TTupleProtocol) prot; - oprot.writeString(struct.workflowNodeId); - oprot.writeString(struct.experimentId); - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, WorkflowIdentity struct) throws org.apache.thrift.TException { - TTupleProtocol iprot = (TTupleProtocol) prot; - struct.workflowNodeId = iprot.readString(); - struct.setWorkflowNodeIdIsSet(true); - struct.experimentId = iprot.readString(); - struct.setExperimentIdIsSet(true); - } - } - -} -
http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowNodeStatusChangeEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowNodeStatusChangeEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowNodeStatusChangeEvent.java index 305ff19..519d8cc 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowNodeStatusChangeEvent.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/WorkflowNodeStatusChangeEvent.java @@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory; } private org.apache.airavata.model.workspace.experiment.WorkflowNodeState state; // required - private WorkflowIdentity workflowNodeIdentity; // required + private WorkflowIdentifier workflowNodeIdentity; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -136,7 +136,7 @@ import org.slf4j.LoggerFactory; tmpMap.put(_Fields.STATE, new org.apache.thrift.meta_data.FieldMetaData("state", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, org.apache.airavata.model.workspace.experiment.WorkflowNodeState.class))); tmpMap.put(_Fields.WORKFLOW_NODE_IDENTITY, new org.apache.thrift.meta_data.FieldMetaData("workflowNodeIdentity", org.apache.thrift.TFieldRequirementType.REQUIRED, - new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkflowIdentity.class))); + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WorkflowIdentifier.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WorkflowNodeStatusChangeEvent.class, metaDataMap); } @@ -146,7 +146,7 @@ import org.slf4j.LoggerFactory; public WorkflowNodeStatusChangeEvent( org.apache.airavata.model.workspace.experiment.WorkflowNodeState state, - WorkflowIdentity workflowNodeIdentity) + WorkflowIdentifier workflowNodeIdentity) { this(); this.state = state; @@ -161,7 +161,7 @@ import org.slf4j.LoggerFactory; this.state = other.state; } if (other.isSetWorkflowNodeIdentity()) { - this.workflowNodeIdentity = new WorkflowIdentity(other.workflowNodeIdentity); + this.workflowNodeIdentity = new WorkflowIdentifier(other.workflowNodeIdentity); } } @@ -206,11 +206,11 @@ import org.slf4j.LoggerFactory; } } - public WorkflowIdentity getWorkflowNodeIdentity() { + public WorkflowIdentifier getWorkflowNodeIdentity() { return this.workflowNodeIdentity; } - public void setWorkflowNodeIdentity(WorkflowIdentity workflowNodeIdentity) { + public void setWorkflowNodeIdentity(WorkflowIdentifier workflowNodeIdentity) { this.workflowNodeIdentity = workflowNodeIdentity; } @@ -243,7 +243,7 @@ import org.slf4j.LoggerFactory; if (value == null) { unsetWorkflowNodeIdentity(); } else { - setWorkflowNodeIdentity((WorkflowIdentity)value); + setWorkflowNodeIdentity((WorkflowIdentifier)value); } break; @@ -443,7 +443,7 @@ import org.slf4j.LoggerFactory; break; case 2: // WORKFLOW_NODE_IDENTITY if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.workflowNodeIdentity = new WorkflowIdentity(); + struct.workflowNodeIdentity = new WorkflowIdentifier(); struct.workflowNodeIdentity.read(iprot); struct.setWorkflowNodeIdentityIsSet(true); } else { @@ -499,7 +499,7 @@ import org.slf4j.LoggerFactory; TTupleProtocol iprot = (TTupleProtocol) prot; struct.state = org.apache.airavata.model.workspace.experiment.WorkflowNodeState.findByValue(iprot.readI32()); struct.setStateIsSet(true); - struct.workflowNodeIdentity = new WorkflowIdentity(); + struct.workflowNodeIdentity = new WorkflowIdentifier(); struct.workflowNodeIdentity.read(iprot); struct.setWorkflowNodeIdentityIsSet(true); } http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index 8eaddcb..aceca83 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -45,17 +45,17 @@ struct ExperimentStatusChangeEvent { 2: required string experimentId; } -struct WorkflowIdentity { +struct WorkflowIdentifier { 1: required string workflowNodeId; 2: required string experimentId; } struct WorkflowNodeStatusChangeEvent { 1: required experimentModel.WorkflowNodeState state; - 2: required WorkflowIdentity workflowNodeIdentity; + 2: required WorkflowIdentifier workflowNodeIdentity; } -struct TaskIdentity { +struct TaskIdentifier { 1: required string taskId; 2: required string workflowNodeId; 3: required string experimentId; @@ -63,15 +63,20 @@ struct TaskIdentity { struct TaskStatusChangeEvent { 1: required experimentModel.TaskState state; - 2: required TaskIdentity taskIdentity; + 2: required TaskIdentifier taskIdentity; +} + +struct TaskStatusChangeRequestEvent { + 1: required experimentModel.TaskState state; + 2: required TaskIdentifier taskIdentity; } struct TaskOutputChangeEvent { 1: required list<experimentModel.DataObjectType> output; - 2: required TaskIdentity taskIdentity; + 2: required TaskIdentifier taskIdentity; } -struct JobIdentity { +struct JobIdentifier { 1: required string jobId; 2: required string taskId; 3: required string workflowNodeId; @@ -92,8 +97,12 @@ struct JobIdentity { struct JobStatusChangeEvent { 1: required experimentModel.JobState state; - 2: required JobIdentity jobIdentity; -// 3: required JobMonitor jobMonitor; + 2: required JobIdentifier jobIdentity; +} + +struct JobStatusChangeRequestEvent { + 1: required experimentModel.JobState state; + 2: required JobIdentifier jobIdentity; } struct Message { http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 9e54f9f..d303dbd 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -190,7 +190,7 @@ connection.name=xsede activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher -rabbitmq.broker.url=http://localhost +rabbitmq.broker.url=amqp://localhost:5672 rabbitmq.exchange.name=airavata_rabbitmq_exchange ###---------------------------Orchestrator module Configurations---------------------------### http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 75f5694..c70c8a8 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -57,7 +57,9 @@ import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; +import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.api.AiravataRegistry2; @@ -128,8 +130,11 @@ public class BetterGfacImpl implements GFac { try { String[] listenerClassList = ServerSettings.getActivityListeners(); String activityPublisher = ServerSettings.getActivityPublisher(); - Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); - Publisher rabbitMQPublisher = aPublisher.newInstance(); + Publisher rabbitMQPublisher = null; + if (ServerSettings.isRabbitMqPublishEnabled()){ + Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); + rabbitMQPublisher = aPublisher.newInstance(); + } for (String listenerClass : listenerClassList) { Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); AbstractActivityListener abstractActivityListener = aClass.newInstance(); @@ -556,7 +561,7 @@ public class BetterGfacImpl implements GFac { // jobExecutionContext.getTaskData().getTaskID()), // TaskState.FAILED // )); - org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity( + JobIdentifier jobIdentity = new JobIdentifier( jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); @@ -567,7 +572,7 @@ public class BetterGfacImpl implements GFac { // monitorPublisher // .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); @@ -618,7 +623,7 @@ public class BetterGfacImpl implements GFac { // jobExecutionContext.getTaskData().getTaskID()), // TaskState.FAILED // )); - org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity( + JobIdentifier jobIdentity = new JobIdentifier( jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity)); @@ -627,7 +632,7 @@ public class BetterGfacImpl implements GFac { + "NullPointerException occurred because at this point there might not have Job Created", e1, e); //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); @@ -805,7 +810,7 @@ public class BetterGfacImpl implements GFac { // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); @@ -934,7 +939,7 @@ public class BetterGfacImpl implements GFac { // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index 4a58f68..9062df9 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -57,9 +57,9 @@ import org.apache.airavata.gfac.core.provider.GFacProvider; import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.gfac.core.states.GfacExperimentState; import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.model.messaging.event.JobIdentity; +import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskIdentity; +import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.DataObjectType; import org.apache.airavata.model.workspace.experiment.Experiment; @@ -326,7 +326,7 @@ public class GFacImpl implements GFac { } } catch (Exception e) { try { - JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(), + JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); @@ -338,7 +338,7 @@ public class GFacImpl implements GFac { // monitorPublisher // .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); // Updating the task status if there's any task associated - TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); @@ -458,7 +458,7 @@ public class GFacImpl implements GFac { // ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), // ExperimentState.COMPLETED)); // Updating the task status if there's any task associated - TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index c31aee3..784973e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -27,7 +27,9 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; @@ -57,7 +59,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { @Subscribe - public void updateRegistry(JobStatusChangeEvent jobStatus) throws Exception{ + public void updateRegistry(JobStatusChangeRequestEvent jobStatus) throws Exception{ /* Here we need to parse the jobStatus message and update the registry accordingly, for now we are just printing to standard Out */ @@ -68,10 +70,10 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { String jobID = jobStatus.getJobIdentity().getJobId(); updateJobStatus(taskID, jobID, state); logger.debug("Publishing job status for "+jobStatus.getJobIdentity().getJobId()+":"+state.toString()); - monitorPublisher.publish(new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity())); - JobStatusChangeEvent changeEvent = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()); + JobStatusChangeEvent event = new JobStatusChangeEvent(jobStatus.getState(), jobStatus.getJobIdentity()); + monitorPublisher.publish(event); String messageId = AiravataUtils.getId("JOB"); - MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.JOB, messageId); + MessageContext msgCntxt = new MessageContext(event, MessageType.JOB, messageId); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); if ( ServerSettings.isRabbitMqPublishEnabled()){ publisher.publish(msgCntxt); @@ -84,7 +86,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { } public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { - logger.debug("Updating job status for "+jobID+":"+state.toString()); + logger.info("Updating job status for " + jobID + ":" + state.toString()); CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); JobDetails details = (JobDetails)airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids); if(details == null) { http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index c9a9b03..4e44372 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -27,10 +27,7 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.MessageType; -import org.apache.airavata.model.messaging.event.TaskIdentity; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.registry.cpi.Registry; @@ -53,17 +50,26 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { public void setAiravataRegistry(Registry airavataRegistry) { this.airavataRegistry = airavataRegistry; } -// -// @Subscribe -// public void setupTaskStatus(TaskStatusChangeRequest taskStatus){ -// try { -// updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState()); -// logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString()); -// monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState())); -// } catch (Exception e) { -// logger.error("Error persisting data" + e.getLocalizedMessage(), e); -// } -// } + + @Subscribe + public void setupTaskStatus(TaskStatusChangeRequestEvent taskStatus) throws Exception{ + try { + updateTaskStatus(taskStatus.getTaskIdentity().getTaskId(), taskStatus.getState()); + logger.debug("Publishing task status for "+taskStatus.getTaskIdentity().getTaskId()+":"+taskStatus.getState().toString()); + TaskStatusChangeEvent event = new TaskStatusChangeEvent(taskStatus.getState(), taskStatus.getTaskIdentity()); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("TASK"); + MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + if ( ServerSettings.isRabbitMqPublishEnabled()){ + publisher.publish(msgCntxt); + } + } catch (Exception e) { + String msg = "Error persisting data task status to database..."; + logger.error(msg + e.getLocalizedMessage(), e); + throw new Exception(msg, e); + } + } @Subscribe public void setupTaskStatus(JobStatusChangeEvent jobStatus) throws Exception{ @@ -93,13 +99,13 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { try { updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state); logger.debug("Publishing task status for "+jobStatus.getJobIdentity().getTaskId()+":"+state.toString()); - TaskIdentity taskIdentity = new TaskIdentity(jobStatus.getJobIdentity().getTaskId(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(), jobStatus.getJobIdentity().getWorkflowNodeId(), jobStatus.getJobIdentity().getExperimentId()); - monitorPublisher.publish(new TaskStatusChangeEvent(state, taskIdentity)); - TaskStatusChangeEvent changeEvent = new TaskStatusChangeEvent(state, taskIdentity); + TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity); + monitorPublisher.publish(event); String messageId = AiravataUtils.getId("TASK"); - MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.TASK, messageId); + MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); if ( ServerSettings.isRabbitMqPublishEnabled()){ publisher.publish(msgCntxt); @@ -112,7 +118,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { } public void updateTaskStatus(String taskId, TaskState state) throws Exception { - logger.debug("Updating task status for "+taskId+":"+state.toString()); + logger.info("Updating task status for " + taskId + ":" + state.toString()); TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId); if(details == null) { details = new TaskDetails(); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java index 5632ef9..2ba08e1 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java @@ -29,7 +29,7 @@ import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; -import org.apache.airavata.model.messaging.event.WorkflowIdentity; +import org.apache.airavata.model.messaging.event.WorkflowIdentifier; import org.apache.airavata.model.messaging.event.WorkflowNodeStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; @@ -81,11 +81,11 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen try { updateWorkflowNodeStatus(taskStatus.getTaskIdentity().getWorkflowNodeId(), state); logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString()); - WorkflowIdentity workflowIdentity = new WorkflowIdentity(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId()); - monitorPublisher.publish(new WorkflowNodeStatusChangeEvent(state, workflowIdentity)); - WorkflowNodeStatusChangeEvent changeEvent = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); + WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId()); + WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); + monitorPublisher.publish(event); String messageId = AiravataUtils.getId("WFNODE"); - MessageContext msgCntxt = new MessageContext(changeEvent, MessageType.WORKFLOWNODE, messageId); + MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId); msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); if ( ServerSettings.isRabbitMqPublishEnabled()){ @@ -98,7 +98,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen } public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception { - logger.debug("Updating workflow node status for "+workflowNodeId+":"+state.toString()); + logger.info("Updating workflow node status for "+workflowNodeId+":"+state.toString()); WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); if(details == null) { details = new WorkflowNodeDetails(); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java index d5f043f..dd1d9d8 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/ExperimentIdentity.java @@ -1,36 +1,36 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.airavata.gfac.core.monitor; - -public class ExperimentIdentity { - private String experimentID; - public ExperimentIdentity(String experimentId) { - setExperimentID(experimentId); - } - public String getExperimentID() { - return experimentID; - } - - public void setExperimentID(String experimentID) { - this.experimentID = experimentID; - } -} +///* +//* +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, +//* software distributed under the License is distributed on an +//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//* KIND, either express or implied. See the License for the +//* specific language governing permissions and limitations +//* under the License. +//* +//*/ +// +//package org.apache.airavata.gfac.core.monitor; +// +//public class ExperimentIdentity { +// private String experimentID; +// public ExperimentIdentity(String experimentId) { +// setExperimentID(experimentId); +// } +// public String getExperimentID() { +// return experimentID; +// } +// +// public void setExperimentID(String experimentID) { +// this.experimentID = experimentID; +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java index 304e3eb..881dacd 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/JobIdentity.java @@ -1,39 +1,39 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.airavata.gfac.core.monitor; - -public class JobIdentity extends TaskIdentity { - private String jobId; - - public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) { - super(experimentId,workflowNodeId,taskId); - setJobId(jobId); - } - - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } -} +///* +//* +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, +//* software distributed under the License is distributed on an +//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//* KIND, either express or implied. See the License for the +//* specific language governing permissions and limitations +//* under the License. +//* +//*/ +// +//package org.apache.airavata.gfac.core.monitor; +// +//public class JobIdentity extends TaskIdentity { +// private String jobId; +// +// public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) { +// super(experimentId,workflowNodeId,taskId); +// setJobId(jobId); +// } +// +// public String getJobId() { +// return jobId; +// } +// +// public void setJobId(String jobId) { +// this.jobId = jobId; +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java index db03348..369b7a0 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/TaskIdentity.java @@ -1,38 +1,38 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.airavata.gfac.core.monitor; - -public class TaskIdentity extends WorkflowNodeIdentity { - private String taskId; - - public TaskIdentity(String experimentId, String workflowNodeId, String taskId) { - super(experimentId,workflowNodeId); - setTaskId(taskId); - } - public String getTaskId() { - return taskId; - } - - public void setTaskId(String taskId) { - this.taskId = taskId; - } -} +///* +//* +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, +//* software distributed under the License is distributed on an +//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//* KIND, either express or implied. See the License for the +//* specific language governing permissions and limitations +//* under the License. +//* +//*/ +// +//package org.apache.airavata.gfac.core.monitor; +// +//public class TaskIdentity extends WorkflowNodeIdentity { +// private String taskId; +// +// public TaskIdentity(String experimentId, String workflowNodeId, String taskId) { +// super(experimentId,workflowNodeId); +// setTaskId(taskId); +// } +// public String getTaskId() { +// return taskId; +// } +// +// public void setTaskId(String taskId) { +// this.taskId = taskId; +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java index e15f733..ba6f828 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/WorkflowNodeIdentity.java @@ -1,37 +1,37 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.airavata.gfac.core.monitor; - -public class WorkflowNodeIdentity extends ExperimentIdentity { - private String workflowNodeID; - public WorkflowNodeIdentity(String experimentId, String workflowNodeId) { - super(experimentId); - setWorkflowNodeID(workflowNodeId); - } - public String getWorkflowNodeID() { - return workflowNodeID; - } - - public void setWorkflowNodeID(String workflowNodeID) { - this.workflowNodeID = workflowNodeID; - } -} +///* +//* +//* Licensed to the Apache Software Foundation (ASF) under one +//* or more contributor license agreements. See the NOTICE file +//* distributed with this work for additional information +//* regarding copyright ownership. The ASF licenses this file +//* to you under the Apache License, Version 2.0 (the +//* "License"); you may not use this file except in compliance +//* with the License. You may obtain a copy of the License at +//* +//* http://www.apache.org/licenses/LICENSE-2.0 +//* +//* Unless required by applicable law or agreed to in writing, +//* software distributed under the License is distributed on an +//* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//* KIND, either express or implied. See the License for the +//* specific language governing permissions and limitations +//* under the License. +//* +//*/ +// +//package org.apache.airavata.gfac.core.monitor; +// +//public class WorkflowNodeIdentity extends ExperimentIdentity { +// private String workflowNodeID; +// public WorkflowNodeIdentity(String experimentId, String workflowNodeId) { +// super(experimentId); +// setWorkflowNodeID(workflowNodeId); +// } +// public String getWorkflowNodeID() { +// return workflowNodeID; +// } +// +// public void setWorkflowNodeID(String workflowNodeID) { +// this.workflowNodeID = workflowNodeID; +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java index 704bf26..386424e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java @@ -20,25 +20,25 @@ */ package org.apache.airavata.gfac.core.monitor.state; -import org.apache.airavata.gfac.core.monitor.JobIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.states.GfacExperimentState; +import org.apache.airavata.model.messaging.event.JobIdentifier; public class GfacExperimentStateChangeRequest { private GfacExperimentState state; - private JobIdentity identity; + private JobIdentifier identity; private MonitorID monitorID; public GfacExperimentStateChangeRequest(MonitorID monitorID, GfacExperimentState state) { - setIdentity(new JobIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(), + setIdentity(new JobIdentifier(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(), monitorID.getTaskID(), monitorID.getJobID())); setMonitorID(monitorID); this.state = state; } - public GfacExperimentStateChangeRequest(MonitorID monitorID, JobIdentity jobId, GfacExperimentState state) { + public GfacExperimentStateChangeRequest(MonitorID monitorID, JobIdentifier jobId, GfacExperimentState state) { setIdentity(jobId); setMonitorID(monitorID); this.state = state; @@ -53,11 +53,11 @@ public class GfacExperimentStateChangeRequest { this.state = state; } - public JobIdentity getIdentity() { + public JobIdentifier getIdentity() { return identity; } - public void setIdentity(JobIdentity identity) { + public void setIdentity(JobIdentifier identity) { this.identity = identity; } http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 5c86560..c48a0ea 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -37,9 +37,9 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.core.utils.OutputUtils; import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; import org.apache.airavata.gfac.local.utils.InputUtils; -import org.apache.airavata.model.messaging.event.JobIdentity; +import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskIdentity; +import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; import org.apache.airavata.model.workspace.experiment.DataObjectType; import org.apache.airavata.model.workspace.experiment.JobDetails; @@ -174,7 +174,7 @@ public class LocalProvider extends AbstractProvider { // updating the job status to complete because there's nothing to monitor in local jobs // MonitorID monitorID = createMonitorID(jobExecutionContext); - JobIdentity jobIdentity = new JobIdentity(jobExecutionContext.getJobDetails().getJobID(), + JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); @@ -232,7 +232,7 @@ public class LocalProvider extends AbstractProvider { registry.update(RegistryModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); } registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - TaskIdentity taskIdentity = new TaskIdentity(jobExecutionContext.getTaskData().getTaskID(), + TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID()); getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 591bca6..2fa5f62 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -20,16 +20,7 @@ */ package org.apache.airavata.gfac.monitor.impl.pull.qstat; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; - +import com.google.common.eventbus.EventBus; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; @@ -43,8 +34,9 @@ import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; -import org.apache.airavata.model.messaging.event.JobIdentity; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskState; @@ -53,7 +45,10 @@ import org.apache.airavata.schemas.gfac.SSHHostType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.eventbus.EventBus; +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; /** * This monitor is based on qstat command which can be run @@ -139,7 +134,7 @@ public class HPCPullMonitor extends PullMonitor { //todo this polling will not work with multiple usernames but with single user // and multiple hosts, currently monitoring will work UserMonitorData take = null; - JobStatusChangeEvent jobStatus = new JobStatusChangeEvent(); + JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); MonitorID currentMonitorID = null; HostDescription currentHostDescription = null; try { @@ -164,7 +159,7 @@ public class HPCPullMonitor extends PullMonitor { for (MonitorID iMonitorID : monitorID) { currentMonitorID = iMonitorID; iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic - jobStatus.setJobIdentity(new JobIdentity(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID())); + jobStatus.setJobIdentity(new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID())); jobStatus.setState(iMonitorID.getStatus()); // we have this JobStatus class to handle amqp monitoring @@ -178,7 +173,7 @@ public class HPCPullMonitor extends PullMonitor { try { gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); } catch (GFacException e) { - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), + TaskIdentifier taskIdentity = new TaskIdentifier(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID()); publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status @@ -195,7 +190,7 @@ public class HPCPullMonitor extends PullMonitor { logger.error("Launching outflow handlers to check output are genereated or not"); gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); } catch (GFacException e) { - org.apache.airavata.model.messaging.event.TaskIdentity taskIdentity = new org.apache.airavata.model.messaging.event.TaskIdentity(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), + TaskIdentifier taskIdentity = new TaskIdentifier(iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID()); publisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity)); logger.info(e.getLocalizedMessage(), e); http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java index 72338fe..9422d05 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -31,12 +31,12 @@ import java.util.concurrent.BlockingQueue; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.gfac.core.monitor.JobIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.core.PushMonitor; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; import org.slf4j.Logger; @@ -202,7 +202,7 @@ public class AMQPMonitor extends PushMonitor { } } next.setStatus(monitorID.getStatus()); - org.apache.airavata.model.messaging.event.JobIdentity jobIdentity = new org.apache.airavata.model.messaging.event.JobIdentity(next.getJobID(), + JobIdentifier jobIdentity = new JobIdentifier(next.getJobID(), next.getTaskID(), next.getWorkflowNodeID(), next.getExperimentID()); publisher.publish(new JobStatusChangeEvent(next.getStatus(),jobIdentity)); return true; http://git-wip-us.apache.org/repos/asf/airavata/blob/255dd9e3/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java index 490d33c..edbb28d 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -52,11 +52,12 @@ public class RabbitMQPublisher implements Publisher { throw new AiravataException(message, e); } rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); } public void publish(MessageContext msgCtx) throws AiravataException { try { - rabbitMQProducer.open(); + log.info("Publishing status to rabbitmq..."); byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); Message message = new Message(); message.setEvent(body); @@ -73,11 +74,11 @@ public class RabbitMQPublisher implements Publisher { event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); - WorkflowIdentity workflowNodeIdentity = event.getWorkflowNodeIdentity(); + WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); }else if (msgCtx.getType().equals(MessageType.JOB)){ JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); - JobIdentity identity = event.getJobIdentity(); + JobIdentifier identity = event.getJobIdentity(); routingKey = identity.getExperimentId() + "." + identity.getWorkflowNodeId() + "." + identity.getTaskId() + "." +
