Repository: stratos
Updated Branches:
  refs/heads/docker-grouping-merge 08b7cac1a -> caeb7104a


Refactoring MQTT connector, publisher & subscriber and introducing unique 
client identifier values


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/caeb7104
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/caeb7104
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/caeb7104

Branch: refs/heads/docker-grouping-merge
Commit: caeb7104af07572ea065b8eefff10a3c4ac747a4
Parents: 08b7cac
Author: Imesh Gunaratne <[email protected]>
Authored: Fri Nov 7 16:14:15 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Fri Nov 7 16:14:15 2014 +0530

----------------------------------------------------------------------
 .../messaging/broker/connect/MQTTConnector.java | 89 +++++---------------
 .../broker/publish/TopicPublisher.java          | 41 +++++----
 .../broker/subscribe/TopicSubscriber.java       | 13 ++-
 .../domain/exception/MessagingException.java    | 40 +++++++++
 4 files changed, 93 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
index 5e93274..b55056b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
@@ -21,9 +21,9 @@ package org.apache.stratos.messaging.broker.connect;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.exception.MessagingException;
 import org.apache.stratos.messaging.util.Util;
 import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
@@ -40,69 +40,26 @@ import java.util.Properties;
  */
 public class MQTTConnector {
 
-       public static final String MQTTURL = "defaultValue";
-       public static final String CLIENT_ID = "Startos";
-       public static final String TMPFILELOCATION = "/tmp";
-       private static MqttClient topicClient;
-
-       private static MqttClient topicClientSub;
-       private static final Log log = LogFactory.getLog(MQTTConnector.class);
-       private static String configFileLocation = 
System.getProperty("jndi.properties.dir");
-       private static Properties mqttProp =
-                       Util.getProperties(configFileLocation + File.separator +
-                                          "mqtttopic.properties");
-
-       public static synchronized MqttClient getMQTTConClient() {
-
-               if (topicClient == null) {
-
-                       String broker = mqttProp.getProperty("mqtturl", 
MQTTURL);
-
-                       String clientId = mqttProp.getProperty("clientID", 
CLIENT_ID);
-                       MemoryPersistence persistence = new MemoryPersistence();
-
-                       try {
-                               topicClient = new MqttClient(broker, clientId, 
persistence);
-
-                               if (log.isDebugEnabled()) {
-                                       log.debug("MQTT client connected");
-                               }
-
-                       } catch (MqttException me) {
-
-                               log.error("Failed to initiate autoscaler 
service client. ", me);
-                       }
-
-               }
-               return topicClient;
-
-       }
-
-       public static synchronized MqttClient getMQTTSubClient(String 
identifier) throws MqttException {
-               //if (topicClientSub == null) {
-
-               String broker = mqttProp.getProperty("mqtturl", MQTTURL);
-
-               String tempFile = mqttProp.getProperty("tempfilelocation", 
TMPFILELOCATION);
-               // Creating new default persistence for mqtt client
-               MqttDefaultFilePersistence persistence = new 
MqttDefaultFilePersistence(tempFile);
-
-               try {
-
-                       // mqtt client with specific url and a random client id
-                       topicClientSub = new MqttClient(broker, identifier, 
persistence);
-
-                       if (log.isDebugEnabled()) {
-                               log.debug("MQTT client connected");
-                       }
-
-               } catch (MqttException me) {
-
-                       log.error("Failed to initiate autoscaler service 
client. ", me);
-               }
-
-               //}
-               return topicClientSub;
-
-       }
+    public static final String MQTT_URL_DEFAULT = "defaultValue";
+
+    private static final Log log = LogFactory.getLog(MQTTConnector.class);
+    private static String configFileLocation = 
System.getProperty("jndi.properties.dir");
+    private static Properties mqttProperties = 
Util.getProperties(configFileLocation
+            + File.separator + "mqtttopic.properties");
+
+    public static MqttClient getMqttClient(String clientId) {
+        try {
+            String mqttUrl = mqttProperties.getProperty("mqtturl", 
MQTT_URL_DEFAULT);
+            MemoryPersistence memoryPersistence = new MemoryPersistence();
+            MqttClient mqttClient = new MqttClient(mqttUrl, clientId, 
memoryPersistence);
+            if (log.isDebugEnabled()) {
+                log.debug("MQTT client created");
+            }
+            return mqttClient;
+        } catch (Exception e) {
+            String message = "Could not create MQTT client";
+            log.error(message, e);
+            throw new MessagingException(message, e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 15fc98e..defb9b7 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -41,18 +41,23 @@ import com.google.gson.Gson;
  */
 public class TopicPublisher {
 
-       private static final Log log = LogFactory.getLog(TopicPublisher.class);
+    private static final Log log = LogFactory.getLog(TopicPublisher.class);
 
+    /**
+     * Quality of Service for message delivery:
+     * Setting it to 2 to make sure that message is guaranteed to deliver once
+     * using two-phase acknowledgement across the network.
+     */
        private static final int QOS = 2;
+    public static final int PUBLISH_RETRY_INTERVAL = 60000;
 
-       public static TopicPublisher topicPub;
+    public static TopicPublisher topicPub;
        private boolean initialized;
        private final String topic;
        MqttClient mqttClient;
 
        /**
-        * @param aTopicName
-        *            topic name of this publisher instance.
+        * @param aTopicName topic name of this publisher instance.
         */
        TopicPublisher(String aTopicName) {
                this.topic = aTopicName;
@@ -68,24 +73,27 @@ public class TopicPublisher {
         */
 
        public void publish(Object messageObj, boolean retry) {
-
                synchronized (TopicPublisher.class) {
             Gson gson = new Gson();
             String message = gson.toJson(messageObj);
             boolean published = false;
+
             while (!published) {
-                mqttClient = MQTTConnector.getMQTTConClient();
-                MqttMessage mqttMSG = new MqttMessage(message.getBytes());
+                // There will be only one publisher for a topic,
+                // hence using topic name as the client identifier
+                mqttClient = MQTTConnector.getMqttClient(topic);
+                MqttMessage mqttMessage = new MqttMessage(message.getBytes());
+                // Set quality of service
+                mqttMessage.setQos(QOS);
 
-                mqttMSG.setQos(QOS);
-                MqttConnectOptions connOpts = new MqttConnectOptions();
-                connOpts.setCleanSession(true);
                 try {
-                    mqttClient.connect(connOpts);
-                    mqttClient.publish(topic, mqttMSG);
-
+                    MqttConnectOptions connectOptions = new 
MqttConnectOptions();
+                    // Maintain the session between client and server for 
reliable delivery
+                    connectOptions.setCleanSession(false);
+                    mqttClient.connect(connectOptions);
+                    mqttClient.publish(topic, mqttMessage);
                     published = true;
-                } catch (MqttException e) {
+                } catch (Exception e) {
                     initialized = false;
                     if (!retry) {
                         if (log.isDebugEnabled()) {
@@ -95,17 +103,16 @@ public class TopicPublisher {
                     }
 
                     if (log.isInfoEnabled()) {
-                        log.info("Will try to re-publish in 60 sec");
+                        log.info(String.format("Will try to re-publish in %d 
sec", (PUBLISH_RETRY_INTERVAL/1000)));
                     }
                     try {
-                        Thread.sleep(60000);
+                        Thread.sleep(PUBLISH_RETRY_INTERVAL);
                     } catch (InterruptedException ignore) {
                     }
                 } finally {
                     try {
                         mqttClient.disconnect();
                     } catch (MqttException ignore) {
-
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index a76cc9e..6d5d17c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -20,14 +20,12 @@
 package org.apache.stratos.messaging.broker.subscribe;
 
 import javax.jms.JMSException;
-import javax.jms.TopicSession;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.MQTTConnector;
 import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
-import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
-import 
org.apache.stratos.messaging.message.processor.instance.notifier.InstanceNotifierMessageProcessorChain;
 import org.apache.stratos.messaging.util.Util;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -65,11 +63,12 @@ public class TopicSubscriber implements Runnable {
 
        private void doSubscribe() throws MqttException {
 
-               MqttClient mqttClient = 
MQTTConnector.getMQTTSubClient(Util.getRandomString(5));
+        String clientId = topicName + "-" + RandomStringUtils.random(10);
+               MqttClient mqttClient = MQTTConnector.getMqttClient(clientId);
 
-               if (log.isDebugEnabled()) {
-                       log.debug("Subscribing to topic '" + topicName + "' 
from " +
-                                 mqttClient.getServerURI());
+               if (log.isInfoEnabled()) {
+                       log.info(String.format("Subscribing to topic: [topic] 
%s [server] %s [client-id] %s",
+                    topicName, mqttClient.getServerURI(), clientId));
                }
 
                /* Subscribing to specific topic */

http://git-wip-us.apache.org/repos/asf/stratos/blob/caeb7104/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java
new file mode 100644
index 0000000..46b9a6a
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/exception/MessagingException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.domain.exception;
+
+/**
+ * Messaging exception.
+ */
+public class MessagingException extends RuntimeException {
+    public MessagingException() {
+        super();
+    }
+
+    public MessagingException(String message) {
+        super(message);
+    }
+
+    public MessagingException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+    public MessagingException(Throwable throwable) {
+        super(throwable);
+    }
+}

Reply via email to