Repository: stratos Updated Branches: refs/heads/docker-grouping-merge 08b7cac1a -> caeb7104a
Refactoring MQTT connector, publisher & subscriber and introducing unique client identifier values Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/caeb7104 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/caeb7104 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/caeb7104 Branch: refs/heads/docker-grouping-merge Commit: caeb7104af07572ea065b8eefff10a3c4ac747a4 Parents: 08b7cac Author: Imesh Gunaratne <[email protected]> Authored: Fri Nov 7 16:14:15 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Nov 7 16:14:15 2014 +0530 ---------------------------------------------------------------------- .../messaging/broker/connect/MQTTConnector.java | 89 +++++--------------- .../broker/publish/TopicPublisher.java | 41 +++++---- .../broker/subscribe/TopicSubscriber.java | 13 ++- .../domain/exception/MessagingException.java | 40 +++++++++ 4 files changed, 93 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java index 5e93274..b55056b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java @@ -21,9 +21,9 @@ package org.apache.stratos.messaging.broker.connect; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.exception.MessagingException; import org.apache.stratos.messaging.util.Util; import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; @@ -40,69 +40,26 @@ import java.util.Properties; */ public class MQTTConnector { - public static final String MQTTURL = "defaultValue"; - public static final String CLIENT_ID = "Startos"; - public static final String TMPFILELOCATION = "/tmp"; - private static MqttClient topicClient; - - private static MqttClient topicClientSub; - private static final Log log = LogFactory.getLog(MQTTConnector.class); - private static String configFileLocation = System.getProperty("jndi.properties.dir"); - private static Properties mqttProp = - Util.getProperties(configFileLocation + File.separator + - "mqtttopic.properties"); - - public static synchronized MqttClient getMQTTConClient() { - - if (topicClient == null) { - - String broker = mqttProp.getProperty("mqtturl", MQTTURL); - - String clientId = mqttProp.getProperty("clientID", CLIENT_ID); - MemoryPersistence persistence = new MemoryPersistence(); - - try { - topicClient = new MqttClient(broker, clientId, persistence); - - if (log.isDebugEnabled()) { - log.debug("MQTT client connected"); - } - - } catch (MqttException me) { - - log.error("Failed to initiate autoscaler service client. ", me); - } - - } - return topicClient; - - } - - public static synchronized MqttClient getMQTTSubClient(String identifier) throws MqttException { - //if (topicClientSub == null) { - - String broker = mqttProp.getProperty("mqtturl", MQTTURL); - - String tempFile = mqttProp.getProperty("tempfilelocation", TMPFILELOCATION); - // Creating new default persistence for mqtt client - MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(tempFile); - - try { - - // mqtt client with specific url and a random client id - topicClientSub = new MqttClient(broker, identifier, persistence); - - if (log.isDebugEnabled()) { - log.debug("MQTT client connected"); - } - - } catch (MqttException me) { - - log.error("Failed to initiate autoscaler service client. ", me); - } - - //} - return topicClientSub; - - } + public static final String MQTT_URL_DEFAULT = "defaultValue"; + + private static final Log log = LogFactory.getLog(MQTTConnector.class); + private static String configFileLocation = System.getProperty("jndi.properties.dir"); + private static Properties mqttProperties = Util.getProperties(configFileLocation + + File.separator + "mqtttopic.properties"); + + public static MqttClient getMqttClient(String clientId) { + try { + String mqttUrl = mqttProperties.getProperty("mqtturl", MQTT_URL_DEFAULT); + MemoryPersistence memoryPersistence = new MemoryPersistence(); + MqttClient mqttClient = new MqttClient(mqttUrl, clientId, memoryPersistence); + if (log.isDebugEnabled()) { + log.debug("MQTT client created"); + } + return mqttClient; + } catch (Exception e) { + String message = "Could not create MQTT client"; + log.error(message, e); + throw new MessagingException(message, e); + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index 15fc98e..defb9b7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -41,18 +41,23 @@ import com.google.gson.Gson; */ public class TopicPublisher { - private static final Log log = LogFactory.getLog(TopicPublisher.class); + private static final Log log = LogFactory.getLog(TopicPublisher.class); + /** + * Quality of Service for message delivery: + * Setting it to 2 to make sure that message is guaranteed to deliver once + * using two-phase acknowledgement across the network. + */ private static final int QOS = 2; + public static final int PUBLISH_RETRY_INTERVAL = 60000; - public static TopicPublisher topicPub; + public static TopicPublisher topicPub; private boolean initialized; private final String topic; MqttClient mqttClient; /** - * @param aTopicName - * topic name of this publisher instance. + * @param aTopicName topic name of this publisher instance. */ TopicPublisher(String aTopicName) { this.topic = aTopicName; @@ -68,24 +73,27 @@ public class TopicPublisher { */ public void publish(Object messageObj, boolean retry) { - synchronized (TopicPublisher.class) { Gson gson = new Gson(); String message = gson.toJson(messageObj); boolean published = false; + while (!published) { - mqttClient = MQTTConnector.getMQTTConClient(); - MqttMessage mqttMSG = new MqttMessage(message.getBytes()); + // There will be only one publisher for a topic, + // hence using topic name as the client identifier + mqttClient = MQTTConnector.getMqttClient(topic); + MqttMessage mqttMessage = new MqttMessage(message.getBytes()); + // Set quality of service + mqttMessage.setQos(QOS); - mqttMSG.setQos(QOS); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); try { - mqttClient.connect(connOpts); - mqttClient.publish(topic, mqttMSG); - + MqttConnectOptions connectOptions = new MqttConnectOptions(); + // Maintain the session between client and server for reliable delivery + connectOptions.setCleanSession(false); + mqttClient.connect(connectOptions); + mqttClient.publish(topic, mqttMessage); published = true; - } catch (MqttException e) { + } catch (Exception e) { initialized = false; if (!retry) { if (log.isDebugEnabled()) { @@ -95,17 +103,16 @@ public class TopicPublisher { } if (log.isInfoEnabled()) { - log.info("Will try to re-publish in 60 sec"); + log.info(String.format("Will try to re-publish in %d sec", (PUBLISH_RETRY_INTERVAL/1000))); } try { - Thread.sleep(60000); + Thread.sleep(PUBLISH_RETRY_INTERVAL); } catch (InterruptedException ignore) { } } finally { try { mqttClient.disconnect(); } catch (MqttException ignore) { - } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index a76cc9e..6d5d17c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -20,14 +20,12 @@ package org.apache.stratos.messaging.broker.subscribe; import javax.jms.JMSException; -import javax.jms.TopicSession; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.MQTTConnector; import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker; -import org.apache.stratos.messaging.message.processor.MessageProcessorChain; -import org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain; import org.apache.stratos.messaging.util.Util; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -65,11 +63,12 @@ public class TopicSubscriber implements Runnable { private void doSubscribe() throws MqttException { - MqttClient mqttClient = MQTTConnector.getMQTTSubClient(Util.getRandomString(5)); + String clientId = topicName + "-" + RandomStringUtils.random(10); + MqttClient mqttClient = MQTTConnector.getMqttClient(clientId); - if (log.isDebugEnabled()) { - log.debug("Subscribing to topic '" + topicName + "' from " + - mqttClient.getServerURI()); + if (log.isInfoEnabled()) { + log.info(String.format("Subscribing to topic: [topic] %s [server] %s [client-id] %s", + topicName, mqttClient.getServerURI(), clientId)); } /* Subscribing to specific topic */ http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java new file mode 100644 index 0000000..46b9a6a --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.domain.exception; + +/** + * Messaging exception. + */ +public class MessagingException extends RuntimeException { + public MessagingException() { + super(); + } + + public MessagingException(String message) { + super(message); + } + + public MessagingException(String message, Throwable throwable) { + super(message, throwable); + } + + public MessagingException(Throwable throwable) { + super(throwable); + } +}
