Repository: stratos
Updated Branches:
  refs/heads/master 50f890f65 -> ec1da2c38


applying changes from PR#87


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ec1da2c3
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ec1da2c3
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ec1da2c3

Branch: refs/heads/master
Commit: ec1da2c38b6061b452e1f4bcf75d299f30e79632
Parents: 50f890f
Author: R-Rajkumar <[email protected]>
Authored: Sun Oct 12 18:10:10 2014 +0530
Committer: R-Rajkumar <[email protected]>
Committed: Sun Oct 12 18:10:10 2014 +0530

----------------------------------------------------------------------
 .../broker/publish/EventPublisherPool.java      |  1 -
 .../broker/publish/TopicPublisher.java          | 92 +++++++++-----------
 .../broker/subscribe/TopicSubscriber.java       | 54 +++++-------
 .../stat/HealthStatEventMessageListener.java    | 45 +++++-----
 .../InstanceNotifierEventMessageListener.java   | 46 +++++-----
 5 files changed, 108 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
index 175d09b..257c56c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -52,7 +52,6 @@ public class EventPublisherPool {
     public static void close(String topicName) {
         synchronized (EventPublisherPool.class) {
             if(topicNameEventPublisherMap.containsKey(topicName)) {
-                topicNameEventPublisherMap.get(topicName).close();
                 topicNameEventPublisherMap.remove(topicName);
                 if(log.isDebugEnabled()) {
                     log.debug(String.format("Event publisher closed and 
removed from pool: [topic] %s", topicName));

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index b3e6843..15fc98e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.MQTTConnector;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 import com.google.gson.Gson;
@@ -69,56 +70,45 @@ public class TopicPublisher {
        public void publish(Object messageObj, boolean retry) {
 
                synchronized (TopicPublisher.class) {
-                       Gson gson = new Gson();
-                       String message = gson.toJson(messageObj);
-                       boolean published = false;
-                       while (!published)
-                               try {
-                                       mqttClient = 
MQTTConnector.getMQTTConClient();
-
-                                       MqttMessage mqttMSG = new 
MqttMessage(message.getBytes());
-
-                                       mqttMSG.setQos(QOS);
-                                       MqttConnectOptions connOpts = new 
MqttConnectOptions();
-                                       connOpts.setCleanSession(true);
-                                       mqttClient.connect(connOpts);
-                                       mqttClient.publish(topic, mqttMSG);
-                                       mqttClient.disconnect();
-                                       published = true;
-                               } catch (Exception e) {
-                                       initialized = false;
-                                       if (log.isErrorEnabled()) {
-                                               log.error("Error while 
publishing to the topic: " + topic, e);
-                                       }
-                                       if (!retry) {
-                                               if (log.isDebugEnabled()) {
-                                                       log.debug("Retry 
disabled for topic " + topic);
-                                               }
-                                               throw new RuntimeException(e);
-                                       }
-
-                                       if (log.isInfoEnabled()) {
-                                               log.info("Will try to 
re-publish in 60 sec");
-                                       }
-                                       try {
-                                               Thread.sleep(60000);
-                                       } catch (InterruptedException ignore) {
-                                       }
-                               }
-                               finally {
-
-                               }
-               }
+            Gson gson = new Gson();
+            String message = gson.toJson(messageObj);
+            boolean published = false;
+            while (!published) {
+                mqttClient = MQTTConnector.getMQTTConClient();
+                MqttMessage mqttMSG = new MqttMessage(message.getBytes());
+
+                mqttMSG.setQos(QOS);
+                MqttConnectOptions connOpts = new MqttConnectOptions();
+                connOpts.setCleanSession(true);
+                try {
+                    mqttClient.connect(connOpts);
+                    mqttClient.publish(topic, mqttMSG);
+
+                    published = true;
+                } catch (MqttException e) {
+                    initialized = false;
+                    if (!retry) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Retry disabled for topic " + topic);
+                        }
+                        throw new RuntimeException(e);
+                    }
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Will try to re-publish in 60 sec");
+                    }
+                    try {
+                        Thread.sleep(60000);
+                    } catch (InterruptedException ignore) {
+                    }
+                } finally {
+                    try {
+                        mqttClient.disconnect();
+                    } catch (MqttException ignore) {
+
+                    }
+                }
+            }
+        }
        }
-
-       public void close() {
-               synchronized (TopicPublisher.class) {
-                       // closes all sessions/connections
-                       try {
-
-                       } catch (Exception ignore) {
-                       }
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index b9d0905..a76cc9e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -46,13 +46,11 @@ public class TopicSubscriber implements Runnable {
 
        private boolean terminated = false;
        private MqttCallback messageListener;
-       private TopicSession topicSession;
        private final String topicName;
 
        private TopicHealthChecker healthChecker;
        private final javax.jms.TopicSubscriber topicSubscriber = null;
        private boolean subscribed;
-       private final MessageProcessorChain processorChain;
 
        /**
         * @param aTopicName topic name of this subscriber instance.
@@ -63,7 +61,6 @@ public class TopicSubscriber implements Runnable {
                if (log.isDebugEnabled()) {
                        log.debug(String.format("Topic subscriber connector 
created: [topic] %s", topicName));
                }
-               this.processorChain = new 
InstanceNotifierMessageProcessorChain();
        }
 
        private void doSubscribe() throws MqttException {
@@ -74,26 +71,31 @@ public class TopicSubscriber implements Runnable {
                        log.debug("Subscribing to topic '" + topicName + "' 
from " +
                                  mqttClient.getServerURI());
                }
-               // Subscribing to specific topic
-               try {
-
-                       MqttConnectOptions connOpts = new MqttConnectOptions();
-                       connOpts.setCleanSession(true);
-                       mqttClient.connect(connOpts);
-                       // Continue waiting for messages
-                       mqttClient.subscribe(topicName);
-                       mqttClient.setCallback(messageListener);
-                       subscribed = true;
-                       while (true) {
-                               try {
-                                       Thread.sleep(1000);
-                               } catch (InterruptedException e) {
-                               }
-                       }
 
-               } finally {
-                       mqttClient.disconnect();
-               }
+               /* Subscribing to specific topic */
+        while(true) {
+            try {
+
+                MqttConnectOptions connOpts = new MqttConnectOptions();
+                connOpts.setCleanSession(true);
+                mqttClient.connect(connOpts);
+
+                mqttClient.subscribe(topicName);
+                mqttClient.setCallback(messageListener);
+                subscribed = true;
+                // Continue waiting for messages
+                while (true) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+
+            } finally {
+                mqttClient.disconnect();
+            }
+        }
+               
        }
 
        /**
@@ -151,14 +153,6 @@ public class TopicSubscriber implements Runnable {
                                                                                
topicName));
                                                }
                                        }
-                                       if (topicSession != null) {
-                                               topicSession.close();
-                                               if (log.isDebugEnabled()) {
-                                                       
log.debug(String.format("Topic subscriber session closed: [topic] %s",
-                                                                               
topicName));
-                                               }
-                                       }
-
                                } catch (JMSException ignore) {
                                }
                        }

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
index bfaf622..aabc432 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageListener.java
@@ -47,42 +47,37 @@ public class HealthStatEventMessageListener implements MqttCallback {
        }
 
        @Override
-       public void connectionLost(Throwable arg0) {
-               // TODO Auto-generated method stub
-
+       public void connectionLost(Throwable err) {
+               log.debug("MQTT connection lost", err);
        }
 
        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
-               // TODO Auto-generated method stub
-
+               log.debug("Message delivery completed");
        }
 
        @Override
-       public void messageArrived(String topicName, MqttMessage message) 
throws Exception {
-               if (message instanceof MqttMessage) {
+       public void messageArrived(String topicName, MqttMessage message)
+                       throws Exception {
+               TextMessage receivedMessage = new ActiveMQTextMessage();
+               if (log.isDebugEnabled()) {
+                       log.debug(String.format("Health stat event messege 
received...."));
+               }
+               receivedMessage.setText(new String(message.getPayload()));
+               receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                               Util.getEventNameForTopic(topicName));
 
-                       TextMessage receivedMessage = new ActiveMQTextMessage();
+               try {
                        if (log.isDebugEnabled()) {
-                               log.debug(String.format("Health stat event 
messege received...."));
-
+                               log.debug(String.format(
+                                               "Health stat event message 
received: %s",
+                                               ((TextMessage) 
message).getText()));
                        }
-                       receivedMessage.setText(new 
String(message.getPayload()));
-                       
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
-                                                         
Util.getEventNameForTopic(topicName));
+                       // Add received message to the queue
+                       messageQueue.add(receivedMessage);
 
-                       try {
-                               if (log.isDebugEnabled()) {
-                                       log.debug(String.format("Health stat 
event message received: %s",
-                                                               ((TextMessage) 
message).getText()));
-                               }
-                               // Add received message to the queue
-                               messageQueue.add(receivedMessage);
-
-                       } catch (JMSException e) {
-                               log.error(e.getMessage(), e);
-                       }
+               } catch (JMSException e) {
+                       log.error(e.getMessage(), e);
                }
-
        }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/ec1da2c3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
index c7d1e98..5203ce4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageListener.java
@@ -47,43 +47,43 @@ class InstanceNotifierEventMessageListener implements MqttCallback {
        }
 
        @Override
-       public void connectionLost(Throwable arg0) {
+       public void connectionLost(Throwable err) {
                if (log.isDebugEnabled()) {
-                       log.debug("MQTT connection lost");
+                       log.debug("MQTT connection lost" , err);
                }
 
        }
 
        @Override
-       public void deliveryComplete(IMqttDeliveryToken arg0) {
-
+       public void deliveryComplete(IMqttDeliveryToken err) {
+               log.debug("Message delivery completed");
        }
 
        @Override
-       public void messageArrived(String topicName, MqttMessage message) 
throws Exception {
-               if (message instanceof MqttMessage) {
-
-                       TextMessage receivedMessage = new ActiveMQTextMessage();
-                       if (log.isDebugEnabled()) {
-                               log.debug(String.format("instance notifier 
messege received...."));
+       public void messageArrived(String topicName, MqttMessage message)
+                       throws Exception {
 
-                       }
+               TextMessage receivedMessage = new ActiveMQTextMessage();
+               if (log.isDebugEnabled()) {
+                       log.debug(String.format("instance notifier messege 
received...."));
 
-                       receivedMessage.setText(new 
String(message.getPayload()));
-                       
receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
-                                                         
Util.getEventNameForTopic(topicName));
+               }
 
-                       try {
-                               if (log.isDebugEnabled()) {
-                                       log.debug(String.format("Instance 
notifier message received: %s",
-                                                               ((TextMessage) 
message).getText()));
-                               }
-                               // Add received message to the queue
-                               messageQueue.add(receivedMessage);
+               receivedMessage.setText(new String(message.getPayload()));
+               receivedMessage.setStringProperty(Constants.EVENT_CLASS_NAME,
+                               Util.getEventNameForTopic(topicName));
 
-                       } catch (JMSException e) {
-                               log.error(e.getMessage(), e);
+               try {
+                       if (log.isDebugEnabled()) {
+                               log.debug(String.format(
+                                               "Instance notifier message 
received: %s",
+                                               ((TextMessage) 
message).getText()));
                        }
+                       // Add received message to the queue
+                       messageQueue.add(receivedMessage);
+
+               } catch (JMSException e) {
+                       log.error(e.getMessage(), e);
                }
 
        }

Reply via email to