Author: cschneider
Date: Thu Jan 6 11:45:50 2011
New Revision: 1055837
URL: http://svn.apache.org/viewvc?rev=1055837&view=rev
Log:
CXF-3230 delete jms temp queue after request
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1055837&r1=1055836&r2=1055837&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Thu Jan 6 11:45:50 2011
@@ -34,6 +34,7 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
@@ -155,12 +156,7 @@ public class JMSConduit extends Abstract
throw new ConfigurationException(msg);
}
- JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
- if (headers == null) {
- headers = new JMSMessageHeadersType();
- outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, headers);
- }
+ JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
String replyTo = headers.getJMSReplyTo();
if (replyTo == null) {
replyTo = jmsConfig.getReplyDestination();
@@ -168,28 +164,8 @@ public class JMSConduit extends Abstract
final JmsTemplate jmsTemplate =
JMSFactory.createJmsTemplate(jmsConfig, headers);
String userCID = headers.getJMSCorrelationID();
- boolean messageIdPattern = false;
- String correlationId = null;
- if (!exchange.isOneWay()) {
- if (userCID != null) {
- correlationId = userCID;
- } else if (!jmsConfig.isSetConduitSelectorPrefix()
- && (exchange.isSynchronous() || exchange.isOneWay())
- && (!jmsConfig.isSetUseConduitIdSelector()
- || !jmsConfig.isUseConduitIdSelector())) {
- messageIdPattern = true;
- } else {
- if (jmsConfig.isUseConduitIdSelector()) {
- correlationId = JMSUtils.createCorrelationId(jmsConfig
- .getConduitSelectorPrefix()
- + conduitId,
messageCount
- .incrementAndGet());
- } else {
- correlationId = JMSUtils.createCorrelationId(jmsConfig
- .getConduitSelectorPrefix(),
messageCount.incrementAndGet());
- }
- }
- }
+
+ String correlationId = createCorrelationId(exchange, userCID);
Destination replyToDestination = null;
if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() &&
isSetReplyTo(outMessage)
@@ -248,7 +224,7 @@ public class JMSConduit extends Abstract
if (!exchange.isOneWay()) {
synchronized (exchange) {
jmsTemplate.send(jmsConfig.getTargetDestination(),
messageCreator);
- if (messageIdPattern) {
+ if (correlationId == null) {
correlationId = messageCreator.getMessageID();
}
headers.setJMSMessageID(messageCreator.getMessageID());
@@ -263,6 +239,16 @@ public class JMSConduit extends Abstract
} else {
doReplyMessage(exchange, replyMessage);
}
+
+ // TODO How do we delete the temp queue in case of an async request
+ // or is async with a temp queue not possible ?
+ if (replyToDestination instanceof TemporaryQueue) {
+ try {
+ ((TemporaryQueue)replyToDestination).delete();
+ } catch (JMSException e) {
+ throw new RuntimeException("Unable to remove
temporary queue", e);
+ }
+ }
}
}
} else {
@@ -271,6 +257,38 @@ public class JMSConduit extends Abstract
}
}
+ private String createCorrelationId(final Exchange exchange, String
userCID) {
+ String correlationId = null;
+ if (!exchange.isOneWay()) {
+ if (userCID != null) {
+ correlationId = userCID;
+ } else if (!jmsConfig.isSetConduitSelectorPrefix()
+ && (exchange.isSynchronous() || exchange.isOneWay())
+ && (!jmsConfig.isSetUseConduitIdSelector()
+ || !jmsConfig.isUseConduitIdSelector())) {
+ // in this case the correlation id will be set to
+ // the message id later
+ correlationId = null;
+ } else {
+ String prefix = (jmsConfig.isUseConduitIdSelector())
+ ? jmsConfig.getConduitSelectorPrefix() + conduitId
+ : jmsConfig.getConduitSelectorPrefix();
+ correlationId = JMSUtils.createCorrelationId(prefix,
messageCount.incrementAndGet());
+ }
+ }
+ return correlationId;
+ }
+
+ private JMSMessageHeadersType getOrCreateJmsHeaders(final Message
outMessage) {
+ JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, headers);
+ }
+ return headers;
+ }
+
static class JMSBusLifeCycleListener implements BusLifeCycleListener {
final WeakReference<JMSConduit> ref;
BusLifeCycleManager blcm;