merging with master
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ffbb1b9f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ffbb1b9f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ffbb1b9f Branch: refs/heads/master Commit: ffbb1b9f6776cf90a3eb9e7d418bd5bed76a0ef4 Parents: 840e627 33d2e27 Author: Lahiru Gunathilake <[email protected]> Authored: Tue Feb 24 10:57:46 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Tue Feb 24 10:57:46 2015 -0500 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 86 +- .../java/org/apache/airavata/api/Airavata.java | 6323 ++++++++++++++++-- .../main/resources/lib/airavata/Airavata.cpp | 5246 +++++++++------ .../src/main/resources/lib/airavata/Airavata.h | 636 ++ .../lib/airavata/Airavata_server.skeleton.cpp | 20 + .../resources/lib/Airavata/API/Airavata.php | 1270 +++- .../client/samples/CreateLaunchExperiment.java | 60 +- .../client/samples/RegisterSampleData.java | 2 +- .../tools/RegisterSampleApplications.java | 13 +- .../airavataAPI.thrift | 23 + .../appcatalog/cpi/ComputeResource.java | 3 + .../catalog/data/impl/ComputeResourceImpl.java | 30 +- .../catalog/data/model/UnicoreDataMovement.java | 65 + .../data/resources/AbstractResource.java | 7 + .../resources/UnicoreDataMovementResource.java | 255 + .../catalog/data/util/AppCatalogJPAUtils.java | 19 +- .../data/util/AppCatalogResourceType.java | 1 + .../data/util/AppCatalogThriftConversion.java | 20 +- .../src/main/resources/META-INF/persistence.xml | 1 + .../src/main/resources/appcatalog-derby.sql | 8 + .../src/main/resources/appcatalog-mysql.sql | 9 + .../src/test/resources/appcatalog-derby.sql | 8 + .../main/resources/airavata-server.properties | 6 + .../credential/store/client/TestSSLClient.java | 140 + .../store/server/CredentialStoreServer.java | 21 +- .../server/CredentialStoreServerHandler.java | 37 +- .../airavata/credential/store/util/Utility.java | 14 +- modules/distribution/server/pom.xml | 110 +- modules/messaging/client/README | 15 + modules/messaging/client/pom.xml | 103 + .../messaging/client/RabbitMQListner.java | 230 + .../core/impl/RabbitMQStatusPublisher.java | 12 +- .../messaging/core/stats/CountWriterTask.java | 36 + .../messaging/core/stats/LatencyWriterTask.java | 37 + .../messaging/core/stats/StatCounter.java | 83 + modules/messaging/pom.xml | 1 + 36 files changed, 12261 insertions(+), 2689 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ffbb1b9f/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ffbb1b9f/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 1e9d983,9f4cd12..c4c303f --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@@ -60,10 -57,10 +57,10 @@@ public class CreateLaunchExperiment private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_2e539083-665d-40fd-aaa2-4a751028326b"; + private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; - private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36"; + private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00"; private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b"; private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1"; private static String lammpsAppId = "LAMMPS_10893eb5-3840-438c-8446-d26c7ecb001f"; http://git-wip-us.apache.org/repos/asf/airavata/blob/ffbb1b9f/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --cc modules/configuration/server/src/main/resources/airavata-server.properties index e309901,c493752..00191a7 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@@ -216,12 -215,9 +216,18 @@@ connection.name=xsed #publisher activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false ++<<<<<<< HEAD +status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher +task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher +rabbitmq.broker.url=amqp://localhost:5672 +rabbitmq.status.exchange.name=airavata_rabbitmq_exchange +rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange + ++======= + activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher + rabbitmq.broker.url=amqp://gw111.iu.xsede.org:5672 + rabbitmq.exchange.name=airavata_rabbitmq_exchange ++>>>>>>> master ########################################################################### # Orchestrator module Configuration http://git-wip-us.apache.org/repos/asf/airavata/blob/ffbb1b9f/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --cc modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java index fe06ed7,0000000..70ed942 mode 100644,000000..100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@@ -1,99 -1,0 +1,103 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.Publisher; ++import org.apache.airavata.messaging.core.stats.StatCounter; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RabbitMQStatusPublisher implements Publisher { + + private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); + + private RabbitMQProducer rabbitMQProducer; + ++ StatCounter statCounter = StatCounter.getInstance(); + + public RabbitMQStatusPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); + } + + public void publish(MessageContext msgCtx) throws AiravataException { + try { + log.info("Publishing status to rabbitmq..."); + byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(msgCtx.getMessageId()); + message.setMessageType(msgCtx.getType()); + message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); ++ String gatewayId = msgCtx.getGatewayId(); + String routingKey = null; + if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ + ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getExperimentId(); ++ routingKey = gatewayId + "." + event.getExperimentId(); + } else if (msgCtx.getType().equals(MessageType.TASK)) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getTaskIdentity().getExperimentId() + "." + ++ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); + }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ + WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); + WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); - routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); ++ routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + }else if (msgCtx.getType().equals(MessageType.JOB)){ + JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); + JobIdentifier identity = event.getJobIdentity(); - routingKey = identity.getExperimentId() + "." + ++ routingKey = gatewayId + "." + identity.getExperimentId() + "." + + identity.getWorkflowNodeId() + "." + + identity.getTaskId() + "." + + identity.getJobId(); + } + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + rabbitMQProducer.send(messageBody, routingKey); ++ statCounter.add(message); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +}
