Repository: incubator-stratos Updated Branches: refs/heads/4.0.0-incubating 828aa9861 -> e8ffe3d08
Fixed messaging model session timeout handling, subscriber re-connection and publisher retry logic Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/1afd2969 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/1afd2969 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/1afd2969 Branch: refs/heads/4.0.0-incubating Commit: 1afd29698523caf81f0794a768ab2da45322b406 Parents: 0a682d8 Author: Imesh Gunaratne <[email protected]> Authored: Thu Apr 24 12:49:35 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu Apr 24 12:49:35 2014 +0530 ---------------------------------------------------------------------- .../broker/connect/TopicConnector.java | 1 - .../broker/heartbeat/TopicHealthChecker.java | 23 ++-- .../broker/publish/EventPublisher.java | 8 +- .../broker/publish/TopicPublisher.java | 135 ++++++++++++++----- .../broker/subscribe/TopicSubscriber.java | 119 ++++++++++------ .../stratos/messaging/event/ping/PingEvent.java | 30 +++++ .../stratos/messaging/util/Constants.java | 1 + 7 files changed, 226 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java index a4fc5c4..ebb339d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/TopicConnector.java @@ -67,7 +67,6 @@ public class TopicConnector { } topicConnection = connFactory.createTopicConnection(); topicConnection.start(); - } /** http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java index 1fbd25e..d36b9cb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/heartbeat/TopicHealthChecker.java @@ -21,6 +21,9 @@ package org.apache.stratos.messaging.broker.heartbeat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.broker.connect.TopicConnector; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; +import org.apache.stratos.messaging.event.ping.PingEvent; +import org.apache.stratos.messaging.util.Constants; import javax.jms.JMSException; @@ -43,22 +46,24 @@ public class TopicHealthChecker implements Runnable { @Override public void run() { if(log.isDebugEnabled()){ - log.debug(topicName + " topic Health Checker is running... " ); + log.debug(topicName + " topic health checker is running... " ); } TopicConnector testConnector = new TopicConnector(); while (!terminated) { try { - // health checker runs in every 30s - Thread.sleep(30000); - + // Health checker needs to run with the smallest possible time interval + // to detect a connection drop. Otherwise the subscriber will not + // get reconnected after a connection drop. + Thread.sleep(1000); testConnector.init(topicName); - + // A ping event is published to detect a session timeout + EventPublisherPool.getPublisher(Constants.PING_TOPIC).publish(new PingEvent()); } catch (Exception e) { - // implies connection is not established - // sleep for 5s and retry + // Implies connection is not established + // sleep for 30 sec and retry try { - log.error(topicName + " topic health checker is failed and will retry to establish a connection after 5s."); - Thread.sleep(5000); + log.error(topicName + " topic health checker is failed and will try to subscribe again in 30 sec"); + Thread.sleep(30000); break; } catch (InterruptedException ignore) { } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java index 5d39956..8db203c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java @@ -43,8 +43,10 @@ public class EventPublisher extends TopicPublisher { * @param event event to be published */ public void publish(Event event) { - Properties headers = new Properties(); - headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName()); - super.publish(event, headers); + synchronized (EventPublisher.class) { + Properties headers = new Properties(); + headers.put(Constants.EVENT_CLASS_NAME, event.getClass().getName()); + super.publish(event, headers); + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java index 004be13..7394d5b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java @@ -48,6 +48,7 @@ public class TopicPublisher extends MessagePublisher { private TopicSession topicSession; private TopicConnector connector; private javax.jms.TopicPublisher topicPublisher = null; + private boolean initialized; /** * @param aTopicName @@ -56,6 +57,9 @@ public class TopicPublisher extends MessagePublisher { TopicPublisher(String aTopicName) { super(aTopicName); connector = new TopicConnector(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher connector created: [topic] %s", getName())); + } } /** @@ -68,35 +72,86 @@ public class TopicPublisher extends MessagePublisher { } public void publish(Object messageObj, Properties headers) { - - Gson gson = new Gson(); - String message = gson.toJson(messageObj); - try { - doPublish(message, headers); - - } catch (Exception e) { - log.error("Error while publishing to the topic: " + getName(), e); - // TODO would it be worth to throw this exception? - } + synchronized (TopicPublisher.class) { + Gson gson = new Gson(); + String message = gson.toJson(messageObj); + boolean published = false; + while(!published) { + + try { + doPublish(message, headers); + published = true; + } catch (Exception e) { + initialized = false; + if(log.isErrorEnabled()) { + log.error("Error while publishing to the topic: " + getName(), e); + } + if(log.isInfoEnabled()) { + log.info("Will try to re-publish in 60 sec"); + } + try { + Thread.sleep(60000); + } catch (InterruptedException ignore) { + } + } + } + } } public void close() { - - // closes all sessions/connections - try { - topicPublisher.close(); - topicSession.close(); - connector.close(); - } catch (JMSException ignore) { - } + synchronized (TopicPublisher.class) { + // closes all sessions/connections + try { + if(topicPublisher != null) { + topicPublisher.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher closed: [topic] %s", getName())); + } + } + if(topicSession != null) { + topicSession.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher session closed: [topic] %s", getName())); + } + } + if(connector != null) { + connector.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher connector closed: [topic] %s", getName())); + } + } + } catch (JMSException ignore) { + } + } } private void doPublish(String message, Properties headers) throws Exception, JMSException { - setPublisher(); + if(!initialized) { + // Initialize a topic connection to the message broker + connector.init(getName()); + initialized = true; + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher connector initialized: [topic] %s", getName())); + } + } - TextMessage textMessage = topicSession.createTextMessage(message); + try { + // Create a new session + topicSession = createSession(connector); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher session created: [topic] %s", getName())); + } + // Create a publisher from session + topicPublisher = createPublisher(topicSession); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher created: [topic] %s", getName())); + } + + // Create text message + TextMessage textMessage = topicSession.createTextMessage(message); if (headers != null) { + // Add header properties @SuppressWarnings("rawtypes") Enumeration e = headers.propertyNames(); @@ -110,26 +165,34 @@ public class TopicPublisher extends MessagePublisher { if (log.isDebugEnabled()) { log.debug(String.format("Message published: [topic] %s [header] %s [body] %s", getName(), (headers != null) ? headers.toString() : "null", message)); } - } + } + finally { + if(topicPublisher != null) { + topicPublisher.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher closed: [topic] %s", getName())); + } + } + if(topicSession != null) { + topicSession.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic publisher session closed: [topic] %s", getName())); + } + } + } + } - private void setPublisher() throws Exception, JMSException { - if (topicSession != null && topicPublisher != null) { - return; - } - - if (topicSession == null) { - // initialize a TopicConnector - connector.init(getName()); - // get a session - topicSession = connector.newSession(); - } - - Topic topic = connector.getTopic(); + private TopicSession createSession(TopicConnector topicConnector) throws Exception { + // Create a new session + return topicConnector.newSession(); + } + + private javax.jms.TopicPublisher createPublisher(TopicSession topicSession) throws Exception, JMSException { + Topic topic = connector.getTopic(); if (topic == null) { // if the topic doesn't exist, create it. topic = topicSession.createTopic(getName()); } - topicPublisher = topicSession.createPublisher(topic); + return topicSession.createPublisher(topic); } - } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java index f6fa587..64ce136 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/subscribe/TopicSubscriber.java @@ -53,30 +53,45 @@ public class TopicSubscriber implements Runnable { public TopicSubscriber(String aTopicName) { topicName = aTopicName; connector = new TopicConnector(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber connector created: [topic] %s", topicName)); + } } private void doSubscribe() throws Exception, JMSException { - if (topicSession != null && topicSubscriber != null) { - return; - } - - if (topicSession == null) { - // initialize a TopicConnector - connector.init(topicName); - // get a session - topicSession = connector.newSession(); - } - - Topic topic = connector.getTopic(); - if (topic == null) { - // if topic doesn't exist, create it. - topic = topicSession.createTopic(topicName); - } - topicSubscriber = topicSession.createSubscriber(topic); - topicSubscriber.setMessageListener(messageListener); + // Initialize a topic connection + connector.init(topicName); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber connector initialized: [topic] %s", topicName)); + } + // Create new session + topicSession = createSession(connector); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber session created: [topic] %s", topicName)); + } + // Create a new subscriber + createSubscriber(topicSession); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber created: [topic] %s", topicName)); + } subscribed = true; } + private void createSubscriber(TopicSession topicSession) throws JMSException { + Topic topic = connector.getTopic(); + if (topic == null) { + // if topic doesn't exist, create it. + topic = topicSession.createTopic(topicName); + } + topicSubscriber = topicSession.createSubscriber(topic); + topicSubscriber.setMessageListener(messageListener); + } + + private TopicSession createSession(TopicConnector topicConnector) throws Exception { + // Create a new session + return topicConnector.newSession(); + } + /** * @param messageListener * this MessageListener will get triggered each time this @@ -100,32 +115,52 @@ public class TopicSubscriber implements Runnable { try { doSubscribe(); } catch (Exception e) { + subscribed = false; log.error("Error while subscribing to the topic: " + topicName, e); } finally { - // start the health checker - healthChecker = new TopicHealthChecker(topicName); - Thread healthCheckerThread = new Thread(healthChecker); - healthCheckerThread.start(); - try { - // waits till the thread finishes. - healthCheckerThread.join(); - } catch (InterruptedException ignore) { - } - // health checker failed - // closes all sessions/connections - try { - subscribed = false; - if (topicSubscriber != null) { - topicSubscriber.close(); - } - if (topicSession != null) { - topicSession.close(); - } - if (connector != null) { - connector.close(); - } - } catch (JMSException ignore) { - } + if(subscribed) { + // start the health checker if subscribed + healthChecker = new TopicHealthChecker(topicName); + Thread healthCheckerThread = new Thread(healthChecker); + healthCheckerThread.start(); + try { + // waits till the thread finishes. + healthCheckerThread.join(); + } catch (InterruptedException ignore) { + } + } + else { + // subscription failed + if(log.isInfoEnabled()) { + log.info("Will try to subscribe again in 30 sec"); + } + try { + Thread.sleep(30000); + } catch (InterruptedException ignore) { + } + } + // closes all sessions/connections + try { + if (topicSubscriber != null) { + topicSubscriber.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber closed: [topic] %s", topicName)); + } + } + if (topicSession != null) { + topicSession.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber session closed: [topic] %s", topicName)); + } + } + if (connector != null) { + connector.close(); + if(log.isDebugEnabled()) { + log.debug(String.format("Topic subscriber connector closed: [topic] %s", topicName)); + } + } + } catch (JMSException ignore) { + } } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java new file mode 100644 index 0000000..0fcb6a5 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/ping/PingEvent.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.ping; + +import org.apache.stratos.messaging.event.instance.notifier.InstanceNotifierEvent; + +import java.io.Serializable; + +/** + * Ping event. + */ +public class PingEvent extends InstanceNotifierEvent implements Serializable { +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/1afd2969/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java index 397a468..8b3c814 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/util/Constants.java @@ -24,6 +24,7 @@ public class Constants { public static final String HEALTH_STAT_TOPIC = "summarized-health-stats"; public static final String INSTANCE_STATUS_TOPIC = "instance-status"; public static final String INSTANCE_NOTIFIER_TOPIC = "instance-notifier"; + public static final String PING_TOPIC = "ping"; public static final String TENANT_TOPIC = "tenant"; public static final String TENANT_RANGE_ALL = "*";
