Removed topic publisher retry from health checker ping event
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/039438b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/039438b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/039438b0 Branch: refs/heads/4.0.0-incubating Commit: 039438b08135839ed9b88ab3beae77ec740b3e4a Parents: be8ea92 Author: Imesh Gunaratne <[email protected]> Authored: Thu Apr 24 13:29:10 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Apr 24 13:29:10 2014 +0530 ---------------------------------------------------------------------- .../messaging/broker/heartbeat/TopicHealthChecker.java | 2 +- .../messaging/broker/publish/EventPublisher.java | 6 +++++- .../messaging/broker/publish/TopicPublisher.java | 13 ++++++++++--- .../stratos/messaging/publish/MessagePublisher.java | 4 ++-- 4 files changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java index d36b9cb..72466aa 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java @@ -57,7 +57,7 @@ public class TopicHealthChecker implements Runnable { Thread.sleep(1000); testConnector.init(topicName); // A ping event is published to detect a session timeout - EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent()); + EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), false); } catch (Exception e) { // Implies connection is not established // sleep for 30 sec and retry http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java index 8db203c..2150494 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java @@ -43,10 +43,14 @@ public class EventPublisher extends TopicPublisher { * @param event event to be published */ public void publish(Event event) { + publish(event, true); + } + + public void publish(Event event, boolean retry) { synchronized (EventPublisher.class) { Properties headers = new Properties(); headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName()); - super.publish(event, headers); + super.publish(event, headers, retry); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index 7394d5b..c0074f5 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -67,11 +67,11 @@ public class TopicPublisher extends MessagePublisher { * lost, this will perform re-subscription periodically, until a connection * obtained. */ - public void publish(Object messageObj) { - publish(messageObj, null); + public void publish(Object messageObj, boolean retry) { + publish(messageObj, null, retry); } - public void publish(Object messageObj, Properties headers) { + public void publish(Object messageObj, Properties headers, boolean retry) { synchronized (TopicPublisher.class) { Gson gson = new Gson(); String message = gson.toJson(messageObj); @@ -86,6 +86,13 @@ public class TopicPublisher extends MessagePublisher { if(log.isErrorEnabled()) { log.error("Error while publishing to the topic: " + getName(), e); } + if(!retry) { + if(log.isDebugEnabled()) { + log.debug("Retry disabled for topic " + getName()); + } + throw new RuntimeException(e); + } + if(log.isInfoEnabled()) { log.info("Will try to re-publish in 60 sec"); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java index ca3399f..758089f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java @@ -54,7 +54,7 @@ public abstract class MessagePublisher { * @param messageObj * POJO to be published. */ - public abstract void publish(Object messageObj); + public abstract void publish(Object messageObj, boolean retry); /** * This operation get triggered when a message is ready to be published. @@ -67,5 +67,5 @@ public abstract class MessagePublisher { * @param headers * properties to be set as message headers. */ - public abstract void publish(Object messageObj, Properties headers); + public abstract void publish(Object messageObj, Properties headers, boolean retry); }
