Add some synch blocks around some of the variables that could be set/read on 
multiple threads


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1f7d6ad5
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1f7d6ad5
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1f7d6ad5

Branch: refs/heads/master
Commit: 1f7d6ad5c95532a3523f940d9daf277c6a03f4d9
Parents: 7d30cc4
Author: Daniel Kulp <[email protected]>
Authored: Tue Apr 1 16:53:17 2014 -0400
Committer: Daniel Kulp <[email protected]>
Committed: Tue Apr 1 16:53:47 2014 -0400

----------------------------------------------------------------------
 .../apache/cxf/transport/jms/JMSConduit.java    | 39 ++++++++++++--------
 .../cxf/transport/jms/JMSConfiguration.java     |  4 +-
 2 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/1f7d6ad5/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 95d6d78..27b0a58 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -35,6 +35,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.buslifecycle.BusLifeCycleListener;
@@ -70,7 +71,7 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
     private Map<String, Exchange> correlationMap = new 
ConcurrentHashMap<String, Exchange>();
     private JMSListenerContainer jmsListener;
     private String conduitId;
-    private AtomicLong messageCount;
+    private final AtomicLong messageCount = new AtomicLong(0);
     private JMSBusLifeCycleListener listener;
     private Bus bus;
     private Connection connection;
@@ -83,7 +84,6 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
         bus = b;
         this.jmsConfig = jmsConfig;
         conduitId = UUID.randomUUID().toString().replaceAll("-", "");
-        messageCount = new AtomicLong(0);
     }
     
     /**
@@ -101,8 +101,14 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
         MessageStreamUtil.closeStreams(msg);
         super.close(msg);
     }
-
-    private synchronized void getJMSListener(Destination replyTo) {
+    private synchronized Connection getConnection() throws JMSException {
+        if (connection == null) {
+            connection = JMSFactory.createConnection(jmsConfig);
+            connection.start();
+        }
+        return connection;
+    }
+    private synchronized void getJMSListener(Destination replyTo) throws 
JMSException {
         if (jmsListener != null) {
             return;
         }
@@ -112,7 +118,7 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
             // An option for this might be a good idea for people who do not 
plan to share queues.
             return;
         }
-        MessageListenerContainer container = new 
MessageListenerContainer(connection, replyTo, this);
+        MessageListenerContainer container = new 
MessageListenerContainer(getConnection(), replyTo, this);
         container.setMessageSelector(messageSelector);
         Executor executor = JMSFactory.createExecutor(bus, "jms-conduit");
         container.setExecutor(executor);
@@ -142,12 +148,9 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
 
         ResourceCloser closer = new ResourceCloser();
         try {
-            if (connection == null) {
-                connection = JMSFactory.createConnection(jmsConfig);
-                connection.start();
-            }
-            Session session = 
closer.register(connection.createSession(jmsConfig.isSessionTransacted(), 
-                                                                       
Session.AUTO_ACKNOWLEDGE));
+            Connection c = getConnection();
+            Session session = 
closer.register(c.createSession(jmsConfig.isSessionTransacted(), 
+                                                              
Session.AUTO_ACKNOWLEDGE));
             
             if (exchange.isOneWay()) {
                 sendMessage(request, outMessage, null, null, closer, session);
@@ -160,14 +163,20 @@ public class JMSConduit extends AbstractConduit 
implements JMSExchangeSender, Me
             closer.close();
         }
     }
-
-    private void sendAndReceiveMessage(final Exchange exchange, final Object 
request, final Message outMessage,
-                                ResourceCloser closer,
-                                Session session) throws JMSException {
+    
+    private synchronized void setupReplyDestination(Session session) throws 
JMSException {
         if (staticReplyDestination == null) {
             staticReplyDestination = jmsConfig.getReplyDestination(session);
             getJMSListener(staticReplyDestination);
         }
+    }
+
+    private void sendAndReceiveMessage(final Exchange exchange, final Object 
request, final Message outMessage,
+                                ResourceCloser closer,
+                                Session session) throws JMSException {
+        
+        setupReplyDestination(session);
+        
         JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
         String userCID = headers.getJMSCorrelationID();
         assertIsNotAsyncAndUserCID(exchange, userCID);

http://git-wip-us.apache.org/repos/asf/cxf/blob/1f7d6ad5/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 00328ef..84321c6 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -335,7 +335,7 @@ public class JMSConfiguration {
         this.reconnectOnException = reconnectOnException;
     }
 
-    public ConnectionFactory getConnectionFactory() {
+    public synchronized ConnectionFactory getConnectionFactory() {
         if (connectionFactory == null) {
             connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
         }
@@ -396,7 +396,7 @@ public class JMSConfiguration {
         return destinationResolver.resolveDestinationName(session, 
userDestination, replyPubSubDomain);
     }
     
-    public Destination getReplyDestination(Session session) throws 
JMSException {
+    public synchronized Destination getReplyDestination(Session session) 
throws JMSException {
         if (replyDestinationDest == null) {
             replyDestinationDest = replyDestination == null 
                 ? session.createTemporaryQueue()

Reply via email to