Repository: stratos
Updated Branches:
  refs/heads/docker-grouping-merge c152fc7fd -> eaab3bcf6


Using timestamp + random number for MQTT client id


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/eaab3bcf
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/eaab3bcf
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/eaab3bcf

Branch: refs/heads/docker-grouping-merge
Commit: eaab3bcf65ca89bef28da828a37a97d8fffa270f
Parents: c152fc7
Author: Imesh Gunaratne <[email protected]>
Authored: Fri Nov 7 17:48:15 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Fri Nov 7 17:48:15 2014 +0530

----------------------------------------------------------------------
 .../stratos/messaging/broker/connect/MQTTConnector.java |  6 ++++++
 .../messaging/broker/publish/TopicPublisher.java        | 12 +++++-------
 .../messaging/broker/subscribe/TopicSubscriber.java     |  8 +++++---
 .../org/apache/stratos/messaging/util/Constants.java    |  1 +
 4 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
index b55056b..0f6c36d 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.exception.MessagingException;
 import org.apache.stratos.messaging.util.Util;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
@@ -52,6 +53,11 @@ public class MQTTConnector {
             String mqttUrl = mqttProperties.getProperty("mqtturl", 
MQTT_URL_DEFAULT);
             MemoryPersistence memoryPersistence = new MemoryPersistence();
             MqttClient mqttClient = new MqttClient(mqttUrl, clientId, 
memoryPersistence);
+            MqttConnectOptions connectOptions = new MqttConnectOptions();
+            // Do not maintain the session between client and server, reliable 
delivery
+            // will be managed by stratos messaging component
+            connectOptions.setCleanSession(true);
+            mqttClient.connect(connectOptions);
             if (log.isDebugEnabled()) {
                 log.debug("MQTT client created");
             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index defb9b7..1baf907 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -19,9 +19,11 @@
 
 package org.apache.stratos.messaging.broker.publish;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.MQTTConnector;
+import org.apache.stratos.messaging.util.Constants;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -79,18 +81,14 @@ public class TopicPublisher {
             boolean published = false;
 
             while (!published) {
-                // There will be only one publisher for a topic,
-                // hence using topic name as the client identifier
-                mqttClient = MQTTConnector.getMqttClient(topic);
+                String timestamp = String.valueOf(System.currentTimeMillis());
+                String clientId = timestamp + 
RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length());
+                mqttClient = MQTTConnector.getMqttClient(clientId);
                 MqttMessage mqttMessage = new MqttMessage(message.getBytes());
                 // Set quality of service
                 mqttMessage.setQos(QOS);
 
                 try {
-                    MqttConnectOptions connectOptions = new 
MqttConnectOptions();
-                    // Maintain the session between client and server for 
reliable delivery
-                    connectOptions.setCleanSession(false);
-                    mqttClient.connect(connectOptions);
                     mqttClient.publish(topic, mqttMessage);
                     published = true;
                 } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index 4e89267..5801f64 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.MQTTConnector;
 import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
+import org.apache.stratos.messaging.util.Constants;
 import org.apache.stratos.messaging.util.Util;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
@@ -40,9 +41,9 @@ import org.eclipse.paho.client.mqttv3.MqttException;
  */
 public class TopicSubscriber implements Runnable {
 
-       private static final Log log = LogFactory.getLog(TopicSubscriber.class);
+    private static final Log log = LogFactory.getLog(TopicSubscriber.class);
 
-       private boolean terminated = false;
+    private boolean terminated = false;
        private MqttCallback messageListener;
        private final String topicName;
 
@@ -63,7 +64,8 @@ public class TopicSubscriber implements Runnable {
 
        private void doSubscribe() throws MqttException {
 
-        String clientId = topicName;
+        String timestamp = String.valueOf(System.currentTimeMillis());
+        String clientId = timestamp + 
RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length());
                MqttClient mqttClient = MQTTConnector.getMqttClient(clientId);
 
                if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 0802528..b9c3e0e 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -79,4 +79,5 @@ public class Constants {
        public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000;
        public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000;
 
+    public static final int CLIENT_ID_MAX_LENGTH = 23;
 }

Reply via email to