Repository: stratos Updated Branches: refs/heads/master 50f890f65 -> ec1da2c38
applying changes from PR#87 Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ec1da2c3 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ec1da2c3 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ec1da2c3 Branch: refs/heads/master Commit: ec1da2c38b6061b452e1f4bcf75d299f30e79632 Parents: 50f890f Author: R-Rajkumar <[email protected]> Authored: Sun Oct 12 18:10:10 2014 +0530 Committer: R-Rajkumar <[email protected]> Committed: Sun Oct 12 18:10:10 2014 +0530 ---------------------------------------------------------------------- .../broker/publish/EventPublisherPool.java | 1 - .../broker/publish/TopicPublisher.java | 92 +++++++++----------- .../broker/subscribe/TopicSubscriber.java | 54 +++++------- .../stat/HealthStatEventMessageListener.java | 45 +++++----- .../InstanceNotifierEventMessageListener.java | 46 +++++----- 5 files changed, 108 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java index 175d09b..257c56c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java @@ -52,7 +52,6 @@ public class EventPublisherPool { public static void close(String topicName) { synchronized (EventPublisherPool.class) { if(topicNameEventPublisherMap.containsKey(topicName)) { - topicNameEventPublisherMap.get(topicName).close(); topicNameEventPublisherMap.remove(topicName); if(log.isDebugEnabled()) { log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName)); http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index b3e6843..15fc98e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.MQTTConnector; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import com.google.gson.Gson; @@ -69,56 +70,45 @@ public class TopicPublisher { public void publish(Object messageObj, boolean retry) { synchronized (TopicPublisher.class) { - Gson gson = new Gson(); - String message = gson.toJson(messageObj); - boolean published = false; - while (!published) - try { - mqttClient = MQTTConnector.getMQTTConClient(); - - MqttMessage mqttMSG = new MqttMessage(message.getBytes()); - - mqttMSG.setQos(QOS); - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - mqttClient.connect(connOpts); - mqttClient.publish(topic, mqttMSG); - mqttClient.disconnect(); - published = true; - } catch (Exception e) { - initialized = false; - if (log.isErrorEnabled()) { - log.error("Error while publishing to the topic: " + topic, e); - } - if (!retry) { - if (log.isDebugEnabled()) { - log.debug("Retry disabled for topic " + topic); - } - throw new RuntimeException(e); - } - - if (log.isInfoEnabled()) { - log.info("Will try to re-publish in 60 sec"); - } - try { - Thread.sleep(60000); - } catch (InterruptedException ignore) { - } - } - finally { - - } - } + Gson gson = new Gson(); + String message = gson.toJson(messageObj); + boolean published = false; + while (!published) { + mqttClient = MQTTConnector.getMQTTConClient(); + MqttMessage mqttMSG = new MqttMessage(message.getBytes()); + + mqttMSG.setQos(QOS); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + try { + mqttClient.connect(connOpts); + mqttClient.publish(topic, mqttMSG); + + published = true; + } catch (MqttException e) { + initialized = false; + if (!retry) { + if (log.isDebugEnabled()) { + log.debug("Retry disabled for topic " + topic); + } + throw new RuntimeException(e); + } + + if (log.isInfoEnabled()) { + log.info("Will try to re-publish in 60 sec"); + } + try { + Thread.sleep(60000); + } catch (InterruptedException ignore) { + } + } finally { + try { + mqttClient.disconnect(); + } catch (MqttException ignore) { + + } + } + } + } } - - public void close() { - synchronized (TopicPublisher.class) { - // closes all sessions/connections - try { - - } catch (Exception ignore) { - } - } - } - } http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index b9d0905..a76cc9e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -46,13 +46,11 @@ public class TopicSubscriber implements Runnable { private boolean terminated = false; private MqttCallback messageListener; - private TopicSession topicSession; private final String topicName; private TopicHealthChecker healthChecker; private final javax.jms.TopicSubscriber topicSubscriber = null; private boolean subscribed; - private final MessageProcessorChain processorChain; /** * @param aTopicName topic name of this subscriber instance. @@ -63,7 +61,6 @@ public class TopicSubscriber implements Runnable { if (log.isDebugEnabled()) { log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName)); } - this.processorChain = new InstanceNotifierMessageProcessorChain(); } private void doSubscribe() throws MqttException { @@ -74,26 +71,31 @@ public class TopicSubscriber implements Runnable { log.debug("Subscribing to topic '" + topicName + "' from " + mqttClient.getServerURI()); } - // Subscribing to specific topic - try { - - MqttConnectOptions connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(true); - mqttClient.connect(connOpts); - // Continue waiting for messages - mqttClient.subscribe(topicName); - mqttClient.setCallback(messageListener); - subscribed = true; - while (true) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } - } finally { - mqttClient.disconnect(); - } + /* Subscribing to specific topic */ + while(true) { + try { + + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + mqttClient.connect(connOpts); + + mqttClient.subscribe(topicName); + mqttClient.setCallback(messageListener); + subscribed = true; + // Continue waiting for messages + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } + } + + } finally { + mqttClient.disconnect(); + } + } + } /** @@ -151,14 +153,6 @@ public class TopicSubscriber implements Runnable { topicName)); } } - if (topicSession != null) { - topicSession.close(); - if (log.isDebugEnabled()) { - log.debug(String.format("Topic subscriber session closed: [topic] %s", - topicName)); - } - } - } catch (JMSException ignore) { } } http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java index bfaf622..aabc432 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java @@ -47,42 +47,37 @@ public class HealthStatEventMessageListener implements MqttCallback { } @Override - public void connectionLost(Throwable arg0) { - // TODO Auto-generated method stub - + public void connectionLost(Throwable err) { + log.debug("MQTT connection lost", err); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { - // TODO Auto-generated method stub - + log.debug("Message delivery completed"); } @Override - public void messageArrived(String topicName, MqttMessage message) throws Exception { - if (message instanceof MqttMessage) { + public void messageArrived(String topicName, MqttMessage message) + throws Exception { + TextMessage receivedMessage = new ActiveMQTextMessage(); + if (log.isDebugEnabled()) { + log.debug(String.format("Health stat event messege received....")); + } + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); - TextMessage receivedMessage = new ActiveMQTextMessage(); + try { if (log.isDebugEnabled()) { - log.debug(String.format("Health stat event messege received....")); - + log.debug(String.format( + "Health stat event message received: %s", + ((TextMessage) message).getText())); } - receivedMessage.setText(new String(message.getPayload())); - receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, - Util.getEventNameForTopic(topicName)); + // Add received message to the queue + messageQueue.add(receivedMessage); - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Health stat event message received: %s", - ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); - - } catch (JMSException e) { - log.error(e.getMessage(), e); - } + } catch (JMSException e) { + log.error(e.getMessage(), e); } - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java index c7d1e98..5203ce4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java @@ -47,43 +47,43 @@ class InstanceNotifierEventMessageListener implements MqttCallback { } @Override - public void connectionLost(Throwable arg0) { + public void connectionLost(Throwable err) { if (log.isDebugEnabled()) { - log.debug("MQTT connection lost"); + log.debug("MQTT connection lost" , err); } } @Override - public void deliveryComplete(IMqttDeliveryToken arg0) { - + public void deliveryComplete(IMqttDeliveryToken err) { + log.debug("Message delivery completed"); } @Override - public void messageArrived(String topicName, MqttMessage message) throws Exception { - if (message instanceof MqttMessage) { - - TextMessage receivedMessage = new ActiveMQTextMessage(); - if (log.isDebugEnabled()) { - log.debug(String.format("instance notifier messege received....")); + public void messageArrived(String topicName, MqttMessage message) + throws Exception { - } + TextMessage receivedMessage = new ActiveMQTextMessage(); + if (log.isDebugEnabled()) { + log.debug(String.format("instance notifier messege received....")); - receivedMessage.setText(new String(message.getPayload())); - receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, - Util.getEventNameForTopic(topicName)); + } - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Instance notifier message received: %s", - ((TextMessage) message).getText())); - } - // Add received message to the queue - messageQueue.add(receivedMessage); + receivedMessage.setText(new String(message.getPayload())); + receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME, + Util.getEventNameForTopic(topicName)); - } catch (JMSException e) { - log.error(e.getMessage(), e); + try { + if (log.isDebugEnabled()) { + log.debug(String.format( + "Instance notifier message received: %s", + ((TextMessage) message).getText())); } + // Add received message to the queue + messageQueue.add(receivedMessage); + + } catch (JMSException e) { + log.error(e.getMessage(), e); } }
