Repository: stratos Updated Branches: refs/heads/docker-grouping-merge eaab3bcf6 -> 19fdf7a15
Removing parameterized client id and using a random string of 23 characters Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/19fdf7a1 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/19fdf7a1 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/19fdf7a1 Branch: refs/heads/docker-grouping-merge Commit: 19fdf7a15cbfe2ecb10b2349ecddb75dde6b7536 Parents: eaab3bc Author: Imesh Gunaratne <[email protected]> Authored: Fri Nov 7 18:22:48 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Nov 7 18:22:48 2014 +0530 ---------------------------------------------------------------------- .../messaging/broker/connect/MQTTConnector.java | 10 +++------- .../messaging/broker/publish/TopicPublisher.java | 9 ++++++--- .../messaging/broker/subscribe/TopicSubscriber.java | 13 ++++++------- 3 files changed, 15 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/19fdf7a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java index 0f6c36d..7e49cea 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/MQTTConnector.java @@ -48,18 +48,14 @@ public class MQTTConnector { private static Properties mqttProperties = Util.getProperties(configFileLocation + File.separator + "mqtttopic.properties"); - public static MqttClient getMqttClient(String clientId) { + public static MqttClient getMqttClient() { try { String mqttUrl = mqttProperties.getProperty("mqtturl", MQTT_URL_DEFAULT); MemoryPersistence memoryPersistence = new MemoryPersistence(); + String clientId = Util.getRandomString(23); MqttClient mqttClient = new MqttClient(mqttUrl, clientId, memoryPersistence); - MqttConnectOptions connectOptions = new MqttConnectOptions(); - // Do not maintain the session between client and server, reliable delivery - // will be managed by stratos messaging component - connectOptions.setCleanSession(true); - mqttClient.connect(connectOptions); if (log.isDebugEnabled()) { - log.debug("MQTT client created"); + log.debug("MQTT client created: [client-id] " + clientId); } return mqttClient; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/stratos/blob/19fdf7a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index 1baf907..27748c8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -63,6 +63,7 @@ public class TopicPublisher { */ TopicPublisher(String aTopicName) { this.topic = aTopicName; + this.mqttClient = MQTTConnector.getMqttClient(); if (log.isDebugEnabled()) { log.debug(String.format("Topic publisher connector created: [topic] %s", topic)); } @@ -81,14 +82,16 @@ public class TopicPublisher { boolean published = false; while (!published) { - String timestamp = String.valueOf(System.currentTimeMillis()); - String clientId = timestamp + RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length()); - mqttClient = MQTTConnector.getMqttClient(clientId); MqttMessage mqttMessage = new MqttMessage(message.getBytes()); // Set quality of service mqttMessage.setQos(QOS); try { + MqttConnectOptions connectOptions = new MqttConnectOptions(); + // Do not maintain the session between client and server, reliable delivery + // will be managed by stratos messaging component + connectOptions.setCleanSession(true); + mqttClient.connect(connectOptions); mqttClient.publish(topic, mqttMessage); published = true; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/stratos/blob/19fdf7a1/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index 5801f64..ec1d238 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -42,6 +42,7 @@ import org.eclipse.paho.client.mqttv3.MqttException; public class TopicSubscriber implements Runnable { private static final Log log = LogFactory.getLog(TopicSubscriber.class); + private final MqttClient mqttClient; private boolean terminated = false; private MqttCallback messageListener; @@ -56,6 +57,7 @@ public class TopicSubscriber implements Runnable { */ public TopicSubscriber(String aTopicName) { topicName = aTopicName; + mqttClient = MQTTConnector.getMqttClient(); if (log.isDebugEnabled()) { log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName)); @@ -64,20 +66,17 @@ public class TopicSubscriber implements Runnable { private void doSubscribe() throws MqttException { - String timestamp = String.valueOf(System.currentTimeMillis()); - String clientId = timestamp + RandomStringUtils.random(Constants.CLIENT_ID_MAX_LENGTH - timestamp.length()); - MqttClient mqttClient = MQTTConnector.getMqttClient(clientId); - if (log.isInfoEnabled()) { - log.info(String.format("Subscribing to topic: [topic] %s [server] %s [client-id] %s", - topicName, mqttClient.getServerURI(), clientId)); + log.info(String.format("Subscribing to topic: [topic] %s [server] %s", + topicName, mqttClient.getServerURI())); } /* Subscribing to specific topic */ while(true) { try { - MqttConnectOptions connOpts = new MqttConnectOptions(); + // Do not maintain the session between client and server, reliable delivery + // will be managed by stratos messaging component connOpts.setCleanSession(true); mqttClient.connect(connOpts);
