Repository: stratos Updated Branches: refs/heads/master 7b09bc153 -> f3e322545
fix amqp too many open files issue Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f3e32254 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f3e32254 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f3e32254 Branch: refs/heads/master Commit: f3e3225453a38b7edf29b5f7d46409c52ce8314a Parents: dbdc993 Author: Udara Liyanage <[email protected]> Authored: Thu Apr 16 08:32:44 2015 +0530 Committer: Udara Liyanage <[email protected]> Committed: Thu Apr 16 09:33:45 2015 +0530 ---------------------------------------------------------------------- .../broker/connect/amqp/AmqpTopicConnector.java | 1 + .../broker/connect/amqp/AmqpTopicPublisher.java | 23 ++++++++++++++++++-- .../broker/publish/EventPublisher.java | 2 ++ 3 files changed, 24 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java index 381a733..7d347ca 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java @@ -88,6 +88,7 @@ public abstract class AmqpTopicConnector implements TopicConnector { if (topicConnection != null) { try { topicConnection.stop(); + topicConnection.close(); } catch (JMSException ignore) { log.warn("Could not disconnect from message broker"); } http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java index ea2f314..3f75096 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java @@ -25,6 +25,7 @@ import org.apache.stratos.messaging.broker.connect.RetryTimer; import org.apache.stratos.messaging.broker.connect.TopicPublisher; import org.apache.stratos.messaging.domain.exception.MessagingException; +import javax.jms.JMSException; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSession; @@ -57,6 +58,8 @@ public class AmqpTopicPublisher extends AmqpTopicConnector implements TopicPubli public void publish(String message, boolean retry) { boolean published = false; while (!published) { + TopicSession topicSession = null; + javax.jms.TopicPublisher topicPublisher = null; try { while (connectionStatus == ConnectionStatus.ReConnecting) { // Connection has been broken, wait until reconnected @@ -73,13 +76,13 @@ public class AmqpTopicPublisher extends AmqpTopicConnector implements TopicPubli connectionStatus = ConnectionStatus.Connected; } - TopicSession topicSession = newSession(); + topicSession = newSession(); Topic topic = lookupTopic(topicName); if (topic == null) { // if the topic doesn't exist, create it. topic = topicSession.createTopic(topicName); } - javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic); + topicPublisher = topicSession.createPublisher(topic); TextMessage textMessage = topicSession.createTextMessage(message); topicPublisher.publish(textMessage); published = true; @@ -92,6 +95,22 @@ public class AmqpTopicPublisher extends AmqpTopicConnector implements TopicPubli } // Try to reconnect reconnect(); + }finally { + + try{ + if (topicSession != null) { + topicSession.close(); + } + if (topicPublisher != null) { + topicPublisher.close(); + } + + }catch (JMSException e) { + message = "Error cleaning up pubisher"; + log.error(message, e); + throw new MessagingException(message, e); + } + } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java index 8d21496..b488b23 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java @@ -70,6 +70,8 @@ public class EventPublisher { topicPublisher.connect(); topicPublisher.publish(message, retry); + topicPublisher.disconnect(); + } } }
