Repository: airavata Updated Branches: refs/heads/develop b9b2480c5 -> 01716e536
ApiServer publish experiment submit events Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4e3dc9a9 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4e3dc9a9 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4e3dc9a9 Branch: refs/heads/develop Commit: 4e3dc9a9e90e6043173a98d6f546b25ff7f2896a Parents: 63696ff Author: Shameera Rathnayaka <[email protected]> Authored: Wed Aug 10 18:38:23 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Wed Aug 10 18:38:23 2016 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 154 +++--- .../lib/airavata/messaging_events_types.cpp | 227 ++++++--- .../lib/airavata/messaging_events_types.h | 60 ++- .../Airavata/Model/Messaging/Event/Types.php | 124 ++++- .../airavata/model/messaging/event/ttypes.py | 121 ++++- .../messaging/event/ExperimentSubmitEvent.java | 507 +++++++++++++++++++ .../model/messaging/event/MessageType.java | 25 +- .../messaging/core/MessagingFactory.java | 20 +- .../messaging/core/impl/ExperimentConsumer.java | 74 ++- .../messaging/core/impl/ProcessConsumer.java | 6 +- .../core/impl/GFACPassiveJobSubmitter.java | 2 +- .../airavata-apis/messaging_events.thrift | 6 + 12 files changed, 1153 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index ff0149e..cd21124 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -48,7 +48,14 @@ import org.apache.airavata.model.WorkflowModel; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationModule; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; -import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.computeresource.CloudJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; @@ -57,16 +64,33 @@ import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.airavata_commonsConstants; import org.apache.airavata.model.data.movement.DMType; -import org.apache.airavata.model.data.movement.*; +import org.apache.airavata.model.data.movement.DataMovementInterface; +import org.apache.airavata.model.data.movement.DataMovementProtocol; +import org.apache.airavata.model.data.movement.GridFTPDataMovement; +import org.apache.airavata.model.data.movement.LOCALDataMovement; +import org.apache.airavata.model.data.movement.SCPDataMovement; +import org.apache.airavata.model.data.movement.UnicoreDataMovement; import org.apache.airavata.model.data.replica.DataProductModel; import org.apache.airavata.model.data.replica.DataReplicaLocationModel; -import org.apache.airavata.model.error.*; -import org.apache.airavata.model.experiment.*; +import org.apache.airavata.model.error.AiravataClientException; +import org.apache.airavata.model.error.AiravataErrorType; +import org.apache.airavata.model.error.AiravataSystemException; +import org.apache.airavata.model.error.AuthorizationException; +import org.apache.airavata.model.error.ExperimentNotFoundException; +import org.apache.airavata.model.error.InvalidRequestException; +import org.apache.airavata.model.error.ProjectNotFoundException; +import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.experiment.ExperimentSearchFields; +import org.apache.airavata.model.experiment.ExperimentStatistics; +import org.apache.airavata.model.experiment.ExperimentSummaryModel; +import org.apache.airavata.model.experiment.ProjectSearchFields; +import org.apache.airavata.model.experiment.UserConfigurationDataModel; import org.apache.airavata.model.group.GroupModel; import org.apache.airavata.model.group.ResourcePermissionType; import org.apache.airavata.model.group.ResourceType; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.model.security.AuthzToken; @@ -76,14 +100,16 @@ import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.model.workspace.Gateway; import org.apache.airavata.model.workspace.Notification; import org.apache.airavata.model.workspace.Project; -import org.apache.airavata.orchestrator.client.OrchestratorClientFactory; -import org.apache.airavata.orchestrator.cpi.OrchestratorService; -import org.apache.airavata.orchestrator.cpi.OrchestratorService.Client; import org.apache.airavata.registry.api.RegistryService; import org.apache.airavata.registry.api.client.RegistryServiceClientFactory; import org.apache.airavata.registry.api.exception.RegistryServiceException; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.ComputeResource; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogException; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.RegistryException; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,15 +118,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; public class AiravataServerHandler implements Airavata.Iface { private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class); - private Publisher publisher; + private Publisher statusPublisher; + private Publisher experimentPublisher; private CredentialStoreService.Client csClient; public AiravataServerHandler() { try { - publisher = MessagingFactory.getPublisher(Type.STATUS); + statusPublisher = MessagingFactory.getPublisher(Type.STATUS); + experimentPublisher = MessagingFactory.getPublisher(Type.EXPERIMENT_LAUNCH); } catch (ApplicationSettingsException e) { logger.error("Error occured while reading airavata-server properties..", e); } catch (AiravataException e) { @@ -365,7 +394,7 @@ public class AiravataServerHandler implements Airavata.Iface { * * @param authzToken * @param gatewayId The identifier for the requested Gateway. - * @param userName The User for which the credential should be registered. For community accounts, this user is the name of the + * @param portalUserName The User for which the credential should be registered. For community accounts, this user is the name of the * community user name. For computational resources, this user name need not be the same user name on resoruces. * @param password * @return airavataCredStoreToken @@ -867,8 +896,8 @@ public class AiravataServerHandler implements Airavata.Iface { String messageId = AiravataUtils.getId("EXPERIMENT"); MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - if(publisher!=null) { - publisher.publish(messageContext); + if(statusPublisher !=null) { + statusPublisher.publish(messageContext); } logger.debug(experimentId, "Created new experiment with experiment name {}", experiment.getExperimentName()); return experimentId; @@ -1122,9 +1151,9 @@ public class AiravataServerHandler implements Airavata.Iface { */ @Override @SecurityCheck - public boolean validateExperiment(AuthzToken authzToken, String airavataExperimentId) throws InvalidRequestException, - ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { - try { + public boolean validateExperiment(AuthzToken authzToken, String airavataExperimentId) throws TException { + // TODO - call validation module and validate experiment +/* try { ExperimentModel experimentModel = getRegistryServiceClient().getExperiment(airavataExperimentId); if (experimentModel == null) { logger.error(airavataExperimentId, "Experiment validation failed , experiment {} doesn't exist.", airavataExperimentId); @@ -1151,9 +1180,9 @@ public class AiravataServerHandler implements Airavata.Iface { }finally { orchestratorClient.getOutputProtocol().getTransport().close(); orchestratorClient.getInputProtocol().getTransport().close(); - } - + }*/ + return true; } /** @@ -1180,8 +1209,7 @@ public class AiravataServerHandler implements Airavata.Iface { */ @Override @SecurityCheck - public ExperimentStatus getExperimentStatus(AuthzToken authzToken, String airavataExperimentId) throws InvalidRequestException, - ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { + public ExperimentStatus getExperimentStatus(AuthzToken authzToken, String airavataExperimentId) throws TException { try { return getRegistryServiceClient().getExperimentStatus(airavataExperimentId); } catch (ApplicationSettingsException e) { @@ -1274,50 +1302,37 @@ public class AiravataServerHandler implements Airavata.Iface { @Override @SecurityCheck public void launchExperiment(AuthzToken authzToken, final String airavataExperimentId, String gatewayId) - throws AuthorizationException, TException { - try { + throws TException { + try { ExperimentModel experiment = getRegistryServiceClient().getExperiment(airavataExperimentId); if (experiment == null) { logger.error(airavataExperimentId, "Error while launching experiment, experiment {} doesn't exist.", airavataExperimentId); throw new ExperimentNotFoundException("Requested experiment id " + airavataExperimentId + " does not exist in the system.."); } -// FIXME -// String applicationID = experiment.getExecutionId(); -// if (!appCatalog.getApplicationInterface().isApplicationInterfaceExists(applicationID)){ -// logger.error(airavataExperimentId, "Error while launching experiment, application id {} for experiment {} doesn't exist.", applicationID, airavataExperimentId); -// AiravataSystemException exception = new AiravataSystemException(); -// exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); -// exception.setMessage("Error while launching experiment, application id : " + applicationID + " for experiment : " + airavataExperimentId + -// " doesn't exist.."); -// throw exception; -// } - OrchestratorService.Client orchestratorClient = getOrchestratorClient(); - if (orchestratorClient.validateExperiment(airavataExperimentId)) { - orchestratorClient.launchExperiment(airavataExperimentId, gatewayId); - logger.debug("Airavata launched experiment with experiment id : " + airavataExperimentId); - }else { - logger.error(airavataExperimentId, "Couldn't identify experiment type, experiment {} is neither single application nor workflow.", airavataExperimentId); - throw new InvalidRequestException("Experiment '" + airavataExperimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId()); - } + submitExperiment(gatewayId, airavataExperimentId); } catch (RegistryServiceException | ApplicationSettingsException e1) { logger.error(airavataExperimentId, "Error while instantiate the registry instance", e1); AiravataSystemException exception = new AiravataSystemException(); exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); exception.setMessage("Error while instantiate the registry instance. More info : " + e1.getMessage()); throw exception; + } catch (AiravataException ex) { + logger.error("Experiment publish event fails", ex); + } } - private OrchestratorService.Client getOrchestratorClient() throws TException { - try { - final String serverHost = ServerSettings.getOrchestratorServerHost(); - final int serverPort = ServerSettings.getOrchestratorServerPort(); - return OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort); - } catch (AiravataException e) { - throw new TException(e); - } - } + +// private OrchestratorService.Client getOrchestratorClient() throws TException { +// try { +// final String serverHost = ServerSettings.getOrchestratorServerHost(); +// final int serverPort = ServerSettings.getOrchestratorServerPort(); +// return OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort); +// } catch (AiravataException e) { +// throw new TException(e); +// } +// } /** * Clone an specified experiment with a new name. A copy of the experiment configuration is made and is persisted with new metadata. @@ -1437,26 +1452,34 @@ public class AiravataServerHandler implements Airavata.Iface { */ @Override @SecurityCheck - public void terminateExperiment(AuthzToken authzToken, String airavataExperimentId, String gatewayId) throws InvalidRequestException, - ExperimentNotFoundException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { + public void terminateExperiment(AuthzToken authzToken, String airavataExperimentId, String gatewayId) + throws TException { try { RegistryService.Client regClient = getRegistryServiceClient(); ExperimentModel existingExperiment = regClient.getExperiment(airavataExperimentId); if (existingExperiment == null){ - logger.error(airavataExperimentId, "Error while cloning experiment {}, experiment doesn't exist.", airavataExperimentId); + logger.error(airavataExperimentId, "Error while cancelling experiment {}, experiment doesn't exist.", airavataExperimentId); throw new ExperimentNotFoundException("Requested experiment id " + airavataExperimentId + " does not exist in the system.."); } + ExperimentStatus experimentStatus = null; + switch (experimentStatus.getState()) { + case COMPLETED: case CANCELED: case FAILED: case CANCELING: + logger.warn("Can't terminate already {} experiment", experimentStatus.getState().name()); + case CREATED: + logger.warn("Experiment termination is only allowed for launched experiments."); + default: + submitCancelExperiment(airavataExperimentId, gatewayId); - Client client = getOrchestratorClient(); - client.terminateExperiment(airavataExperimentId, gatewayId); + } logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId); - } catch (RegistryServiceException | ApplicationSettingsException e) { + } catch (RegistryServiceException | AiravataException e) { logger.error(airavataExperimentId, "Error while cancelling the experiment...", e); AiravataSystemException exception = new AiravataSystemException(); exception.setAiravataErrorType(AiravataErrorType.INTERNAL_ERROR); exception.setMessage("Error while cancelling the experiment. More info : " + e.getMessage()); throw exception; } + } /** @@ -2902,8 +2925,9 @@ public class AiravataServerHandler implements Airavata.Iface { */ @Override @SecurityCheck - public boolean updateGatewayResourceProfile(AuthzToken authzToken, String gatewayID, GatewayResourceProfile gatewayResourceProfile) - throws InvalidRequestException, AiravataClientException, AiravataSystemException, AuthorizationException, TException { + public boolean updateGatewayResourceProfile(AuthzToken authzToken, + String gatewayID, + GatewayResourceProfile gatewayResourceProfile) throws TException { try { return getRegistryServiceClient().updateGatewayResourceProfile(gatewayID, gatewayResourceProfile); } catch (ApplicationSettingsException | RegistryServiceException e) { @@ -2924,8 +2948,7 @@ public class AiravataServerHandler implements Airavata.Iface { */ @Override @SecurityCheck - public boolean deleteGatewayResourceProfile(AuthzToken authzToken, String gatewayID) throws InvalidRequestException, - AiravataClientException, AiravataSystemException, AuthorizationException, TException { + public boolean deleteGatewayResourceProfile(AuthzToken authzToken, String gatewayID) throws TException { try { return getRegistryServiceClient().deleteGatewayResourceProfile(gatewayID); } catch (ApplicationSettingsException | RegistryServiceException e) { @@ -3652,6 +3675,19 @@ public class AiravataServerHandler implements Airavata.Iface { return allAccessibleResources; } + + private void submitExperiment(String gatewayId,String experimentId) throws AiravataException { + ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, "LAUNCH.EXP-" + UUID.randomUUID().toString(), gatewayId); + experimentPublisher.publish(messageContext); + } + + private void submitCancelExperiment(String gatewayId, String experimentId) throws AiravataException { + ExperimentSubmitEvent event = new ExperimentSubmitEvent(experimentId, gatewayId); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_CANCEL, "CANCEL.EXP-" + UUID.randomUUID().toString(), gatewayId); + experimentPublisher.publish(messageContext); + } + private CredentialStoreService.Client getCredentialStoreServiceClient() throws TException, ApplicationSettingsException { final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort()); final String serverHost = ServerSettings.getCredentialStoreServerHost(); http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp index 8317b71..f7d5d87 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.cpp @@ -46,6 +46,7 @@ const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES(::apache::thrift: int _kMessageTypeValues[] = { MessageType::EXPERIMENT, + MessageType::EXPERIMENT_CANCEL, MessageType::TASK, MessageType::PROCESS, MessageType::JOB, @@ -55,6 +56,7 @@ int _kMessageTypeValues[] = { }; const char* _kMessageTypeNames[] = { "EXPERIMENT", + "EXPERIMENT_CANCEL", "TASK", "PROCESS", "JOB", @@ -62,7 +64,7 @@ const char* _kMessageTypeNames[] = { "TERMINATEPROCESS", "PROCESSOUTPUT" }; -const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _MessageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kMessageTypeValues, _kMessageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); ExperimentStatusChangeEvent::~ExperimentStatusChangeEvent() throw() { @@ -1237,6 +1239,115 @@ void JobIdentifier::printTo(std::ostream& out) const { } +ExperimentSubmitEvent::~ExperimentSubmitEvent() throw() { +} + + +void ExperimentSubmitEvent::__set_experimentId(const std::string& val) { + this->experimentId = val; +} + +void ExperimentSubmitEvent::__set_gatewayId(const std::string& val) { + this->gatewayId = val; +} + +uint32_t ExperimentSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + bool isset_experimentId = false; + bool isset_gatewayId = false; + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->experimentId); + isset_experimentId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->gatewayId); + isset_gatewayId = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + if (!isset_experimentId) + throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_gatewayId) + throw TProtocolException(TProtocolException::INVALID_DATA); + return xfer; +} + +uint32_t ExperimentSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("ExperimentSubmitEvent"); + + xfer += oprot->writeFieldBegin("experimentId", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->experimentId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("gatewayId", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->gatewayId); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(ExperimentSubmitEvent &a, ExperimentSubmitEvent &b) { + using ::std::swap; + swap(a.experimentId, b.experimentId); + swap(a.gatewayId, b.gatewayId); +} + +ExperimentSubmitEvent::ExperimentSubmitEvent(const ExperimentSubmitEvent& other29) { + experimentId = other29.experimentId; + gatewayId = other29.gatewayId; +} +ExperimentSubmitEvent& ExperimentSubmitEvent::operator=(const ExperimentSubmitEvent& other30) { + experimentId = other30.experimentId; + gatewayId = other30.gatewayId; + return *this; +} +void ExperimentSubmitEvent::printTo(std::ostream& out) const { + using ::apache::thrift::to_string; + out << "ExperimentSubmitEvent("; + out << "experimentId=" << to_string(experimentId); + out << ", " << "gatewayId=" << to_string(gatewayId); + out << ")"; +} + + ProcessSubmitEvent::~ProcessSubmitEvent() throw() { } @@ -1368,17 +1479,17 @@ void swap(ProcessSubmitEvent &a, ProcessSubmitEvent &b) { swap(a.tokenId, b.tokenId); } -ProcessSubmitEvent::ProcessSubmitEvent(const ProcessSubmitEvent& other29) { - processId = other29.processId; - gatewayId = other29.gatewayId; - experimentId = other29.experimentId; - tokenId = other29.tokenId; +ProcessSubmitEvent::ProcessSubmitEvent(const ProcessSubmitEvent& other31) { + processId = other31.processId; + gatewayId = other31.gatewayId; + experimentId = other31.experimentId; + tokenId = other31.tokenId; } -ProcessSubmitEvent& ProcessSubmitEvent::operator=(const ProcessSubmitEvent& other30) { - processId = other30.processId; - gatewayId = other30.gatewayId; - experimentId = other30.experimentId; - tokenId = other30.tokenId; +ProcessSubmitEvent& ProcessSubmitEvent::operator=(const ProcessSubmitEvent& other32) { + processId = other32.processId; + gatewayId = other32.gatewayId; + experimentId = other32.experimentId; + tokenId = other32.tokenId; return *this; } void ProcessSubmitEvent::printTo(std::ostream& out) const { @@ -1503,15 +1614,15 @@ void swap(ProcessTerminateEvent &a, ProcessTerminateEvent &b) { swap(a.tokenId, b.tokenId); } -ProcessTerminateEvent::ProcessTerminateEvent(const ProcessTerminateEvent& other31) { - processId = other31.processId; - gatewayId = other31.gatewayId; - tokenId = other31.tokenId; +ProcessTerminateEvent::ProcessTerminateEvent(const ProcessTerminateEvent& other33) { + processId = other33.processId; + gatewayId = other33.gatewayId; + tokenId = other33.tokenId; } -ProcessTerminateEvent& ProcessTerminateEvent::operator=(const ProcessTerminateEvent& other32) { - processId = other32.processId; - gatewayId = other32.gatewayId; - tokenId = other32.tokenId; +ProcessTerminateEvent& ProcessTerminateEvent::operator=(const ProcessTerminateEvent& other34) { + processId = other34.processId; + gatewayId = other34.gatewayId; + tokenId = other34.tokenId; return *this; } void ProcessTerminateEvent::printTo(std::ostream& out) const { @@ -1561,9 +1672,9 @@ uint32_t JobStatusChangeEvent::read(::apache::thrift::protocol::TProtocol* iprot { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast33; - xfer += iprot->readI32(ecast33); - this->state = ( ::apache::airavata::model::status::JobState::type)ecast33; + int32_t ecast35; + xfer += iprot->readI32(ecast35); + this->state = ( ::apache::airavata::model::status::JobState::type)ecast35; isset_state = true; } else { xfer += iprot->skip(ftype); @@ -1617,13 +1728,13 @@ void swap(JobStatusChangeEvent &a, JobStatusChangeEvent &b) { swap(a.jobIdentity, b.jobIdentity); } -JobStatusChangeEvent::JobStatusChangeEvent(const JobStatusChangeEvent& other34) { - state = other34.state; - jobIdentity = other34.jobIdentity; +JobStatusChangeEvent::JobStatusChangeEvent(const JobStatusChangeEvent& other36) { + state = other36.state; + jobIdentity = other36.jobIdentity; } -JobStatusChangeEvent& JobStatusChangeEvent::operator=(const JobStatusChangeEvent& other35) { - state = other35.state; - jobIdentity = other35.jobIdentity; +JobStatusChangeEvent& JobStatusChangeEvent::operator=(const JobStatusChangeEvent& other37) { + state = other37.state; + jobIdentity = other37.jobIdentity; return *this; } void JobStatusChangeEvent::printTo(std::ostream& out) const { @@ -1672,9 +1783,9 @@ uint32_t JobStatusChangeRequestEvent::read(::apache::thrift::protocol::TProtocol { case 1: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast36; - xfer += iprot->readI32(ecast36); - this->state = ( ::apache::airavata::model::status::JobState::type)ecast36; + int32_t ecast38; + xfer += iprot->readI32(ecast38); + this->state = ( ::apache::airavata::model::status::JobState::type)ecast38; isset_state = true; } else { xfer += iprot->skip(ftype); @@ -1728,13 +1839,13 @@ void swap(JobStatusChangeRequestEvent &a, JobStatusChangeRequestEvent &b) { swap(a.jobIdentity, b.jobIdentity); } -JobStatusChangeRequestEvent::JobStatusChangeRequestEvent(const JobStatusChangeRequestEvent& other37) { - state = other37.state; - jobIdentity = other37.jobIdentity; +JobStatusChangeRequestEvent::JobStatusChangeRequestEvent(const JobStatusChangeRequestEvent& other39) { + state = other39.state; + jobIdentity = other39.jobIdentity; } -JobStatusChangeRequestEvent& JobStatusChangeRequestEvent::operator=(const JobStatusChangeRequestEvent& other38) { - state = other38.state; - jobIdentity = other38.jobIdentity; +JobStatusChangeRequestEvent& JobStatusChangeRequestEvent::operator=(const JobStatusChangeRequestEvent& other40) { + state = other40.state; + jobIdentity = other40.jobIdentity; return *this; } void JobStatusChangeRequestEvent::printTo(std::ostream& out) const { @@ -1814,9 +1925,9 @@ uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 3: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast39; - xfer += iprot->readI32(ecast39); - this->messageType = (MessageType::type)ecast39; + int32_t ecast41; + xfer += iprot->readI32(ecast41); + this->messageType = (MessageType::type)ecast41; isset_messageType = true; } else { xfer += iprot->skip(ftype); @@ -1832,9 +1943,9 @@ uint32_t Message::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 5: if (ftype == ::apache::thrift::protocol::T_I32) { - int32_t ecast40; - xfer += iprot->readI32(ecast40); - this->messageLevel = (MessageLevel::type)ecast40; + int32_t ecast42; + xfer += iprot->readI32(ecast42); + this->messageLevel = (MessageLevel::type)ecast42; this->__isset.messageLevel = true; } else { xfer += iprot->skip(ftype); @@ -1900,21 +2011,21 @@ void swap(Message &a, Message &b) { swap(a.__isset, b.__isset); } -Message::Message(const Message& other41) { - event = other41.event; - messageId = other41.messageId; - messageType = other41.messageType; - updatedTime = other41.updatedTime; - messageLevel = other41.messageLevel; - __isset = other41.__isset; -} -Message& Message::operator=(const Message& other42) { - event = other42.event; - messageId = other42.messageId; - messageType = other42.messageType; - updatedTime = other42.updatedTime; - messageLevel = other42.messageLevel; - __isset = other42.__isset; +Message::Message(const Message& other43) { + event = other43.event; + messageId = other43.messageId; + messageType = other43.messageType; + updatedTime = other43.updatedTime; + messageLevel = other43.messageLevel; + __isset = other43.__isset; +} +Message& Message::operator=(const Message& other44) { + event = other44.event; + messageId = other44.messageId; + messageType = other44.messageType; + updatedTime = other44.updatedTime; + messageLevel = other44.messageLevel; + __isset = other44.__isset; return *this; } void Message::printTo(std::ostream& out) const { http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h index d5a2411..15caed1 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messaging_events_types.h @@ -53,12 +53,13 @@ extern const std::map<int, const char*> _MessageLevel_VALUES_TO_NAMES; struct MessageType { enum type { EXPERIMENT = 0, - TASK = 1, - PROCESS = 2, - JOB = 3, - LAUNCHPROCESS = 4, - TERMINATEPROCESS = 5, - PROCESSOUTPUT = 6 + EXPERIMENT_CANCEL = 1, + TASK = 2, + PROCESS = 3, + JOB = 4, + LAUNCHPROCESS = 5, + TERMINATEPROCESS = 6, + PROCESSOUTPUT = 7 }; }; @@ -82,6 +83,8 @@ class TaskOutputChangeEvent; class JobIdentifier; +class ExperimentSubmitEvent; + class ProcessSubmitEvent; class ProcessTerminateEvent; @@ -533,6 +536,51 @@ inline std::ostream& operator<<(std::ostream& out, const JobIdentifier& obj) } +class ExperimentSubmitEvent { + public: + + ExperimentSubmitEvent(const ExperimentSubmitEvent&); + ExperimentSubmitEvent& operator=(const ExperimentSubmitEvent&); + ExperimentSubmitEvent() : experimentId(), gatewayId() { + } + + virtual ~ExperimentSubmitEvent() throw(); + std::string experimentId; + std::string gatewayId; + + void __set_experimentId(const std::string& val); + + void __set_gatewayId(const std::string& val); + + bool operator == (const ExperimentSubmitEvent & rhs) const + { + if (!(experimentId == rhs.experimentId)) + return false; + if (!(gatewayId == rhs.gatewayId)) + return false; + return true; + } + bool operator != (const ExperimentSubmitEvent &rhs) const { + return !(*this == rhs); + } + + bool operator < (const ExperimentSubmitEvent & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + + virtual void printTo(std::ostream& out) const; +}; + +void swap(ExperimentSubmitEvent &a, ExperimentSubmitEvent &b); + +inline std::ostream& operator<<(std::ostream& out, const ExperimentSubmitEvent& obj) +{ + obj.printTo(out); + return out; +} + + class ProcessSubmitEvent { public: http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index 9c6a1e8..96a015e 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -32,20 +32,22 @@ final class MessageLevel { final class MessageType { const EXPERIMENT = 0; - const TASK = 1; - const PROCESS = 2; - const JOB = 3; - const LAUNCHPROCESS = 4; - const TERMINATEPROCESS = 5; - const PROCESSOUTPUT = 6; + const EXPERIMENT_CANCEL = 1; + const TASK = 2; + const PROCESS = 3; + const JOB = 4; + const LAUNCHPROCESS = 5; + const TERMINATEPROCESS = 6; + const PROCESSOUTPUT = 7; static public $__names = array( 0 => 'EXPERIMENT', - 1 => 'TASK', - 2 => 'PROCESS', - 3 => 'JOB', - 4 => 'LAUNCHPROCESS', - 5 => 'TERMINATEPROCESS', - 6 => 'PROCESSOUTPUT', + 1 => 'EXPERIMENT_CANCEL', + 2 => 'TASK', + 3 => 'PROCESS', + 4 => 'JOB', + 5 => 'LAUNCHPROCESS', + 6 => 'TERMINATEPROCESS', + 7 => 'PROCESSOUTPUT', ); } @@ -1145,6 +1147,104 @@ class JobIdentifier { } +class ExperimentSubmitEvent { + static $_TSPEC; + + /** + * @var string + */ + public $experimentId = null; + /** + * @var string + */ + public $gatewayId = null; + + public function __construct($vals=null) { + if (!isset(self::$_TSPEC)) { + self::$_TSPEC = array( + 1 => array( + 'var' => 'experimentId', + 'type' => TType::STRING, + ), + 2 => array( + 'var' => 'gatewayId', + 'type' => TType::STRING, + ), + ); + } + if (is_array($vals)) { + if (isset($vals['experimentId'])) { + $this->experimentId = $vals['experimentId']; + } + if (isset($vals['gatewayId'])) { + $this->gatewayId = $vals['gatewayId']; + } + } + } + + public function getName() { + return 'ExperimentSubmitEvent'; + } + + public function read($input) + { + $xfer = 0; + $fname = null; + $ftype = 0; + $fid = 0; + $xfer += $input->readStructBegin($fname); + while (true) + { + $xfer += $input->readFieldBegin($fname, $ftype, $fid); + if ($ftype == TType::STOP) { + break; + } + switch ($fid) + { + case 1: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->experimentId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 2: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->gatewayId); + } else { + $xfer += $input->skip($ftype); + } + break; + default: + $xfer += $input->skip($ftype); + break; + } + $xfer += $input->readFieldEnd(); + } + $xfer += $input->readStructEnd(); + return $xfer; + } + + public function write($output) { + $xfer = 0; + $xfer += $output->writeStructBegin('ExperimentSubmitEvent'); + if ($this->experimentId !== null) { + $xfer += $output->writeFieldBegin('experimentId', TType::STRING, 1); + $xfer += $output->writeString($this->experimentId); + $xfer += $output->writeFieldEnd(); + } + if ($this->gatewayId !== null) { + $xfer += $output->writeFieldBegin('gatewayId', TType::STRING, 2); + $xfer += $output->writeString($this->gatewayId); + $xfer += $output->writeFieldEnd(); + } + $xfer += $output->writeFieldStop(); + $xfer += $output->writeStructEnd(); + return $xfer; + } + +} + class ProcessSubmitEvent { static $_TSPEC; http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py index 312e07a..818841d 100644 --- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py +++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/messaging/event/ttypes.py @@ -42,31 +42,34 @@ class MessageLevel: class MessageType: EXPERIMENT = 0 - TASK = 1 - PROCESS = 2 - JOB = 3 - LAUNCHPROCESS = 4 - TERMINATEPROCESS = 5 - PROCESSOUTPUT = 6 + EXPERIMENT_CANCEL = 1 + TASK = 2 + PROCESS = 3 + JOB = 4 + LAUNCHPROCESS = 5 + TERMINATEPROCESS = 6 + PROCESSOUTPUT = 7 _VALUES_TO_NAMES = { 0: "EXPERIMENT", - 1: "TASK", - 2: "PROCESS", - 3: "JOB", - 4: "LAUNCHPROCESS", - 5: "TERMINATEPROCESS", - 6: "PROCESSOUTPUT", + 1: "EXPERIMENT_CANCEL", + 2: "TASK", + 3: "PROCESS", + 4: "JOB", + 5: "LAUNCHPROCESS", + 6: "TERMINATEPROCESS", + 7: "PROCESSOUTPUT", } _NAMES_TO_VALUES = { "EXPERIMENT": 0, - "TASK": 1, - "PROCESS": 2, - "JOB": 3, - "LAUNCHPROCESS": 4, - "TERMINATEPROCESS": 5, - "PROCESSOUTPUT": 6, + "EXPERIMENT_CANCEL": 1, + "TASK": 2, + "PROCESS": 3, + "JOB": 4, + "LAUNCHPROCESS": 5, + "TERMINATEPROCESS": 6, + "PROCESSOUTPUT": 7, } @@ -927,6 +930,88 @@ class JobIdentifier: def __ne__(self, other): return not (self == other) +class ExperimentSubmitEvent: + """ + Attributes: + - experimentId + - gatewayId + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'experimentId', None, None, ), # 1 + (2, TType.STRING, 'gatewayId', None, None, ), # 2 + ) + + def __init__(self, experimentId=None, gatewayId=None,): + self.experimentId = experimentId + self.gatewayId = gatewayId + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.experimentId = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.gatewayId = iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ExperimentSubmitEvent') + if self.experimentId is not None: + oprot.writeFieldBegin('experimentId', TType.STRING, 1) + oprot.writeString(self.experimentId) + oprot.writeFieldEnd() + if self.gatewayId is not None: + oprot.writeFieldBegin('gatewayId', TType.STRING, 2) + oprot.writeString(self.gatewayId) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.experimentId is None: + raise TProtocol.TProtocolException(message='Required field experimentId is unset!') + if self.gatewayId is None: + raise TProtocol.TProtocolException(message='Required field gatewayId is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.experimentId) + value = (value * 31) ^ hash(self.gatewayId) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class ProcessSubmitEvent: """ Attributes: http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java new file mode 100644 index 0000000..7ec7315 --- /dev/null +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/ExperimentSubmitEvent.java @@ -0,0 +1,507 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Autogenerated by Thrift Compiler (0.9.3) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.airavata.model.messaging.event; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +public class ExperimentSubmitEvent implements org.apache.thrift.TBase<ExperimentSubmitEvent, ExperimentSubmitEvent._Fields>, java.io.Serializable, Cloneable, Comparable<ExperimentSubmitEvent> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExperimentSubmitEvent"); + + private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new ExperimentSubmitEventStandardSchemeFactory()); + schemes.put(TupleScheme.class, new ExperimentSubmitEventTupleSchemeFactory()); + } + + private String experimentId; // required + private String gatewayId; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + EXPERIMENT_ID((short)1, "experimentId"), + GATEWAY_ID((short)2, "gatewayId"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // EXPERIMENT_ID + return EXPERIMENT_ID; + case 2: // GATEWAY_ID + return GATEWAY_ID; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.EXPERIMENT_ID, new org.apache.thrift.meta_data.FieldMetaData("experimentId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentSubmitEvent.class, metaDataMap); + } + + public ExperimentSubmitEvent() { + } + + public ExperimentSubmitEvent( + String experimentId, + String gatewayId) + { + this(); + this.experimentId = experimentId; + this.gatewayId = gatewayId; + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public ExperimentSubmitEvent(ExperimentSubmitEvent other) { + if (other.isSetExperimentId()) { + this.experimentId = other.experimentId; + } + if (other.isSetGatewayId()) { + this.gatewayId = other.gatewayId; + } + } + + public ExperimentSubmitEvent deepCopy() { + return new ExperimentSubmitEvent(this); + } + + @Override + public void clear() { + this.experimentId = null; + this.gatewayId = null; + } + + public String getExperimentId() { + return this.experimentId; + } + + public void setExperimentId(String experimentId) { + this.experimentId = experimentId; + } + + public void unsetExperimentId() { + this.experimentId = null; + } + + /** Returns true if field experimentId is set (has been assigned a value) and false otherwise */ + public boolean isSetExperimentId() { + return this.experimentId != null; + } + + public void setExperimentIdIsSet(boolean value) { + if (!value) { + this.experimentId = null; + } + } + + public String getGatewayId() { + return this.gatewayId; + } + + public void setGatewayId(String gatewayId) { + this.gatewayId = gatewayId; + } + + public void unsetGatewayId() { + this.gatewayId = null; + } + + /** Returns true if field gatewayId is set (has been assigned a value) and false otherwise */ + public boolean isSetGatewayId() { + return this.gatewayId != null; + } + + public void setGatewayIdIsSet(boolean value) { + if (!value) { + this.gatewayId = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case EXPERIMENT_ID: + if (value == null) { + unsetExperimentId(); + } else { + setExperimentId((String)value); + } + break; + + case GATEWAY_ID: + if (value == null) { + unsetGatewayId(); + } else { + setGatewayId((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case EXPERIMENT_ID: + return getExperimentId(); + + case GATEWAY_ID: + return getGatewayId(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case EXPERIMENT_ID: + return isSetExperimentId(); + case GATEWAY_ID: + return isSetGatewayId(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof ExperimentSubmitEvent) + return this.equals((ExperimentSubmitEvent)that); + return false; + } + + public boolean equals(ExperimentSubmitEvent that) { + if (that == null) + return false; + + boolean this_present_experimentId = true && this.isSetExperimentId(); + boolean that_present_experimentId = true && that.isSetExperimentId(); + if (this_present_experimentId || that_present_experimentId) { + if (!(this_present_experimentId && that_present_experimentId)) + return false; + if (!this.experimentId.equals(that.experimentId)) + return false; + } + + boolean this_present_gatewayId = true && this.isSetGatewayId(); + boolean that_present_gatewayId = true && that.isSetGatewayId(); + if (this_present_gatewayId || that_present_gatewayId) { + if (!(this_present_gatewayId && that_present_gatewayId)) + return false; + if (!this.gatewayId.equals(that.gatewayId)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List<Object> list = new ArrayList<Object>(); + + boolean present_experimentId = true && (isSetExperimentId()); + list.add(present_experimentId); + if (present_experimentId) + list.add(experimentId); + + boolean present_gatewayId = true && (isSetGatewayId()); + list.add(present_gatewayId); + if (present_gatewayId) + list.add(gatewayId); + + return list.hashCode(); + } + + @Override + public int compareTo(ExperimentSubmitEvent other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetExperimentId()).compareTo(other.isSetExperimentId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetExperimentId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.experimentId, other.experimentId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetGatewayId()).compareTo(other.isSetGatewayId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetGatewayId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gatewayId, other.gatewayId); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ExperimentSubmitEvent("); + boolean first = true; + + sb.append("experimentId:"); + if (this.experimentId == null) { + sb.append("null"); + } else { + sb.append(this.experimentId); + } + first = false; + if (!first) sb.append(", "); + sb.append("gatewayId:"); + if (this.gatewayId == null) { + sb.append("null"); + } else { + sb.append(this.gatewayId); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (!isSetExperimentId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'experimentId' is unset! Struct:" + toString()); + } + + if (!isSetGatewayId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString()); + } + + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class ExperimentSubmitEventStandardSchemeFactory implements SchemeFactory { + public ExperimentSubmitEventStandardScheme getScheme() { + return new ExperimentSubmitEventStandardScheme(); + } + } + + private static class ExperimentSubmitEventStandardScheme extends StandardScheme<ExperimentSubmitEvent> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // EXPERIMENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // GATEWAY_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.gatewayId = iprot.readString(); + struct.setGatewayIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.experimentId != null) { + oprot.writeFieldBegin(EXPERIMENT_ID_FIELD_DESC); + oprot.writeString(struct.experimentId); + oprot.writeFieldEnd(); + } + if (struct.gatewayId != null) { + oprot.writeFieldBegin(GATEWAY_ID_FIELD_DESC); + oprot.writeString(struct.gatewayId); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class ExperimentSubmitEventTupleSchemeFactory implements SchemeFactory { + public ExperimentSubmitEventTupleScheme getScheme() { + return new ExperimentSubmitEventTupleScheme(); + } + } + + private static class ExperimentSubmitEventTupleScheme extends TupleScheme<ExperimentSubmitEvent> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + oprot.writeString(struct.experimentId); + oprot.writeString(struct.gatewayId); + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, ExperimentSubmitEvent struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + struct.experimentId = iprot.readString(); + struct.setExperimentIdIsSet(true); + struct.gatewayId = iprot.readString(); + struct.setGatewayIdIsSet(true); + } + } + +} + http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java index 48df9b4..59b6f33 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/MessageType.java @@ -30,12 +30,13 @@ import org.apache.thrift.TEnum; public enum MessageType implements org.apache.thrift.TEnum { EXPERIMENT(0), - TASK(1), - PROCESS(2), - JOB(3), - LAUNCHPROCESS(4), - TERMINATEPROCESS(5), - PROCESSOUTPUT(6); + EXPERIMENT_CANCEL(1), + TASK(2), + PROCESS(3), + JOB(4), + LAUNCHPROCESS(5), + TERMINATEPROCESS(6), + PROCESSOUTPUT(7); private final int value; @@ -59,16 +60,18 @@ public enum MessageType implements org.apache.thrift.TEnum { case 0: return EXPERIMENT; case 1: - return TASK; + return EXPERIMENT_CANCEL; case 2: - return PROCESS; + return TASK; case 3: - return JOB; + return PROCESS; case 4: - return LAUNCHPROCESS; + return JOB; case 5: - return TERMINATEPROCESS; + return LAUNCHPROCESS; case 6: + return TERMINATEPROCESS; + case 7: return PROCESSOUTPUT; default: return null; http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java index 99c11b8..573304a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingFactory.java @@ -22,6 +22,7 @@ package org.apache.airavata.messaging.core; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.messaging.core.impl.ExperimentConsumer; import org.apache.airavata.messaging.core.impl.ProcessConsumer; import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.messaging.core.impl.RabbitMQSubscriber; @@ -45,6 +46,10 @@ public class MessagingFactory { switch (type) { case EXPERIMENT_LAUNCH: + subscriber = getExperimentSubscriber(rProperties); + subscriber.listen(((connection, channel) -> new ExperimentConsumer(messageHandler, connection, channel)), + null, + routingKeys); break; case PROCESS_LAUNCH: subscriber = getProcessSubscriber(rProperties); @@ -70,6 +75,7 @@ public class MessagingFactory { Publisher publiser = null; switch (type) { case EXPERIMENT_LAUNCH: + publiser = getExperimentPublisher(rProperties); break; case PROCESS_LAUNCH: publiser = gerProcessPublisher(rProperties); @@ -84,6 +90,11 @@ public class MessagingFactory { return publiser; } + private static Publisher getExperimentPublisher(RabbitMQProperties rProperties) throws AiravataException { + rProperties.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName()); + return new RabbitMQPublisher(rProperties, messageContext -> rProperties.getExchangeName()); + } + private static Publisher getStatusPublisher(RabbitMQProperties rProperties) throws AiravataException { rProperties.setExchangeName(ServerSettings.getRabbitmqStatusExchangeName()); return new RabbitMQPublisher(rProperties, MessagingFactory::statusRoutingkey); @@ -110,7 +121,6 @@ public class MessagingFactory { return new RabbitMQSubscriber(sp); } - private static RabbitMQSubscriber getProcessSubscriber(RabbitMQProperties sp) throws AiravataException { sp.setExchangeName(ServerSettings.getRabbitmqProcessExchangeName()) .setQueueName(ServerSettings.getRabbitmqProcessLaunchQueueName()) @@ -119,6 +129,14 @@ public class MessagingFactory { } + private static Subscriber getExperimentSubscriber(RabbitMQProperties sp) throws AiravataException { + sp.setExchangeName(ServerSettings.getRabbitmqExperimentExchangeName()) + .setAutoAck(false); + return new RabbitMQSubscriber(sp); + + } + + private static String statusRoutingkey(MessageContext msgCtx) { String gatewayId = msgCtx.getGatewayId(); String routingKey = null; http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java index 058b99e..6e4c46a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ExperimentConsumer.java @@ -22,14 +22,37 @@ package org.apache.airavata.messaging.core.impl; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent; +import org.apache.airavata.model.messaging.event.Message; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; public class ExperimentConsumer extends QueueingConsumer { - public ExperimentConsumer(Channel ch) { - super(ch); + private static final Logger log = LoggerFactory.getLogger(ExperimentConsumer.class); + + private MessageHandler handler; + private Channel channel; + private Connection connection; + + public ExperimentConsumer(MessageHandler messageHandler, Connection connection, Channel channel) { + super(channel); + this.handler = messageHandler; + this.connection = connection; + this.channel = channel; } @@ -38,5 +61,52 @@ public class ExperimentConsumer extends QueueingConsumer { Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + + Message message = new Message(); + + try { + ThriftUtils.createThriftFromBytes(body, message); + long deliveryTag = envelope.getDeliveryTag(); + if (message.getMessageType() == MessageType.EXPERIMENT || message.getMessageType() == MessageType.EXPERIMENT_CANCEL) { + TBase event = null; + String gatewayId = null; + ExperimentSubmitEvent experimentEvent = new ExperimentSubmitEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), experimentEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' for experimentId:" + + " " + + experimentEvent.getExperimentId()); + event = experimentEvent; + gatewayId = experimentEvent.getGatewayId(); + MessageContext messageContext = new MessageContext(event, message.getMessageType(), + message.getMessageId(), gatewayId, deliveryTag); + messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); + messageContext.setIsRedeliver(envelope.isRedeliver()); + handler.onMessage(messageContext); + } else { + log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " + + "delivery tag {} ", message.getMessageType().name(), deliveryTag); + sendAck(deliveryTag); + } + } catch (TException e) { + String msg = "Failed to de-serialize the thrift message, from routing keys:" + envelope.getRoutingKey(); + log.warn(msg, e); + } + + } + + + private void sendAck(long deliveryTag){ + try { + if (channel.isOpen()){ + channel.basicAck(deliveryTag,false); + }else { + channel = connection.createChannel(); + channel.basicQos(ServerSettings.getRabbitmqPrefetchCount()); + channel.basicAck(deliveryTag, false); + } + } catch (IOException e) { + log.error(e.getMessage(), e); + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java index 368c100..e95a7ca 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/ProcessConsumer.java @@ -48,17 +48,13 @@ public class ProcessConsumer extends QueueingConsumer{ private Connection connection; public ProcessConsumer(MessageHandler messageHandler, Connection connection, Channel channel){ - this(channel); + super(channel); this.handler = messageHandler; this.connection = connection; this.channel = channel; } - private ProcessConsumer(Channel ch) { - super(ch); - } - @Override public void handleDelivery(String consumerTag, Envelope envelope, http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 856f9f3..3438475 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -96,7 +96,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent(processId, gatewayId, experimentId, tokenId); MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.LAUNCHPROCESS, "LAUNCH" + - ".TASK-" + UUID.randomUUID().toString(), gatewayId); + ".PROCESS-" + UUID.randomUUID().toString(), gatewayId); messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); publisher.publish(messageContext); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/4e3dc9a9/thrift-interface-descriptions/airavata-apis/messaging_events.thrift ---------------------------------------------------------------------- diff --git a/thrift-interface-descriptions/airavata-apis/messaging_events.thrift b/thrift-interface-descriptions/airavata-apis/messaging_events.thrift index 7ffc88d..2533752 100644 --- a/thrift-interface-descriptions/airavata-apis/messaging_events.thrift +++ b/thrift-interface-descriptions/airavata-apis/messaging_events.thrift @@ -36,6 +36,7 @@ enum MessageLevel { enum MessageType { EXPERIMENT, + EXPERIMENT_CANCEL, TASK, PROCESS, JOB, @@ -108,6 +109,11 @@ struct JobIdentifier { // //8: // } +struct ExperimentSubmitEvent{ + 1: required string experimentId, + 2: required string gatewayId, +} + struct ProcessSubmitEvent{ 1: required string processId, 2: required string gatewayId,
