Repository: stratos Updated Branches: refs/heads/docker-grouping-merge c152fc7fd -> eaab3bcf6
Using timestamp + random number for MQTT client id Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/eaab3bcf Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/eaab3bcf Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/eaab3bcf Branch: refs/heads/docker-grouping-merge Commit: eaab3bcf65ca89bef28da828a37a97d8fffa270f Parents: c152fc7 Author: Imesh Gunaratne <[email protected]> Authored: Fri Nov 7 17:48:15 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Nov 7 17:48:15 2014 +0530 ---------------------------------------------------------------------- .../stratos/messaging/broker/connect/MQTTConnector.java | 6 ++++++ .../messaging/broker/publish/TopicPublisher.java | 12 +++++------- .../messaging/broker/subscribe/TopicSubscriber.java | 8 +++++--- .../org/apache/stratos/messaging/util/Constants.java | 1 + 4 files changed, 17 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java index b55056b..0f6c36d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.domain.exception.MessagingException; import org.apache.stratos.messaging.util.Util; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; @@ -52,6 +53,11 @@ public class MQTTConnector { String mqttUrl = mqttProperties.getProperty("mqtturl", MQTT_URL_DEFAULT); MemoryPersistence memoryPersistence = new MemoryPersistence(); MqttClient mqttClient = new MqttClient(mqttUrl, clientId, memoryPersistence); + MqttConnectOptions connectOptions = new MqttConnectOptions(); + // Do not maintain the session between client and server, reliable delivery + // will be managed by stratos messaging component + connectOptions.setCleanSession(true); + mqttClient.connect(connectOptions); if (log.isDebugEnabled()) { log.debug("MQTT client created"); } http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index defb9b7..1baf907 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -19,9 +19,11 @@ package org.apache.stratos.messaging.broker.publish; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.MQTTConnector; +import org.apache.stratos.messaging.util.Constants; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -79,18 +81,14 @@ public class TopicPublisher { boolean published = false; while (!published) { - // There will be only one publisher for a topic, - // hence using topic name as the client identifier - mqttClient = MQTTConnector.getMqttClient(topic); + String timestamp = String.valueOf(System.currentTimeMillis()); + String clientId = timestamp + RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length()); + mqttClient = MQTTConnector.getMqttClient(clientId); MqttMessage mqttMessage = new MqttMessage(message.getBytes()); // Set quality of service mqttMessage.setQos(QOS); try { - MqttConnectOptions connectOptions = new MqttConnectOptions(); - // Maintain the session between client and server for reliable delivery - connectOptions.setCleanSession(false); - mqttClient.connect(connectOptions); mqttClient.publish(topic, mqttMessage); published = true; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index 4e89267..5801f64 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.MQTTConnector; import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker; +import org.apache.stratos.messaging.util.Constants; import org.apache.stratos.messaging.util.Util; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; @@ -40,9 +41,9 @@ import org.eclipse.paho.client.mqttv3.MqttException; */ public class TopicSubscriber implements Runnable { - private static final Log log = LogFactory.getLog(TopicSubscriber.class); + private static final Log log = LogFactory.getLog(TopicSubscriber.class); - private boolean terminated = false; + private boolean terminated = false; private MqttCallback messageListener; private final String topicName; @@ -63,7 +64,8 @@ public class TopicSubscriber implements Runnable { private void doSubscribe() throws MqttException { - String clientId = topicName; + String timestamp = String.valueOf(System.currentTimeMillis()); + String clientId = timestamp + RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length()); MqttClient mqttClient = MQTTConnector.getMqttClient(clientId); if (log.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/eaab3bcf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 0802528..b9c3e0e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -79,4 +79,5 @@ public class Constants { public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000; public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000; + public static final int CLIENT_ID_MAX_LENGTH = 23; }
