Adding code to send the events through RabbitMQ
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/61214dbf Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/61214dbf Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/61214dbf Branch: refs/heads/master Commit: 61214dbfaa66ead4a40ad786a04d90f1375f8224 Parents: 7ed2e34 Author: Chathuri Wimalasena <[email protected]> Authored: Thu Sep 25 10:28:03 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Thu Sep 25 11:14:02 2014 -0400 ---------------------------------------------------------------------- .../airavata/common/utils/ThriftUtils.java | 37 ++++++++ .../airavata/messaging/core/MessageContext.java | 44 +++++++++ .../airavata/messaging/core/Publisher.java | 24 ++++- .../messaging/core/PublisherFactory.java | 50 +++++++++++ .../core/impl/AiravataRabbitMQPublisher.java | 42 --------- .../messaging/core/impl/RabbitMQProducer.java | 23 ++--- .../messaging/core/impl/RabbitMQPublisher.java | 93 ++++++++++++++++++++ 7 files changed, 260 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java new file mode 100644 index 0000000..ee86f74 --- /dev/null +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.common.utils; + +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +public class ThriftUtils { + public static byte[] serializeThriftObject(TBase object) throws TException { + return new TSerializer().serialize(object); + } + + public static void createThriftFromBytes(byte []bytes, TBase object) throws TException { + new TDeserializer().deserialize(object, bytes); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java new file mode 100644 index 0000000..48fff59 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core; + +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.thrift.TBase; + +public class MessageContext { + private final TBase event; + + private final MessageType type; + + public MessageContext(TBase message, MessageType type) { + this.event = message; + this.type = type; + } + + public TBase getEvent() { + return event; + } + + public MessageType getType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java index 24c8e2a..4452856 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java @@ -1,7 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.airavata.messaging.core; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.model.messaging.event.*; public interface Publisher { - public void publish(Message message); + public void publish(MessageContext message) throws AiravataException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java new file mode 100644 index 0000000..116e9b4 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.ServerSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherFactory { + private static Logger log = LoggerFactory.getLogger(PublisherFactory.class); + + public Publisher createPublisher() throws AiravataException { + String activityPublisher = ServerSettings.getActivityPublisher(); + + if (activityPublisher == null) { + String s = "Activity publisher is not specified"; + log.error(s); + throw new AiravataException(s); + } + + try { + Class<? 
extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class); + return aPublisher.newInstance(); + } catch (Exception e) { + String msg = "Failed to load the publisher from the publisher class property: " + activityPublisher; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java deleted file mode 100644 index 0fda442..0000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.airavata.messaging.core.impl; - -import org.apache.airavata.messaging.core.Publisher; -import org.apache.airavata.model.messaging.event.*; - -public class AiravataRabbitMQPublisher implements Publisher { - private String brokerUrl; - private String routingKey; - private String exchangeName; - private int prefetchCount; - private boolean isRequeueOnFail; - - public AiravataRabbitMQPublisher() { - - RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail); - } - - public void publish(Message message) { - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java index e52161f..0ad8705 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; public class RabbitMQProducer { + public static final int DEFAULT_PRE_FETCH = 64; + private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class); private Connection connection; @@ -40,23 +42,25 @@ public class RabbitMQProducer { private String exchangeName; - private String routingKey; - - private int prefetchCount; + private int prefetchCount = DEFAULT_PRE_FETCH; private boolean isReQueueOnFail = false; private String url; - public RabbitMQProducer(String url, String routingKey, String exchangeName, - int prefetchCount, boolean isReQueueOnFail) { - this.prefetchCount = prefetchCount; - 
this.isReQueueOnFail = isReQueueOnFail; + public RabbitMQProducer(String url, String exchangeName) { this.exchangeName = exchangeName; - this.routingKey = routingKey; this.url = url; } + public void setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + } + + public void setReQueueOnFail(boolean isReQueueOnFail) { + this.isReQueueOnFail = isReQueueOnFail; + } + private void reset() { consumerTag = null; } @@ -109,7 +113,7 @@ public class RabbitMQProducer { } } - public void send(byte []message) throws Exception { + public void send(byte []message, String routingKey) throws Exception { try { channel.basicPublish(exchangeName, routingKey, null, message); } catch (IOException e) { @@ -125,7 +129,6 @@ public class RabbitMQProducer { connectionFactory.setUri(url); Connection connection = connectionFactory.newConnection(); connection.addShutdownListener(new ShutdownListener() { - @Override public void shutdownCompleted(ShutdownSignalException cause) { } }); http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java new file mode 100644 index 0000000..d9ad7e4 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RabbitMQPublisher implements Publisher { + public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; + public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; + private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class); + + private RabbitMQProducer rabbitMQProducer; + + public RabbitMQPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new 
RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); + } + + public void publish(MessageContext msgCtx) throws AiravataException { + try { + byte []body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + String routingKey = null; + if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ + ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); + routingKey = event.getExperimentId(); + } else if (msgCtx.getType().equals(MessageType.TASK)) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); + routingKey = event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); + }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ + WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); + WorkflowIdentity workflowNodeIdentity = event.getWorkflowNodeIdentity(); + routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + }else if (msgCtx.getType().equals(MessageType.JOB)){ + JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); + JobIdentity identity = event.getJobIdentity(); + routingKey = identity.getExperimentId() + "." + + identity.getWorkflowNodeId() + "." + + identity.getTaskId() + "." + + identity.getJobId(); + } + rabbitMQProducer.send(body, routingKey); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } +}
