add configurable system parameters to configure ping message interval
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0de28d86 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0de28d86 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0de28d86 Branch: refs/heads/4.0.0-grouping Commit: 0de28d86f3958405b0e3c1470e9951aec5faa6f5 Parents: b650785 Author: Udara Liyanage <[email protected]> Authored: Mon Aug 18 15:04:00 2014 +0530 Committer: Udara Liyanage <[email protected]> Committed: Mon Aug 18 15:04:00 2014 +0530 ---------------------------------------------------------------------- .../broker/heartbeat/TopicHealthChecker.java | 14 ++++--- .../broker/subscribe/TopicSubscriber.java | 5 ++- .../stratos/messaging/util/Constants.java | 9 +++++ .../org/apache/stratos/messaging/util/Util.java | 41 ++++++++++++++++++++ 4 files changed, 61 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java index 72466aa..7ea98d4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java @@ -24,6 +24,7 @@ import org.apache.stratos.messaging.broker.connect.TopicConnector; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.event.ping.PingEvent; import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.util.Util; import javax.jms.JMSException; @@ -51,19 +52,20 @@ public class TopicHealthChecker implements Runnable { TopicConnector testConnector = new TopicConnector(); while (!terminated) { try { - // Health checker needs to run with the smallest possible time interval - // to detect a connection drop. Otherwise the subscriber will not + + // Health checker needs to run with the smallest possible time interval (configurable) + // to detect a connection drop. Otherwise the subscriber will not // get reconnected after a connection drop. - Thread.sleep(1000); + Thread.sleep(Util.getAveragePingInterval()); testConnector.init(topicName); // A ping event is published to detect a session timeout EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), false); } catch (Exception e) { // Implies connection is not established - // sleep for 30 sec and retry + // sleep for configured failover ping interval and retry try { - log.error(topicName + " topic health checker is failed and will try to subscribe again in 30 sec"); - Thread.sleep(30000); + log.error(topicName + " topic health checker is failed and will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" seconds."); + Thread.sleep(Util.getFailoverPingInterval()); break; } catch (InterruptedException ignore) { } http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index 64ce136..c0aee87 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.TopicConnector; import org.apache.stratos.messaging.broker.heartbeat.TopicHealthChecker; +import org.apache.stratos.messaging.util.Util; /** * Any instance who needs to subscribe to a topic, should communicate with this @@ -132,10 +133,10 @@ public class TopicSubscriber implements Runnable { else { // subscription failed if(log.isInfoEnabled()) { - log.info("Will try to subscribe again in 30 sec"); + log.info("Will try to subscribe again in "+Util.getFailoverPingInterval()/1000+" sec"); } try { - Thread.sleep(30000); + Thread.sleep(Util.getFailoverPingInterval()); } catch (InterruptedException ignore) { } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 36ad532..5991e25 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -66,4 +66,13 @@ public class Constants { public static final String IS_PRIMARY = "PRIMARY"; + //System Properties + public static final String AVERAGE_PING_INTERVAL_PROPERTY = "stratos.messaging.averagePingInterval"; + public static final String FAILOVER_PING_INTERVAL_PROPERTY = "stratos.messaging.failoverPingInterval"; + + //Default values + public static final int DEFAULT_AVERAGE_PING_INTERVAL = 1000; + public static final int DEFAULT_FAILOVER_PING_INTERVAL = 30000; + + } http://git-wip-us.apache.org/repos/asf/stratos/blob/0de28d86/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java index eb0035e..95b5a07 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Util.java @@ -105,4 +105,45 @@ public class Util { return (new JsonMessage(json, type)).getObject(); } + + // Time interval between each ping message sent to topic. + private static int averagePingInterval; + + // Time interval between each ping message after an error had occurred. + private static int failoverPingInterval; + + /** + * fetch value from system param + * @return + */ + public static int getAveragePingInterval() { + if (averagePingInterval <= 0) { + averagePingInterval = Util.getNumericSystemProperty(Constants.DEFAULT_AVERAGE_PING_INTERVAL,Constants.AVERAGE_PING_INTERVAL_PROPERTY); + } + return averagePingInterval; + } + + /** + * fetch value from system param + * @return + */ + public static int getFailoverPingInterval() { + if (failoverPingInterval <= 0) { + failoverPingInterval = Util.getNumericSystemProperty(Constants.DEFAULT_FAILOVER_PING_INTERVAL,Constants.FAILOVER_PING_INTERVAL_PROPERTY); + } + return failoverPingInterval; + } + + /** + * Method to safely access numeric system properties + * @param defaultValue + * @return + */ + public static Integer getNumericSystemProperty(Integer defaultValue, String propertyKey) { + try { + return Integer.valueOf(System.getProperty(propertyKey)); + } catch (NumberFormatException ex) { + return defaultValue; + } + } }
