merging messaging_framework changes with master - AIRAVATA-1442
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/282362f1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/282362f1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/282362f1 Branch: refs/heads/master Commit: 282362f1088918d1dfd8c88c9b71bac119a665c9 Parents: dbb1c97 255dd9e Author: Chathuri Wimalasena <[email protected]> Authored: Mon Oct 6 15:54:40 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Mon Oct 6 15:54:40 2014 -0400 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 39 +- .../listener/ExperimentStatusChangedEvent.java | 128 +- .../lib/airavata/airavataDataModel_types.h | 1 + .../lib/airavata/messagingEvents_constants.cpp | 36 + .../lib/airavata/messagingEvents_constants.h | 42 + .../lib/airavata/messagingEvents_types.cpp | 1067 +++++++++++++++ .../lib/airavata/messagingEvents_types.h | 601 +++++++++ .../Airavata/Model/Messaging/Event/Types.php | 1238 ++++++++++++++++++ .../client/samples/CreateLaunchExperiment.java | 10 +- .../event/ExperimentStatusChangeEvent.java | 504 +++++++ .../model/messaging/event/JobIdentifier.java | 684 ++++++++++ .../messaging/event/JobStatusChangeEvent.java | 509 +++++++ .../event/JobStatusChangeRequestEvent.java | 509 +++++++ .../airavata/model/messaging/event/Message.java | 828 ++++++++++++ .../model/messaging/event/MessageLevel.java | 68 + .../model/messaging/event/MessageType.java | 68 + .../model/messaging/event/TaskIdentifier.java | 588 +++++++++ .../messaging/event/TaskOutputChangeEvent.java | 551 ++++++++ .../messaging/event/TaskStatusChangeEvent.java | 509 +++++++ .../event/TaskStatusChangeRequestEvent.java | 509 +++++++ .../messaging/event/WorkflowIdentifier.java | 492 +++++++ .../event/WorkflowNodeStatusChangeEvent.java | 509 +++++++ .../event/messagingEventsConstants.java | 56 + .../airavataDataModel.thrift | 1 + .../messagingEvents.thrift | 121 ++ modules/commons/utils/pom.xml | 5 + .../airavata/common/utils/AiravataUtils.java | 6 + .../airavata/common/utils/ServerSettings.java | 12 + .../airavata/common/utils/ThriftUtils.java | 37 + .../main/resources/airavata-server.properties | 5 + modules/distribution/server/pom.xml | 5 + .../server/src/main/assembly/bin-assembly.xml | 1 + modules/gfac/gfac-core/pom.xml | 5 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 137 +- .../core/monitor/AiravataJobStatusUpdator.java | 40 +- .../core/monitor/AiravataTaskStatusUpdator.java | 65 +- .../AiravataWorkflowNodeStatusUpdator.java | 44 +- .../gfac/core/monitor/ExperimentIdentity.java | 72 +- .../airavata/gfac/core/monitor/JobIdentity.java | 78 +- .../gfac/core/monitor/TaskIdentity.java | 76 +- .../gfac/core/monitor/WorkflowNodeIdentity.java | 74 +- .../state/GfacExperimentStateChangeRequest.java | 16 +- .../monitor/state/JobStatusChangeRequest.java | 162 +-- .../monitor/state/JobStatusChangedEvent.java | 162 +-- .../state/TaskOutputDataChangedEvent.java | 128 +- .../monitor/state/TaskStatusChangeRequest.java | 124 +- .../monitor/state/TaskStatusChangedEvent.java | 124 +- .../state/WorkflowNodeStatusChangedEvent.java | 128 +- .../gfac/core/utils/OutHandlerWorker.java | 8 +- .../gfac/local/provider/impl/LocalProvider.java | 38 +- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 26 +- .../monitor/impl/push/amqp/AMQPMonitor.java | 8 +- .../impl/push/amqp/UnRegisterWorker.java | 5 +- .../apache/airavata/job/AMQPMonitorTest.java | 4 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 6 +- modules/messaging/core/pom.xml | 69 + .../airavata/messaging/core/MessageContext.java | 61 + .../airavata/messaging/core/Metadata.java | 25 + .../airavata/messaging/core/Publisher.java | 29 + .../messaging/core/PublisherFactory.java | 50 + .../messaging/core/impl/RabbitMQProducer.java | 195 +++ .../messaging/core/impl/RabbitMQPublisher.java | 99 ++ modules/messaging/pom.xml | 41 + .../engine/interpretor/WorkflowInterpreter.java | 12 +- pom.xml | 1 + 65 files changed, 11037 insertions(+), 814 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --cc airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index 9fee886,5d0996f..f65baea --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@@ -75,17 -81,25 +81,25 @@@ public class AiravataExperimentStatusUp state = ExperimentState.CANCELING; updateExperimentStatus = true; break; default: - break; + return; } if (!updateExperimentStatus){ - ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getIdentity().getExperimentID())); + ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentId())); updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP); } - state = updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state); - logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString()); - monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state)); + updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state); + logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString()); + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId()); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("EXPERIMENT"); + MessageContext msgCntxt = new MessageContext(event, MessageType.EXPERIMENT, messageId); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + if ( ServerSettings.isRabbitMqPublishEnabled()){ + publisher.publish(msgCntxt); + } } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); + throw new Exception("Error persisting experiment status..", e); } } @@@ -96,17 -111,10 +110,18 @@@ details.setExperimentID(experimentId); } org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus(); + status.setExperimentState(state); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + if(!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState())&& + !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) { + status.setExperimentState(state); + }else{ + status.setExperimentState(details.getExperimentStatus().getExperimentState()); + } details.setExperimentStatus(status); + logger.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString()); airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + return details.getExperimentStatus().getExperimentState(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 8d3ef3b,e9b74f5..140d631 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@@ -45,66 -57,85 +45,66 @@@ public class CreateLaunchExperiment private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class); private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_GATEWAY = "default.registry.gateway"; - private static Airavata.Client client; - private static String localHostAppId = "localhost_de131adb-fd50-492e-a68d-292b17db4faf,SimpleEcho0_23670b88-3ea2-48ff-9b6d-cca9d7b297e6"; - private static String sshHostAppId; - private static String pbsEchoAppId = "trestles.sdsc.edu_07053ec9-2e8f-4d72-bb4d-3a12fe4360de,SimpleEcho2_c81741d5-47d0-4aa7-9ee5-2a6ad5f586e2"; - private static String pbsWRFAppId = "trestles.sdsc.edu_5fc718ca-b298-4284-a99b-b23e06b10f06,WRF_e304f95a-83d7-46ba-8292-083aa6a46880"; - private static String slurmAppId = "stampede.tacc.xsede.org_b2ef59cb-f626-4767-9ca0-601f94c42ba4,SimpleEcho3_b81c2559-a088-42a3-84ce-40119d874918"; - private static String sgeAppId; - private static String br2EchoAppId = "bigred2_9c1e6be8-f7d8-4494-98f2-bf508790e8c6,SimpleEchoBR_149fd613-98e2-46e7-ac7c-4d393349469e"; - private static String slurmWRFAppId = "stampede.tacc.xsede.org_2840c815-7e61-4579-8194-79fe15cea9a9,WRF_00817e82-7995-4986-8fe2-72da08b63ef0"; - private static String br2AmberAppId = "bigred2_5dc35993-31c4-499e-97c1-8d934007e135,AmberBR2_f63fd6f9-a93f-43a8-bd41-065740a32f1f"; - private static String slurmAmberAppId = "bigred2_5dc35993-31c4-499e-97c1-8d934007e135,AmberBR2_f63fd6f9-a93f-43a8-bd41-065740a32f1f"; - private static String trestlesAmberAppId = "trestles.sdsc.edu_8ca93e3d-135c-4e3a-bf58-bdcc2592625d,AmberTrestles_ea0e8e82-3b00-4ef7-9a78-867cfecebbf1"; - + private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_89831769-edf5-4f27-a8c9-fe0ef96fd355"; - private static String wrfAppId = "WRF_15ae6599-a48f-4134-95b8-98e109ac6f88"; - private static String amberAppId = "Amber_a7b18a3a-31b3-4dc7-8faf-7c3144f14201"; ++ private static String echoAppId = "Echo_ab621572-8830-48dd-a785-0e49ee155f4f"; ++ private static String wrfAppId = "WRF_afd45537-fd4e-4a57-9cc4-bfc3cc17afa9"; ++ private static String amberAppId = "Amber_44c4e886-87e8-49b3-ac2b-1c336d67c160"; + private static String localHost = "localhost"; + private static String trestlesHostName = "trestles.sdsc.xsede.org"; + private static String stampedeHostName = "stampede.tacc.xsede.org"; + private static String br2HostName = "bigred2.uits.iu.edu"; public static void main(String[] args) { - try { - AiravataUtils.setExecutionAsClient(); - client = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); - System.out.println("API version is " + client.getAPIVersion()); -// addDescriptors(); - -//// final String expId = createExperimentForSSHHost(airavata); - final String expId = createExperimentForTrestles(client); -//// final String expId = createExperimentForStampede(client); -// final String expId = createExperimentForLocalHost(client); -// final String expId = createExperimentForLonestar(airavata); -// final String expId = createExperimentWRFTrestles(client); -// final String expId = createExperimentForBR2(client); -// final String expId = createExperimentForBR2Amber(client); -// final String expId = createExperimentWRFStampede(client); -// final String expId = createExperimentForStampedeAmber(client); -// final String expId = createExperimentForTrestlesAmber(client); + try { + airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); + System.out.println("API version is " + airavataClient.getAPIVersion()); +// registerApplications(); // run this only the first time - // for (int i = 0; i < 100; i++) { ++ for (int i = 0; i < 10; i++) { +// final String expId = createExperimentForSSHHost(airavata); + final String expId = createEchoExperimentForTrestles(airavataClient); +// final String expId = createEchoExperimentForStampede(airavataClient); +// final String expId = createExperimentEchoForLocalHost(airavataClient); +// final String expId = createExperimentWRFTrestles(airavataClient); +// final String expId = createExperimentForBR2(airavataClient); +// final String expId = createExperimentForBR2Amber(airavataClient); +// final String expId = createExperimentWRFStampede(airavataClient); +// final String expId = createExperimentForStampedeAmber(airavataClient); +// final String expId = createExperimentForTrestlesAmber(airavataClient); System.out.println("Experiment ID : " + expId); // updateExperiment(airavata, expId); - launchExperiment(client, expId); - -// System.out.println("retrieved exp id : " + experiment.getExperimentID()); - } catch (Exception e) { - logger.error("Error while connecting with server", e.getMessage()); - e.printStackTrace(); - } + launchExperiment(airavataClient, expId); - // } ++ } + } catch (Exception e) { + logger.error("Error while connecting with server", e.getMessage()); + e.printStackTrace(); + } } - public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException { - try { - DocumentCreatorNew documentCreator = new DocumentCreatorNew(client); -// DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI()); - localHostAppId = documentCreator.createLocalHostDocs(); - sshHostAppId = documentCreator.createSSHHostDocs(); -// documentCreator.createGramDocs(); - pbsEchoAppId =documentCreator.createPBSDocsForOGCE_Echo(); - pbsWRFAppId =documentCreator.createPBSDocsForOGCE_WRF(); - slurmAppId = documentCreator.createSlurmDocs(); - sgeAppId = documentCreator.createSGEDocs(); -// documentCreator.createEchoHostDocs(); - br2EchoAppId = documentCreator.createBigRedDocs(); - slurmWRFAppId = documentCreator.createSlumWRFDocs(); - br2AmberAppId = documentCreator.createBigRedAmberDocs(); - slurmAmberAppId = documentCreator.createStampedeAmberDocs(); - trestlesAmberAppId = documentCreator.createTrestlesAmberDocs(); - System.out.printf(localHostAppId); - System.out.println(sshHostAppId); - System.out.println(pbsEchoAppId); - System.out.println(pbsWRFAppId); - System.out.println(slurmAppId); - System.out.println(sgeAppId); - System.out.println(br2EchoAppId); - System.out.println(slurmWRFAppId); - System.out.println(br2AmberAppId); - System.out.println(trestlesAmberAppId); - } catch (Exception e) { - logger.error("Unable to create documents", e.getMessage()); - throw new ApplicationSettingsException(e.getMessage()); - } + public static void registerApplications() { + RegisterSampleApplications registerSampleApplications = new RegisterSampleApplications(airavataClient); + + // register localhost compute host + registerSampleApplications.registerLocalHost(); + + //Register all compute hosts + registerSampleApplications.registerXSEDEHosts(); + + //Register Gateway Resource Preferences + registerSampleApplications.registerGatewayResourceProfile(); + + //Register all application modules + registerSampleApplications.registerAppModules(); + + //Register all application deployments + registerSampleApplications.registerAppDeployments(); + + //Register all application interfaces + registerSampleApplications.registerAppInterfaces(); } - public static String createExperimentForTrestles(Airavata.Client client) throws TException { + public static String createEchoExperimentForTrestles(Airavata.Client client) throws TException { try { List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); DataObjectType input = new DataObjectType(); http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --cc modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index a7fc02d,94a6b07..e1b93ed --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@@ -228,6 -222,15 +230,16 @@@ public class ServerSettings extends App return getSetting(ACTIVITY_LISTENERS).split(","); } ++ + public static String getActivityPublisher() throws ApplicationSettingsException{ + return getSetting(ACTIVITY_PUBLISHER); + } + + public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{ + String setting = getSetting(PUBLISH_RABBITMQ); + return Boolean.parseBoolean(setting); + } + public static boolean isEmbeddedZK() { return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true")); } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --cc modules/configuration/server/src/main/resources/airavata-server.properties index b1152c4,d303dbd..d618645 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@@ -185,11 -186,13 +185,16 @@@ monitors=org.apache.airavata.gfac.monit amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876 connection.name=xsede + #publisher activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher + publish.rabbitmq=false + activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher + rabbitmq.broker.url=amqp://localhost:5672 + rabbitmq.exchange.name=airavata_rabbitmq_exchange +#This property will be useful when there are multiple network interfaces in the machine where airavata is +#deployed, so users have to specify the ip address manually and this can be use for callback ip of the system(specially in gfac). +#ip=192.2.33.12 ###---------------------------Orchestrator module Configurations---------------------------### #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/distribution/server/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/distribution/server/src/main/assembly/bin-assembly.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 1ed3a67,c70c8a8..109320c --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@@ -20,20 -20,9 +20,8 @@@ */ package org.apache.airavata.gfac.core.cpi; - import java.io.File; - import java.io.IOException; - import java.net.URL; - import java.util.ArrayList; - import java.util.List; - import java.util.Properties; - import java.util.concurrent.ExecutorService; - import java.util.concurrent.Executors; - - import javax.xml.parsers.ParserConfigurationException; - import javax.xml.xpath.XPathExpressionException; - import org.airavata.appcatalog.cpi.AppCatalog; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; -import org.apache.airavata.client.api.AiravataAPI; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.MonitorPublisher; @@@ -75,31 -55,21 +54,16 @@@ import org.apache.airavata.model.appcat import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; - import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; - import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand; - import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; - import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; - import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; - import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; + import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -import org.apache.airavata.model.messaging.event.JobIdentifier; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; ++import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.*; -import org.apache.airavata.registry.api.AiravataRegistry2; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryModelType; - import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; + import org.apache.airavata.schemas.gfac.*; import org.apache.airavata.schemas.gfac.DataType; - import org.apache.airavata.schemas.gfac.GsisshHostType; - import org.apache.airavata.schemas.gfac.HostDescriptionType; - import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; - import org.apache.airavata.schemas.gfac.InputParameterType; - import org.apache.airavata.schemas.gfac.JobTypeType; - import org.apache.airavata.schemas.gfac.OutputParameterType; - import org.apache.airavata.schemas.gfac.ParameterType; - import org.apache.airavata.schemas.gfac.ProjectAccountType; - import org.apache.airavata.schemas.gfac.QueueType; - import org.apache.airavata.schemas.gfac.SSHHostType; - import org.apache.airavata.schemas.gfac.ServiceDescriptionType; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; @@@ -522,91 -498,6 +503,98 @@@ public class BetterGfacImpl implements } return true; } catch (ApplicationSettingsException e) { + throw new GFacException("Error launching the Job",e); + } catch (KeeperException e) { + throw new GFacException("Error launching the Job",e); + } catch (InterruptedException e) { + throw new GFacException("Error launching the Job",e); + } + } + + public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { + JobExecutionContext jobExecutionContext = null; + try { + jobExecutionContext = createJEC(experimentID, taskID, gatewayID); + return cancel(jobExecutionContext); + } catch (Exception e) { + log.error("Error inovoking the job with experiment ID: " + experimentID); + throw new GFacException(e); + } + } + + private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { + // We need to check whether this job is submitted as a part of a large workflow. If yes, + // we need to setup workflow tracking listener. + try { + // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node + String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); + int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state came, if we query again it might be different,so we preserve this state in the environment + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status + String workflowInstanceID = null; + if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) { + // This mean we need to register workflow tracking listener. + //todo implement WorkflowTrackingListener properly + registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext); + } + // Register log event listener. This is required in all scenarios. + jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); + if (stateVal < 2) { + // In this scenario We do everything from the beginning + log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + + " and stop the execution chain"); + } else if (stateVal >= 8) { + log.error("This experiment is almost finished, so cannot cancel this experiment"); + ZKUtil.deleteRecursive(zk, + AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); + } else { + log.info("Job is in a position to perform a proper cancellation"); + try { + Scheduler.schedule(jobExecutionContext); + + invokeProviderCancel(jobExecutionContext); + + } catch (Exception e) { + try { + // we make the experiment as failed due to exception scenario + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + // monitorPublisher.publish(new + // ExperimentStatusChangedEvent(new + // ExperimentIdentity(jobExecutionContext.getExperimentID()), + // ExperimentState.FAILED)); + // Updating the task status if there's any task associated + // monitorPublisher.publish(new TaskStatusChangeRequest( + // new TaskIdentity(jobExecutionContext.getExperimentID(), + // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + // jobExecutionContext.getTaskData().getTaskID()), + // TaskState.FAILED + // )); - monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext - .getJobDetails().getJobID()), JobState.FAILED)); ++ JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent(); ++ changeRequestEvent.setState(JobState.FAILED); ++ JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), ++ jobExecutionContext.getTaskData().getTaskID(), ++ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), ++ jobExecutionContext.getExperimentID()); ++ changeRequestEvent.setJobIdentity(jobIdentifier); ++ monitorPublisher.publish(changeRequestEvent); + } catch (NullPointerException e1) { + log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + + "NullPointerException occurred because at this point there might not have Job Created", e1, e); + //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); + // Updating the task status if there's any task associated - monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext - .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); ++ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, ++ new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), ++ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), ++ jobExecutionContext.getExperimentID()))); + + } + jobExecutionContext.setProperty(ERROR_SENT, "true"); + jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); + throw new GFacException(e.getMessage(), e); + } + } + return true; + } catch (ApplicationSettingsException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); @@@ -942,39 -774,32 +937,39 @@@ } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - Class<? extends GFacHandler> handlerClass; - GFacHandler handler; - try { - GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); - handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); - handler = handlerClass.newInstance(); - handler.initProperties(handlerClassName.getProperties()); - } catch (ClassNotFoundException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot load handler class " + handlerClassName, e); - } catch (InstantiationException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } catch (IllegalAccessException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } catch (Exception e) { - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } - try { - handler.invoke(jobExecutionContext); - GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); - } catch (Exception e) { - // TODO: Better error reporting. - throw new GFacException("Error Executing a OutFlow Handler", e); + if(!isCancelled()) { + Class<? extends GFacHandler> handlerClass; + GFacHandler handler; + try { + GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); + handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); + handler = handlerClass.newInstance(); + handler.initProperties(handlerClassName.getProperties()); + } catch (ClassNotFoundException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot load handler class " + handlerClassName, e); + } catch (InstantiationException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (IllegalAccessException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (Exception e) { + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } + try { + handler.invoke(jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); + } catch (Exception e) { - monitorPublisher.publish(new TaskStatusChangeRequest( - new TaskIdentity(jobExecutionContext.getExperimentID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); ++ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), ++ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), ++ jobExecutionContext.getExperimentID()); ++ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity)); + throw new GFacException(e); + } + }else{ + log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop"); + break; } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index f7d8b28,4e44372..06a8d37 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@@ -85,14 -94,26 +94,26 @@@ public class AiravataTaskStatusUpdator case CANCELING: state=TaskState.CANCELING; break; default: - break; + return; } try { - state = updateTaskStatus(jobStatus.getIdentity().getTaskId(), state); - logger.debug("Publishing task status for "+jobStatus.getIdentity().getTaskId()+":"+state.toString()); - monitorPublisher.publish(new TaskStatusChangedEvent(jobStatus.getIdentity(),state)); - } catch (Exception e) { + updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state); + logger.debug("Publishing task status for "+jobStatus.getJobIdentity().getTaskId()+":"+state.toString()); + TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(), + jobStatus.getJobIdentity().getWorkflowNodeId(), + jobStatus.getJobIdentity().getExperimentId()); + TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("TASK"); + MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + if ( ServerSettings.isRabbitMqPublishEnabled()){ + publisher.publish(msgCntxt); + } + - } catch (Exception e) { ++ } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); + throw new Exception("Error persisting task status..", e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java index 889215a,2ba08e1..268677e --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java @@@ -70,14 -76,24 +76,24 @@@ public class AiravataWorkflowNodeStatus case CANCELING: state=WorkflowNodeState.CANCELING; break; default: - break; + return; } try { - updateWorkflowNodeStatus(taskStatus.getIdentity().getWorkflowNodeID(), state); - logger.debug("Publishing workflow node status for "+taskStatus.getIdentity().getWorkflowNodeID()+":"+state.toString()); - monitorPublisher.publish(new WorkflowNodeStatusChangedEvent(taskStatus.getIdentity(),state)); + updateWorkflowNodeStatus(taskStatus.getTaskIdentity().getWorkflowNodeId(), state); + logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString()); + WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId()); + WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity); + monitorPublisher.publish(event); + String messageId = AiravataUtils.getId("WFNODE"); + MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId); + msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + + if ( ServerSettings.isRabbitMqPublishEnabled()){ + publisher.publish(msgCntxt); + } } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); + throw new Exception("Error persisting workflow node status..", e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java index 704bf26,386424e..e7a1297 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java @@@ -32,8 -32,8 +32,10 @@@ public class GfacExperimentStateChangeR private MonitorID monitorID; public GfacExperimentStateChangeRequest(MonitorID monitorID, GfacExperimentState state) { - setIdentity(new JobIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(), - monitorID.getTaskID(), monitorID.getJobID())); - setIdentity(new JobIdentifier(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(), - monitorID.getTaskID(), monitorID.getJobID())); ++ setIdentity(new JobIdentifier(monitorID.getJobID(), ++ monitorID.getTaskID(), ++ monitorID.getWorkflowNodeID(), ++ monitorID.getExperimentID())); setMonitorID(monitorID); this.state = state; } http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java index 64c7899,0000000..0e56fc7 mode 100644,000000..100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java @@@ -1,60 -1,0 +1,60 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.utils; + +import org.apache.airavata.common.utils.MonitorPublisher; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.cpi.GFac; +import org.apache.airavata.gfac.core.monitor.MonitorID; - import org.apache.airavata.gfac.core.monitor.TaskIdentity; - import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; ++import org.apache.airavata.model.messaging.event.TaskIdentifier; ++import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent; +import org.apache.airavata.model.workspace.experiment.TaskState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OutHandlerWorker implements Runnable { + private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class); + + private GFac gfac; + + private MonitorID monitorID; + + private MonitorPublisher monitorPublisher; + + public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) { + this.gfac = gfac; + this.monitorID = monitorID; + this.monitorPublisher = monitorPublisher; + } + + @Override + public void run() { + try { + gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext()); + } catch (GFacException e) { - monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(), - monitorID.getTaskID()), TaskState.FAILED)); ++ TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID()); ++ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier)); + //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status + logger.info(e.getLocalizedMessage(), e); + } + monitorPublisher.publish(monitorID.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --cc modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 959104e,2fa5f62..15c6380 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@@ -26,9 -24,9 +26,8 @@@ import org.apache.airavata.common.logge import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.monitor.MonitorID; - import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.core.PullMonitor; @@@ -37,11 -34,16 +36,12 @@@ import org.apache.airavata.gfac.monitor import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; - import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; + import org.apache.airavata.model.messaging.event.JobIdentifier; + import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.airavata.schemas.gfac.SSHHostType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.zookeeper.ZooKeeper; import java.sql.Timestamp; import java.util.*; @@@ -175,66 -154,16 +175,70 @@@ public class HPCPullMonitor extends Pul connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); connections.put(hostName, connection); } + + // before we get the statuses, we check the cancel job list and remove them permanently List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); + Iterator<String> iterator1 = cancelJobList.iterator(); + + for(MonitorID iMonitorID:monitorID){ + while(iterator1.hasNext()) { + String cancelMId = iterator1.next(); + if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { + iMonitorID.setStatus(JobState.CANCELED); + completedJobs.put(iMonitorID.getJobName(), iMonitorID); + iterator1.remove(); + logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + + "completed job queue, experiment {}, task {} , job {}", + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); + break; + } + } + iterator1 = cancelJobList.iterator(); + } + synchronized (completedJobsFromPush) { + ListIterator<String> iterator = completedJobsFromPush.listIterator(); + for (MonitorID iMonitorID : monitorID) { + String completeId = null; + while (iterator.hasNext()) { + completeId = iterator.next(); + if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { + logger.info("This job is finished because push notification came with <username,jobName> " + completeId); + completedJobs.put(iMonitorID.getJobName(), iMonitorID); + iMonitorID.setStatus(JobState.COMPLETE); + iterator.remove();//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + logger.debugId(completeId, "Push notification updated job {} status to {}. " + + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), + iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + break; + } + } + iterator = completedJobsFromPush.listIterator(); + } + } Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); - for (MonitorID iMonitorID : monitorID) { + Iterator<MonitorID> iterator = monitorID.iterator(); + while (iterator.hasNext()) { + MonitorID iMonitorID = iterator.next(); currentMonitorID = iMonitorID; + if (!JobState.CANCELED.equals(iMonitorID.getStatus())&& + !JobState.COMPLETE.equals(iMonitorID.getStatus())) { + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic + }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ + completedJobs.put(iMonitorID.getJobName(), iMonitorID); + logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + } - jobStatus = new JobStatusChangeRequest(iMonitorID); ++ jobStatus = new JobStatusChangeRequestEvent(); + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic - jobStatus.setJobIdentity(new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID())); ++ JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID()); ++ jobStatus.setJobIdentity(jobIdentity); + jobStatus.setState(iMonitorID.getStatus()); // we have this JobStatus class to handle amqp monitoring publisher.publish(jobStatus); - logger.debugId(jobStatus.getIdentity().getJobId(), "Published job status change request, " + - "experiment {} , task {}", jobStatus.getIdentity().getExperimentID(), - jobStatus.getIdentity().getTaskId()); ++ logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + ++ "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), ++ jobStatus.getJobIdentity().getTaskId()); // if the job is completed we do not have to put the job to the queue again iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@@ -315,6 -229,6 +319,14 @@@ if (e.getMessage().contains("Unknown Job Id Error")) { // in this case job is finished or may be the given job ID is wrong jobStatus.setState(JobState.UNKNOWN); ++ JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN"); ++ if (currentMonitorID != null){ ++ jobIdentifier.setExperimentId(currentMonitorID.getExperimentID()); ++ jobIdentifier.setTaskId(currentMonitorID.getTaskID()); ++ jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID()); ++ jobIdentifier.setJobId(currentMonitorID.getJobID()); ++ } ++ jobStatus.setJobIdentity(jobIdentifier); publisher.publish(jobStatus); } else if (e.getMessage().contains("illegally formed job identifier")) { logger.error("Wrong job ID is given so dropping the job from monitoring system"); http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/pom.xml ----------------------------------------------------------------------
