Author: ccustine
Date: Wed May 13 06:53:49 2009
New Revision: 774238

URL: http://svn.apache.org/viewvc?rev=774238&view=rev
Log:
SMXCOMP-507 - smx-jms in-out provider w/unspecified replyTo queue and 
Pooled/SingleConnectionFactory leaks one temp replyTo queue per message

Modified:
    
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
    
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=774238&r1=774237&r2=774238&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
 Wed May 13 06:53:49 2009
@@ -39,6 +39,8 @@
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
@@ -97,7 +99,7 @@
     private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
 
     private Destination replyDestination;
-    
+
     /**
      * @deprecated The stateless property is no longer used.
      * For backward compatibility purpose only.
@@ -646,6 +648,8 @@
         // to indicate we will use the listener container
         boolean asynchronous = false;
         boolean useSelector = true;
+        // Indicate whether the replyTo destination is temporary or 
explicitely specified replyTo destination
+        boolean isReplyDestTemporary = false;
         Destination replyDest = chooseDestination(exchange, in, session, 
replyDestinationChooser, null);
         if (replyDest == null) {
             useSelector = false;
@@ -659,6 +663,7 @@
                 } else {
                     replyDest = session.createTemporaryQueue();
                 }
+                isReplyDestTemporary = true;
             }
         }
         // Create message and send it
@@ -719,6 +724,15 @@
             } else {
                 send(exchange);
             }
+
+            // delete temporary queue/topic immediately to avoid accumulation 
in case that the connection is never destroyed
+            if (isReplyDestTemporary) {
+                if (isPubSubDomain()) {
+                    ((TemporaryTopic)replyDest).delete();
+                } else {
+                    ((TemporaryQueue)replyDest).delete();
+                }
+            }
         }
     }
 

Modified: 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL: 
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=774238&r1=774237&r2=774238&view=diff
==============================================================================
--- 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
 (original)
+++ 
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
 Wed May 13 06:53:49 2009
@@ -66,31 +66,39 @@
         InOut inout = null;
         boolean result = false;
         DataHandler dh = null;
-        
+
         // Test successful return
-        inout = client.createInOutExchange();
-        inout.setService(new QName("http://jms.servicemix.org/Test";, 
"Provider"));
-        inout.getInMessage().setContent(new 
StringSource("<hello>world</hello>"));
-        dh = new DataHandler(new ByteArrayDataSource("myImage", 
"application/octet-stream"));
-        inout.getInMessage().addAttachment("myImage", dh);
-        result = client.sendSync(inout);
-        assertTrue(result);
-        NormalizedMessage out = inout.getOutMessage();
-        assertNotNull(out);
-        Source src = out.getContent();
-        assertNotNull(src);
-        dh = out.getAttachment("myImage");
-        assertNotNull(dh);
-        
+        Source src = null;
+        for (int i = 0; i < 2; i++) {
+            inout = client.createInOutExchange();
+            inout.setService(new QName("http://jms.servicemix.org/Test";, 
"Provider"));
+            inout.getInMessage().setContent(new 
StringSource("<hello>world</hello>"));
+            dh = new DataHandler(new ByteArrayDataSource("myImage", 
"application/octet-stream"));
+            inout.getInMessage().addAttachment("myImage", dh);
+            result = client.sendSync(inout);
+            assertTrue(result);
+            NormalizedMessage out = inout.getOutMessage();
+            assertNotNull(out);
+            src = out.getContent();
+            assertNotNull(src);
+            dh = out.getAttachment("myImage");
+            assertNotNull(dh);
+        }
+
+        // Ensure that only one temporary replyTo queue was created for 
multiple messages sent
+//
+//      Not working with smx-3.2 because of AMQ 4.1.1       
+//      assertEquals(0, countBrokerTemporaryQueues());
+
         logger.info(new SourceTransformer().toString(src));
 
-        // Test fault return 
+        // Test fault return
         container.deactivateComponent("receiver");
         ReturnFaultComponent fault = new ReturnFaultComponent();
         ActivationSpec asFault = new ActivationSpec("receiver", fault);
         asFault.setService(new QName("http://jms.servicemix.org/Test";, 
"Echo"));
         container.activateComponent(asFault);
-        
+
         inout = client.createInOutExchange();
         inout.setService(new QName("http://jms.servicemix.org/Test";, 
"Provider"));
         inout.getInMessage().setContent(new 
StringSource("<hello>world</hello>"));
@@ -106,7 +114,7 @@
         ActivationSpec asError = new ActivationSpec("receiver", error);
         asError.setService(new QName("http://jms.servicemix.org/Test";, 
"Echo"));
         container.activateComponent(asError);
-        
+
         inout = client.createInOutExchange();
         inout.setService(new QName("http://jms.servicemix.org/Test";, 
"Provider"));
         inout.getInMessage().setContent(new 
StringSource("<hello>world</hello>"));
@@ -138,7 +146,7 @@
                 super.onMessageExchange(exchange);
             }
         };
-        
+
         ActivationSpec asError = new ActivationSpec("receiver", error);
         asError.setService(new QName("http://jms.servicemix.org/Test";, 
"Echo"));
         container.activateComponent(asError);
@@ -157,7 +165,7 @@
         }
 
         assertTrue("The message was never processed by servicemix-jms", 
receiveCount[0] > 0);
-        
+
         // Deactivate the JMS component so that it stops
         // trying to get the message from the queue
         container.deactivateComponent("servicemix-jms");
@@ -283,4 +291,11 @@
         endpoint.setDestinationName("destination");
         return endpoint;
     }
+
+/*
+    This test does not work on SMX-3.2 because it uses AMQ 4.1.1
+    private int countBrokerTemporaryQueues() throws Exception {
+        return ((RegionBroker) 
broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+    }
+*/
 }


Reply via email to