Repository: incubator-stratos
Updated Branches:
  refs/heads/4.0.0-incubating 828aa9861 -> e8ffe3d08


Fixed messaging model session timeout handling, subscriber re-connection, and publisher retry logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/1afd2969
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/1afd2969
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/1afd2969

Branch: refs/heads/4.0.0-incubating
Commit: 1afd29698523caf81f0794a768ab2da45322b406
Parents: 0a682d8
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Apr 24 12:49:35 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Apr 24 12:49:35 2014 +0530

----------------------------------------------------------------------
 .../broker/connect/TopicConnector.java          |   1 -
 .../broker/heartbeat/TopicHealthChecker.java    |  23 ++--
 .../broker/publish/EventPublisher.java          |   8 +-
 .../broker/publish/TopicPublisher.java          | 135 ++++++++++++++-----
 .../broker/subscribe/TopicSubscriber.java       | 119 ++++++++++------
 .../stratos/messaging/event/ping/PingEvent.java |  30 +++++
 .../stratos/messaging/util/Constants.java       |   1 +
 7 files changed, 226 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
index a4fc5c4..ebb339d 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java
@@ -67,7 +67,6 @@ public class TopicConnector {
         }
         topicConnection = connFactory.createTopicConnection();
         topicConnection.start();
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 1fbd25e..d36b9cb 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -21,6 +21,9 @@ package org.apache.stratos.messaging.broker.heartbeat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.TopicConnector;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.event.ping.PingEvent;
+import org.apache.stratos.messaging.util.Constants;
 
 import javax.jms.JMSException;
 
@@ -43,22 +46,24 @@ public class TopicHealthChecker implements Runnable {
        @Override
        public void run() {
         if(log.isDebugEnabled()){
-                   log.debug(topicName + " topic Health Checker is running... 
" );
+                   log.debug(topicName + " topic health checker is running... 
" );
         }
                TopicConnector testConnector = new TopicConnector();
                while (!terminated) {
                        try {
-                               // health checker runs in every 30s
-                               Thread.sleep(30000);
-
+                               // Health checker needs to run with the 
smallest possible time interval
+                               // to detect a connection drop. Otherwise the 
subscriber will not
+                // get reconnected after a connection drop.
+                               Thread.sleep(1000);
                                testConnector.init(topicName);
-
+                // A ping event is published to detect a session timeout
+                
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
                        } catch (Exception e) {
-                               // implies connection is not established
-                               // sleep for 5s and retry
+                               // Implies connection is not established
+                               // sleep for 30 sec and retry
                                try {
-                                       log.error(topicName + " topic health 
checker is failed and will retry to establish a connection after 5s.");
-                                       Thread.sleep(5000);
+                                       log.error(topicName + " topic health 
checker is failed and will try to subscribe again in 30 sec");
+                                       Thread.sleep(30000);
                                        break;
                                } catch (InterruptedException ignore) {
                                }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 5d39956..8db203c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,8 +43,10 @@ public class EventPublisher extends TopicPublisher {
      * @param event event to be published
      */
     public void publish(Event event) {
-        Properties headers = new Properties();
-        headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName());
-        super.publish(event, headers);
+        synchronized (EventPublisher.class) {
+            Properties headers = new Properties();
+            headers.put(Constants.EVENT_CLASS_NAME, 
event.getClass().getName());
+            super.publish(event, headers);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 004be13..7394d5b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -48,6 +48,7 @@ public class TopicPublisher extends MessagePublisher {
        private TopicSession topicSession;
        private TopicConnector connector;
        private javax.jms.TopicPublisher topicPublisher = null;
+    private boolean initialized;
 
        /**
         * @param aTopicName
@@ -56,6 +57,9 @@ public class TopicPublisher extends MessagePublisher {
        TopicPublisher(String aTopicName) {
                super(aTopicName);
                connector = new TopicConnector();
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher connector created: 
[topic] %s", getName()));
+        }
        }
 
        /**
@@ -68,35 +72,86 @@ public class TopicPublisher extends MessagePublisher {
        }
        
        public void publish(Object messageObj, Properties headers) {
-               
-               Gson gson = new Gson();
-               String message = gson.toJson(messageObj);
-               try {
-                       doPublish(message, headers);
-                       
-               } catch (Exception e) {
-                       log.error("Error while publishing to the topic: " + 
getName(), e);
-                       // TODO would it be worth to throw this exception?
-               }
+        synchronized (TopicPublisher.class) {
+            Gson gson = new Gson();
+            String message = gson.toJson(messageObj);
+            boolean published = false;
+            while(!published) {
+
+                try {
+                    doPublish(message, headers);
+                    published = true;
+                } catch (Exception e) {
+                    initialized = false;
+                    if(log.isErrorEnabled()) {
+                        log.error("Error while publishing to the topic: " + 
getName(), e);
+                    }
+                    if(log.isInfoEnabled()) {
+                        log.info("Will try to re-publish in 60 sec");
+                    }
+                    try {
+                        Thread.sleep(60000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+            }
+        }
        }
 
        public void close() {
-
-               // closes all sessions/connections
-               try {
-                       topicPublisher.close();
-                       topicSession.close();
-                       connector.close();
-               } catch (JMSException ignore) {
-               }
+        synchronized (TopicPublisher.class) {
+            // closes all sessions/connections
+            try {
+                if(topicPublisher != null) {
+                    topicPublisher.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher closed: 
[topic] %s", getName()));
+                    }
+                }
+                if(topicSession != null) {
+                    topicSession.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher session 
closed: [topic] %s", getName()));
+                    }
+                }
+                if(connector != null) {
+                    connector.close();
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Topic publisher connector 
closed: [topic] %s", getName()));
+                    }
+                }
+            } catch (JMSException ignore) {
+            }
+        }
        }
 
        private void doPublish(String message, Properties headers) throws 
Exception, JMSException {
-               setPublisher();
+        if(!initialized) {
+            // Initialize a topic connection to the message broker
+            connector.init(getName());
+            initialized = true;
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Topic publisher connector 
initialized: [topic] %s", getName()));
+            }
+        }
 
-               TextMessage textMessage = 
topicSession.createTextMessage(message);
+        try {
+        // Create a new session
+        topicSession = createSession(connector);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher session created: [topic] 
%s", getName()));
+        }
+        // Create a publisher from session
+        topicPublisher = createPublisher(topicSession);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic publisher created: [topic] %s", 
getName()));
+        }
+
+        // Create text message
+        TextMessage textMessage = topicSession.createTextMessage(message);
                
                if (headers != null) {
+            // Add header properties
                        @SuppressWarnings("rawtypes")
                        Enumeration e = headers.propertyNames();
 
@@ -110,26 +165,34 @@ public class TopicPublisher extends MessagePublisher {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Message published: [topic] %s [header] %s 
[body] %s", getName(), (headers != null) ? headers.toString() : "null", 
message));
         }
-       }
+        }
+        finally {
+            if(topicPublisher != null) {
+                topicPublisher.close();
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topic publisher closed: [topic] 
%s", getName()));
+                }
+            }
+            if(topicSession != null) {
+                topicSession.close();
+                if(log.isDebugEnabled()) {
+                    log.debug(String.format("Topic publisher session closed: 
[topic] %s", getName()));
+                }
+            }
+        }
+    }
 
-       private void setPublisher() throws Exception, JMSException {
-               if (topicSession != null && topicPublisher != null) {
-                       return;
-               }
-               
-               if (topicSession == null) {
-                       // initialize a TopicConnector
-                       connector.init(getName());
-                       // get a session
-                       topicSession = connector.newSession();
-               }
-               
-               Topic topic = connector.getTopic();
+    private TopicSession createSession(TopicConnector topicConnector) throws 
Exception {
+        // Create a new session
+        return topicConnector.newSession();
+    }
+
+       private javax.jms.TopicPublisher createPublisher(TopicSession 
topicSession) throws Exception, JMSException {
+        Topic topic = connector.getTopic();
                if (topic == null) {
                        // if the topic doesn't exist, create it.
                        topic = topicSession.createTopic(getName());
                }
-               topicPublisher = topicSession.createPublisher(topic);
+               return topicSession.createPublisher(topic);
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index f6fa587..64ce136 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -53,30 +53,45 @@ public class TopicSubscriber implements Runnable {
        public TopicSubscriber(String aTopicName) {
                topicName = aTopicName;
                connector = new TopicConnector();
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber connector created: 
[topic] %s", topicName));
+        }
        }
 
        private void doSubscribe() throws Exception, JMSException {
-               if (topicSession != null && topicSubscriber != null) {
-                       return;
-               }
-               
-               if (topicSession == null) {
-                       // initialize a TopicConnector
-                       connector.init(topicName);
-                       // get a session
-                       topicSession = connector.newSession();
-               }
-               
-               Topic topic = connector.getTopic();
-               if (topic == null) {
-                       // if topic doesn't exist, create it.
-                       topic = topicSession.createTopic(topicName);
-               }
-               topicSubscriber = topicSession.createSubscriber(topic);
-               topicSubscriber.setMessageListener(messageListener);
+               // Initialize a topic connection
+               connector.init(topicName);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber connector initialized: 
[topic] %s", topicName));
+        }
+        // Create new session
+        topicSession = createSession(connector);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber session created: [topic] 
%s", topicName));
+        }
+        // Create a new subscriber
+        createSubscriber(topicSession);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("Topic subscriber created: [topic] %s", 
topicName));
+        }
         subscribed = true;
        }
 
+    private void createSubscriber(TopicSession topicSession) throws 
JMSException {
+        Topic topic = connector.getTopic();
+        if (topic == null) {
+            // if topic doesn't exist, create it.
+            topic = topicSession.createTopic(topicName);
+        }
+        topicSubscriber = topicSession.createSubscriber(topic);
+        topicSubscriber.setMessageListener(messageListener);
+    }
+
+    private TopicSession createSession(TopicConnector topicConnector) throws 
Exception {
+        // Create a new session
+        return topicConnector.newSession();
+    }
+
        /**
         * @param messageListener
         *            this MessageListener will get triggered each time this
@@ -100,32 +115,52 @@ public class TopicSubscriber implements Runnable {
                        try {
                                doSubscribe();
                        } catch (Exception e) {
+                subscribed = false;
                                log.error("Error while subscribing to the 
topic: " + topicName, e);
                        } finally {
-                               // start the health checker
-                healthChecker = new TopicHealthChecker(topicName);
-                           Thread healthCheckerThread = new 
Thread(healthChecker);
-                               healthCheckerThread.start();
-                               try {
-                                       // waits till the thread finishes.
-                                       healthCheckerThread.join();
-                               } catch (InterruptedException ignore) {
-                               }
-                               // health checker failed
-                               // closes all sessions/connections
-                               try {
-                    subscribed = false;
-                                       if (topicSubscriber != null) {
-                                               topicSubscriber.close();
-                                       }
-                                       if (topicSession != null) {
-                                               topicSession.close();
-                                       }
-                                       if (connector != null) {
-                                               connector.close();
-                                       }
-                               } catch (JMSException ignore) {
-                               }
+                if(subscribed) {
+                    // start the health checker if subscribed
+                    healthChecker = new TopicHealthChecker(topicName);
+                    Thread healthCheckerThread = new Thread(healthChecker);
+                    healthCheckerThread.start();
+                    try {
+                        // waits till the thread finishes.
+                        healthCheckerThread.join();
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+                else {
+                                   // subscription failed
+                    if(log.isInfoEnabled()) {
+                        log.info("Will try to subscribe again in 30 sec");
+                    }
+                    try {
+                        Thread.sleep(30000);
+                    } catch (InterruptedException ignore) {
+                    }
+                }
+                // closes all sessions/connections
+                try {
+                    if (topicSubscriber != null) {
+                        topicSubscriber.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber closed: 
[topic] %s", topicName));
+                        }
+                    }
+                    if (topicSession != null) {
+                        topicSession.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber session 
closed: [topic] %s", topicName));
+                        }
+                    }
+                    if (connector != null) {
+                        connector.close();
+                        if(log.isDebugEnabled()) {
+                            log.debug(String.format("Topic subscriber 
connector closed: [topic] %s", topicName));
+                        }
+                    }
+                } catch (JMSException ignore) {
+                }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
new file mode 100644
index 0000000..0fcb6a5
--- /dev/null
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.ping;
+
+import 
org.apache.stratos.messaging.event.instance.notifier.InstanceNotifierEvent;
+
+import java.io.Serializable;
+
+/**
+ * Ping event.
+ */
+public class PingEvent extends InstanceNotifierEvent implements Serializable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 397a468..8b3c814 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -24,6 +24,7 @@ public class Constants {
        public static final String HEALTH_STAT_TOPIC = 
"summarized-health-stats";
     public static final String INSTANCE_STATUS_TOPIC = "instance-status";
     public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier";
+    public static final String PING_TOPIC = "ping";
     public static final String TENANT_TOPIC = "tenant";
     public static final String TENANT_RANGE_ALL = "*";
 

Reply via email to