Author: ccustine
Date: Wed May 13 06:53:49 2009
New Revision: 774238
URL: http://svn.apache.org/viewvc?rev=774238&view=rev
Log:
SMXCOMP-507 - smx-jms in-out provider w/unspecified replyTo queue and
Pooled/SingleConnectionFactory leaks one temp replyTo queue per message
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=774238&r1=774237&r2=774238&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
Wed May 13 06:53:49 2009
@@ -39,6 +39,8 @@
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
@@ -97,7 +99,7 @@
private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
private Destination replyDestination;
-
+
/**
* @deprecated The stateless property is no longer used.
* For backward compatibility purpose only.
@@ -646,6 +648,8 @@
// to indicate we will use the listener container
boolean asynchronous = false;
boolean useSelector = true;
+ // Indicates whether the replyTo destination is a temporary one or an
explicitly specified replyTo destination
+ boolean isReplyDestTemporary = false;
Destination replyDest = chooseDestination(exchange, in, session,
replyDestinationChooser, null);
if (replyDest == null) {
useSelector = false;
@@ -659,6 +663,7 @@
} else {
replyDest = session.createTemporaryQueue();
}
+ isReplyDestTemporary = true;
}
}
// Create message and send it
@@ -719,6 +724,15 @@
} else {
send(exchange);
}
+
+ // delete temporary queue/topic immediately to avoid accumulation
in case the connection is never destroyed
+ if (isReplyDestTemporary) {
+ if (isPubSubDomain()) {
+ ((TemporaryTopic)replyDest).delete();
+ } else {
+ ((TemporaryQueue)replyDest).delete();
+ }
+ }
}
}
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java?rev=774238&r1=774237&r2=774238&view=diff
==============================================================================
---
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
(original)
+++
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsProviderConsumerEndpointTest.java
Wed May 13 06:53:49 2009
@@ -66,31 +66,39 @@
InOut inout = null;
boolean result = false;
DataHandler dh = null;
-
+
// Test successful return
- inout = client.createInOutExchange();
- inout.setService(new QName("http://jms.servicemix.org/Test",
"Provider"));
- inout.getInMessage().setContent(new
StringSource("<hello>world</hello>"));
- dh = new DataHandler(new ByteArrayDataSource("myImage",
"application/octet-stream"));
- inout.getInMessage().addAttachment("myImage", dh);
- result = client.sendSync(inout);
- assertTrue(result);
- NormalizedMessage out = inout.getOutMessage();
- assertNotNull(out);
- Source src = out.getContent();
- assertNotNull(src);
- dh = out.getAttachment("myImage");
- assertNotNull(dh);
-
+ Source src = null;
+ for (int i = 0; i < 2; i++) {
+ inout = client.createInOutExchange();
+ inout.setService(new QName("http://jms.servicemix.org/Test",
"Provider"));
+ inout.getInMessage().setContent(new
StringSource("<hello>world</hello>"));
+ dh = new DataHandler(new ByteArrayDataSource("myImage",
"application/octet-stream"));
+ inout.getInMessage().addAttachment("myImage", dh);
+ result = client.sendSync(inout);
+ assertTrue(result);
+ NormalizedMessage out = inout.getOutMessage();
+ assertNotNull(out);
+ src = out.getContent();
+ assertNotNull(src);
+ dh = out.getAttachment("myImage");
+ assertNotNull(dh);
+ }
+
+ // Ensure that no temporary replyTo queue is left on the broker after
multiple messages were sent
+//
+// Not working with smx-3.2 because of AMQ 4.1.1
+// assertEquals(0, countBrokerTemporaryQueues());
+
logger.info(new SourceTransformer().toString(src));
- // Test fault return
+ // Test fault return
container.deactivateComponent("receiver");
ReturnFaultComponent fault = new ReturnFaultComponent();
ActivationSpec asFault = new ActivationSpec("receiver", fault);
asFault.setService(new QName("http://jms.servicemix.org/Test",
"Echo"));
container.activateComponent(asFault);
-
+
inout = client.createInOutExchange();
inout.setService(new QName("http://jms.servicemix.org/Test",
"Provider"));
inout.getInMessage().setContent(new
StringSource("<hello>world</hello>"));
@@ -106,7 +114,7 @@
ActivationSpec asError = new ActivationSpec("receiver", error);
asError.setService(new QName("http://jms.servicemix.org/Test",
"Echo"));
container.activateComponent(asError);
-
+
inout = client.createInOutExchange();
inout.setService(new QName("http://jms.servicemix.org/Test",
"Provider"));
inout.getInMessage().setContent(new
StringSource("<hello>world</hello>"));
@@ -138,7 +146,7 @@
super.onMessageExchange(exchange);
}
};
-
+
ActivationSpec asError = new ActivationSpec("receiver", error);
asError.setService(new QName("http://jms.servicemix.org/Test",
"Echo"));
container.activateComponent(asError);
@@ -157,7 +165,7 @@
}
assertTrue("The message was never processed by servicemix-jms",
receiveCount[0] > 0);
-
+
// Deactivate the JMS component so that it stops
// trying to get the message from the queue
container.deactivateComponent("servicemix-jms");
@@ -283,4 +291,11 @@
endpoint.setDestinationName("destination");
return endpoint;
}
+
+/*
+ This test does not work on SMX-3.2 because it uses AMQ 4.1.1
+ private int countBrokerTemporaryQueues() throws Exception {
+ return ((RegionBroker)
broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+ }
+*/
}