Repository: stratos
Updated Branches:
  refs/heads/master 7b09bc153 -> f3e322545


fix amqp too many open files issue


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f3e32254
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f3e32254
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f3e32254

Branch: refs/heads/master
Commit: f3e3225453a38b7edf29b5f7d46409c52ce8314a
Parents: dbdc993
Author: Udara Liyanage <[email protected]>
Authored: Thu Apr 16 08:32:44 2015 +0530
Committer: Udara Liyanage <[email protected]>
Committed: Thu Apr 16 09:33:45 2015 +0530

----------------------------------------------------------------------
 .../broker/connect/amqp/AmqpTopicConnector.java |  1 +
 .../broker/connect/amqp/AmqpTopicPublisher.java | 23 ++++++++++++++++++--
 .../broker/publish/EventPublisher.java          |  2 ++
 3 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java
index 381a733..7d347ca 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicConnector.java
@@ -88,6 +88,7 @@ public abstract class AmqpTopicConnector implements 
TopicConnector {
         if (topicConnection != null) {
             try {
                 topicConnection.stop();
+                topicConnection.close();
             } catch (JMSException ignore) {
                 log.warn("Could not disconnect from message broker");
             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java
index ea2f314..3f75096 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/connect/amqp/AmqpTopicPublisher.java
@@ -25,6 +25,7 @@ import org.apache.stratos.messaging.broker.connect.RetryTimer;
 import org.apache.stratos.messaging.broker.connect.TopicPublisher;
 import org.apache.stratos.messaging.domain.exception.MessagingException;
 
+import javax.jms.JMSException;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
@@ -57,6 +58,8 @@ public class AmqpTopicPublisher extends AmqpTopicConnector 
implements TopicPubli
     public void publish(String message, boolean retry) {
         boolean published = false;
         while (!published) {
+            TopicSession topicSession = null;
+            javax.jms.TopicPublisher topicPublisher = null;
             try {
                 while (connectionStatus == ConnectionStatus.ReConnecting) {
                     // Connection has been broken, wait until reconnected
@@ -73,13 +76,13 @@ public class AmqpTopicPublisher extends AmqpTopicConnector 
implements TopicPubli
                     connectionStatus = ConnectionStatus.Connected;
                 }
 
-                TopicSession topicSession = newSession();
+                topicSession = newSession();
                 Topic topic = lookupTopic(topicName);
                 if (topic == null) {
                     // if the topic doesn't exist, create it.
                     topic = topicSession.createTopic(topicName);
                 }
-                javax.jms.TopicPublisher topicPublisher = 
topicSession.createPublisher(topic);
+                topicPublisher = topicSession.createPublisher(topic);
                 TextMessage textMessage = 
topicSession.createTextMessage(message);
                 topicPublisher.publish(textMessage);
                 published = true;
@@ -92,6 +95,22 @@ public class AmqpTopicPublisher extends AmqpTopicConnector 
implements TopicPubli
                 }
                 // Try to reconnect
                 reconnect();
+            }finally {
+
+                try{
+                    if (topicSession != null) {
+                        topicSession.close();
+                    }
+                    if (topicPublisher != null) {
+                        topicPublisher.close();
+                    }
+
+                }catch (JMSException e) {
+                    message = "Error cleaning up pubisher";
+                    log.error(message, e);
+                    throw new MessagingException(message, e);
+                }
+
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f3e32254/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 8d21496..b488b23 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -70,6 +70,8 @@ public class EventPublisher {
 
             topicPublisher.connect();
             topicPublisher.publish(message, retry);
+            topicPublisher.disconnect();
+
         }
     }
 }

Reply via email to