Add some synch blocks around some of the variables that could be set/read on multiple threads
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1f7d6ad5 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1f7d6ad5 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1f7d6ad5 Branch: refs/heads/master Commit: 1f7d6ad5c95532a3523f940d9daf277c6a03f4d9 Parents: 7d30cc4 Author: Daniel Kulp <[email protected]> Authored: Tue Apr 1 16:53:17 2014 -0400 Committer: Daniel Kulp <[email protected]> Committed: Tue Apr 1 16:53:47 2014 -0400 ---------------------------------------------------------------------- .../apache/cxf/transport/jms/JMSConduit.java | 39 ++++++++++++-------- .../cxf/transport/jms/JMSConfiguration.java | 4 +- 2 files changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/1f7d6ad5/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 95d6d78..27b0a58 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -35,6 +35,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; import javax.jms.Session; +import javax.jms.TemporaryQueue; import org.apache.cxf.Bus; import org.apache.cxf.buslifecycle.BusLifeCycleListener; @@ -70,7 +71,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me private Map<String, Exchange> correlationMap = new ConcurrentHashMap<String, Exchange>(); private JMSListenerContainer jmsListener; private String conduitId; - private AtomicLong messageCount; + private final AtomicLong messageCount = new AtomicLong(0); private JMSBusLifeCycleListener listener; private Bus bus; private Connection connection; @@ -83,7 +84,6 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me bus = b; this.jmsConfig = jmsConfig; conduitId = UUID.randomUUID().toString().replaceAll("-", ""); - messageCount = new AtomicLong(0); } /** @@ -101,8 +101,14 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me MessageStreamUtil.closeStreams(msg); super.close(msg); } - - private synchronized void getJMSListener(Destination replyTo) { + private synchronized Connection getConnection() throws JMSException { + if (connection == null) { + connection = JMSFactory.createConnection(jmsConfig); + connection.start(); + } + return connection; + } + private synchronized void getJMSListener(Destination replyTo) throws JMSException { if (jmsListener != null) { return; } @@ -112,7 +118,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me // An option for this might be a good idea for people who do not plan to share queues. return; } - MessageListenerContainer container = new MessageListenerContainer(connection, replyTo, this); + MessageListenerContainer container = new MessageListenerContainer(getConnection(), replyTo, this); container.setMessageSelector(messageSelector); Executor executor = JMSFactory.createExecutor(bus, "jms-conduit"); container.setExecutor(executor); @@ -142,12 +148,9 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me ResourceCloser closer = new ResourceCloser(); try { - if (connection == null) { - connection = JMSFactory.createConnection(jmsConfig); - connection.start(); - } - Session session = closer.register(connection.createSession(jmsConfig.isSessionTransacted(), - Session.AUTO_ACKNOWLEDGE)); + Connection c = getConnection(); + Session session = closer.register(c.createSession(jmsConfig.isSessionTransacted(), + Session.AUTO_ACKNOWLEDGE)); if (exchange.isOneWay()) { sendMessage(request, outMessage, null, null, closer, session); @@ -160,14 +163,20 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me closer.close(); } } - - private void sendAndReceiveMessage(final Exchange exchange, final Object request, final Message outMessage, - ResourceCloser closer, - Session session) throws JMSException { + + private synchronized void setupReplyDestination(Session session) throws JMSException { if (staticReplyDestination == null) { staticReplyDestination = jmsConfig.getReplyDestination(session); getJMSListener(staticReplyDestination); } + } + + private void sendAndReceiveMessage(final Exchange exchange, final Object request, final Message outMessage, + ResourceCloser closer, + Session session) throws JMSException { + + setupReplyDestination(session); + JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage); String userCID = headers.getJMSCorrelationID(); assertIsNotAsyncAndUserCID(exchange, userCID); http://git-wip-us.apache.org/repos/asf/cxf/blob/1f7d6ad5/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java index 00328ef..84321c6 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java @@ -335,7 +335,7 @@ public class JMSConfiguration { this.reconnectOnException = reconnectOnException; } - public ConnectionFactory getConnectionFactory() { + public synchronized ConnectionFactory getConnectionFactory() { if (connectionFactory == null) { connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this); } @@ -396,7 +396,7 @@ public class JMSConfiguration { return destinationResolver.resolveDestinationName(session, userDestination, replyPubSubDomain); } - public Destination getReplyDestination(Session session) throws JMSException { + public synchronized Destination getReplyDestination(Session session) throws JMSException { if (replyDestinationDest == null) { replyDestinationDest = replyDestination == null ? session.createTemporaryQueue()
