add configurable system parameters to configure ping message interval

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0de28d86
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0de28d86
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0de28d86

Branch: refs/heads/4.0.0-grouping
Commit: 0de28d86f3958405b0e3c1470e9951aec5faa6f5
Parents: b650785
Author: Udara Liyanage <[email protected]>
Authored: Mon Aug 18 15:04:00 2014 +0530
Committer: Udara Liyanage <[email protected]>
Committed: Mon Aug 18 15:04:00 2014 +0530

----------------------------------------------------------------------
 .../broker/heartbeat/TopicHealthChecker.java    | 14 ++++---
 .../broker/subscribe/TopicSubscriber.java       |  5 ++-
 .../stratos/messaging/util/Constants.java       |  9 +++++
 .../org/apache/stratos/messaging/util/Util.java | 41 ++++++++++++++++++++
 4 files changed, 61 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index 72466aa..7ea98d4 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -24,6 +24,7 @@ import 
org.apache.stratos.messaging.broker.connect.TopicConnector;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.event.ping.PingEvent;
 import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
 
 import javax.jms.JMSException;
 
@@ -51,19 +52,20 @@ public class TopicHealthChecker implements Runnable {
                TopicConnector testConnector = new TopicConnector();
                while (!terminated) {
                        try {
-                               // Health checker needs to run with the 
smallest possible time interval
-                               // to detect a connection drop. Otherwise the 
subscriber will not
+
+                // Health checker needs to run with the smallest possible time 
interval (configurable)
+                // to detect a connection drop. Otherwise the subscriber will 
not
                 // get reconnected after a connection drop.
-                               Thread.sleep(1000);
+                               Thread.sleep(Util.getAveragePingInterval());
                                testConnector.init(topicName);
                 // A ping event is published to detect a session timeout
                 
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), 
false);
                        } catch (Exception e) {
                                // Implies connection is not established
-                               // sleep for 30 sec and retry
+                               // sleep for configured failover ping interval 
and retry
                                try {
-                                       log.error(topicName + " topic health 
checker is failed and will try to subscribe again in 30 sec");
-                                       Thread.sleep(30000);
+                    log.error(topicName + " topic health checker is failed and 
will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" 
seconds.");
+                    Thread.sleep(Util.getFailoverPingInterval());
                                        break;
                                } catch (InterruptedException ignore) {
                                }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
index 64ce136..c0aee87 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.broker.connect.TopicConnector;
 import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker;
+import org.apache.stratos.messaging.util.Util;
 
 /**
  * Any instance who needs to subscribe to a topic, should communicate with this
@@ -132,10 +133,10 @@ public class TopicSubscriber implements Runnable {
                 else {
                                    // subscription failed
                     if(log.isInfoEnabled()) {
-                        log.info("Will try to subscribe again in 30 sec");
+                        log.info("Will try to subscribe again in 
"+Util.getFailoverPingInterval()/1000+" sec");
                     }
                     try {
-                        Thread.sleep(30000);
+                        Thread.sleep(Util.getFailoverPingInterval());
                     } catch (InterruptedException ignore) {
                     }
                 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
index 36ad532..5991e25 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java
@@ -66,4 +66,13 @@ public class Constants {
 
     public static final String IS_PRIMARY = "PRIMARY";
 
+    //System Properties
+    public static final String AVERAGE_PING_INTERVAL_PROPERTY = 
"stratos.messaging.averagePingInterval";
+    public static final String FAILOVER_PING_INTERVAL_PROPERTY = 
"stratos.messaging.failoverPingInterval";
+
+    //Default values
+    public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000;
+    public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000;
+
+
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
index eb0035e..95b5a07 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java
@@ -105,4 +105,45 @@ public class Util {
         return (new JsonMessage(json, type)).getObject();
     }
 
+
+    // Time interval between each ping message sent to topic.
+    private static int averagePingInterval;
+
+    // Time interval between each ping message after an error had occurred.
+    private static int failoverPingInterval;
+
+    /**
+     * fetch value from system param
+     * @return
+     */
+    public static int getAveragePingInterval() {
+        if (averagePingInterval <= 0) {
+            averagePingInterval = 
Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL,Constants.AVERAGE_PING_INTERVAL_PROPERTY);
+        }
+        return averagePingInterval;
+    }
+
+    /**
+     * fetch value from system param
+     * @return
+     */
+    public static int getFailoverPingInterval() {
+        if (failoverPingInterval <= 0) {
+            failoverPingInterval = 
Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL,Constants.FAILOVER_PING_INTERVAL_PROPERTY);
+        }
+        return failoverPingInterval;
+    }
+
+    /**
+     * Method to safely access numeric system properties
+     * @param defaultValue
+     * @return
+     */
+    public static Integer getNumericSystemProperty(Integer defaultValue, 
String propertyKey) {
+        try {
+            return Integer.valueOf(System.getProperty(propertyKey));
+        } catch (NumberFormatException ex) {
+            return defaultValue;
+        }
+    }
 }

Reply via email to