Removed topic publisher retry from health checker ping event

Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/039438b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/039438b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/039438b0

Branch: refs/heads/4.0.0-incubating
Commit: 039438b08135839ed9b88ab3beae77ec740b3e4a
Parents: be8ea92
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Apr 24 13:29:10 2014 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Apr 24 13:29:10 2014 +0530

----------------------------------------------------------------------
 .../messaging/broker/heartbeat/TopicHealthChecker.java |  2 +-
 .../messaging/broker/publish/EventPublisher.java       |  6 +++++-
 .../messaging/broker/publish/TopicPublisher.java       | 13 ++++++++++---
 .../stratos/messaging/publish/MessagePublisher.java    |  4 ++--
 4 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
index d36b9cb..72466aa 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java
@@ -57,7 +57,7 @@ public class TopicHealthChecker implements Runnable {
                                Thread.sleep(1000);
                                testConnector.init(topicName);
                 // A ping event is published to detect a session timeout
-                
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent());
+                
EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent(), 
false);
                        } catch (Exception e) {
                                // Implies connection is not established
                                // sleep for 30 sec and retry

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 8db203c..2150494 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -43,10 +43,14 @@ public class EventPublisher extends TopicPublisher {
      * @param event event to be published
      */
     public void publish(Event event) {
+        publish(event, true);
+    }
+
+    public void publish(Event event, boolean retry) {
         synchronized (EventPublisher.class) {
             Properties headers = new Properties();
             headers.put(Constants.EVENT_CLASS_NAME, 
event.getClass().getName());
-            super.publish(event, headers);
+            super.publish(event, headers, retry);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 7394d5b..c0074f5 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -67,11 +67,11 @@ public class TopicPublisher extends MessagePublisher {
         * lost, this will perform re-subscription periodically, until a 
connection
         * obtained.
         */
-       public void publish(Object messageObj) {
-               publish(messageObj, null);
+       public void publish(Object messageObj, boolean retry) {
+               publish(messageObj, null, retry);
        }
        
-       public void publish(Object messageObj, Properties headers) {
+       public void publish(Object messageObj, Properties headers, boolean 
retry) {
         synchronized (TopicPublisher.class) {
             Gson gson = new Gson();
             String message = gson.toJson(messageObj);
@@ -86,6 +86,13 @@ public class TopicPublisher extends MessagePublisher {
                     if(log.isErrorEnabled()) {
                         log.error("Error while publishing to the topic: " + 
getName(), e);
                     }
+                    if(!retry) {
+                        if(log.isDebugEnabled()) {
+                            log.debug("Retry disabled for topic " + getName());
+                        }
+                        throw new RuntimeException(e);
+                    }
+
                     if(log.isInfoEnabled()) {
                         log.info("Will try to re-publish in 60 sec");
                     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/039438b0/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
index ca3399f..758089f 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/publish/MessagePublisher.java
@@ -54,7 +54,7 @@ public abstract class MessagePublisher {
         * @param messageObj
         *            POJO to be published.
         */
-       public abstract void publish(Object messageObj);
+       public abstract void publish(Object messageObj, boolean retry);
 
        /**
         * This operation get triggered when a message is ready to be published.
@@ -67,5 +67,5 @@ public abstract class MessagePublisher {
         * @param headers
         *            properties to be set as message headers.
         */
-       public abstract void publish(Object messageObj, Properties headers);
+       public abstract void publish(Object messageObj, Properties headers, 
boolean retry);
 }

Reply via email to