This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 144ff8c  [CXF-7023] Add oneSessionPerConnection property to JMS 
transport
144ff8c is described below

commit 144ff8c9fa7a4495ad4825ba47cef74af563a299
Author: Simon Marti <simon.ma...@inventage.com>
AuthorDate: Thu Dec 14 14:44:07 2017 +0100

    [CXF-7023] Add oneSessionPerConnection property to JMS transport
---
 .../cxf/transport/jms/BackChannelConduit.java      | 20 +++++-
 .../org/apache/cxf/transport/jms/JMSConduit.java   | 41 +++++++++---
 .../apache/cxf/transport/jms/JMSConfigFactory.java |  2 +
 .../apache/cxf/transport/jms/JMSConfiguration.java |  9 +++
 .../apache/cxf/transport/jms/JMSDestination.java   | 46 +++++++++-----
 .../apache/cxf/transport/jms/uri/JMSEndpoint.java  | 13 ++++
 .../jms/util/PollingMessageListenerContainer.java  | 74 ++++++++++++++++++++--
 .../jms/testsuite/testcases/SoapJmsSpecTest.java   |  2 +-
 8 files changed, 174 insertions(+), 33 deletions(-)

diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
index a83984b..5a95501 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
@@ -54,14 +54,19 @@ class BackChannelConduit extends AbstractConduit implements 
JMSExchangeSender {
     private static final Logger LOG = 
LogUtils.getL7dLogger(BackChannelConduit.class);
     private JMSConfiguration jmsConfig;
     private Message inMessage;
-    private Connection connection;
+    private Connection persistentConnection;
 
     BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig, 
Connection connection) {
         super(EndpointReferenceUtils.getAnonymousEndpointReference());
         this.inMessage = inMessage;
         this.jmsConfig = jmsConfig;
-        this.connection = connection;
+        this.persistentConnection = connection;
     }
+
+    BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) {
+        this(inMessage, jmsConfig, null);
+    }
+
     @Override
     public void close(Message msg) throws IOException {
         MessageStreamUtil.closeStreams(msg);
@@ -121,6 +126,14 @@ class BackChannelConduit extends AbstractConduit 
implements JMSExchangeSender {
 
     private void send(final Message outMessage, final Object replyObj, 
ResourceCloser closer)
         throws JMSException {
+        Connection connection;
+
+        if (persistentConnection == null) {
+            connection = 
closer.register(JMSFactory.createConnection(jmsConfig));
+        } else {
+            connection = this.persistentConnection;
+        }
+
         Session session = closer.register(connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE));
 
         JMSMessageHeadersType outProps = 
(JMSMessageHeadersType)outMessage.get(JMS_SERVER_RESPONSE_HEADERS);
@@ -178,6 +191,7 @@ class BackChannelConduit extends AbstractConduit implements 
JMSExchangeSender {
         
messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
         messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
         
messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
+        
messageProperties.setSOAPJMSSOAPAction(inMessageProperties.getSOAPJMSSOAPAction());
         messageProperties.setSOAPJMSBindingVersion("1.0");
     }
 
@@ -220,4 +234,4 @@ class BackChannelConduit extends AbstractConduit implements 
JMSExchangeSender {
             : request.getJMSCorrelationID();
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 50db27d..d627c16 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -47,10 +47,12 @@ import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.security.SecurityContext;
 import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.MessageListenerContainer;
+import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
@@ -157,8 +159,16 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
         assertIsNotTextMessageAndMtom(outMessage);
 
         try (ResourceCloser closer = new ResourceCloser()) {
-            Connection c = getConnection();
-            Session session = closer.register(c.createSession(false,
+            Connection c;
+
+            if (jmsConfig.isOneSessionPerConnection()) {
+                c = closer.register(JMSFactory.createConnection(jmsConfig));
+                c.start();
+            } else {
+                c = getConnection();
+            }
+
+            Session session = closer.register(c.createSession(false, 
                                                               
Session.AUTO_ACKNOWLEDGE));
 
             if (exchange.isOneWay()) {
@@ -168,9 +178,11 @@ public class JMSConduit extends AbstractConduit implements 
JMSExchangeSender, Me
             }
         } catch (JMSException e) {
             // Close connection so it will be refreshed on next try
-            ResourceCloser.close(connection);
-            this.connection = null;
-            jmsConfig.resetCachedReplyDestination();
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                ResourceCloser.close(connection);
+                this.connection = null;
+                jmsConfig.resetCachedReplyDestination();
+            }
             this.staticReplyDestination = null;
             if (this.jmsListener != null) {
                 this.jmsListener.shutdown();
@@ -192,14 +204,27 @@ public class JMSConduit extends AbstractConduit 
implements JMSExchangeSender, Me
                     staticReplyDestination = 
jmsConfig.getReplyDestination(session);
 
                     String messageSelector = 
JMSFactory.getMessageSelector(jmsConfig, conduitId);
+                    if (jmsConfig.getMessageSelector() != null) {
+                        messageSelector += (messageSelector != null && 
!messageSelector.isEmpty() ? " AND " : "")
+                                + jmsConfig.getMessageSelector();
+                    }
                     if (messageSelector == null && 
!jmsConfig.isPubSubDomain()) {
                         // Do not open listener without selector on a queue as 
we then can not share the queue.
                         // An option for this might be a good idea for people 
who do not plan to share queues.
                         return;
                     }
-                    MessageListenerContainer container = new 
MessageListenerContainer(getConnection(),
-                                                                               
       staticReplyDestination,
-                                                                               
       this);
+
+                    AbstractMessageListenerContainer container;
+
+                    if (jmsConfig.isOneSessionPerConnection()) {
+                        container = new 
PollingMessageListenerContainer(jmsConfig, true, this);
+                    } else {
+                        container = new 
MessageListenerContainer(getConnection(), staticReplyDestination, this);
+                    }
+
+                    
container.setTransactionManager(jmsConfig.getTransactionManager());
+                    container.setTransacted(jmsConfig.isSessionTransacted());
+                    
container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
                     container.setMessageSelector(messageSelector);
                     Object executor = 
bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR);
                     if (executor instanceof Executor) {
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index 48953f5..b7866cf 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -86,6 +86,8 @@ public final class JMSConfigFactory {
         jmsConfig.setUserName(endpoint.getUsername());
         jmsConfig.setPassword(endpoint.getPassword());
         jmsConfig.setConcurrentConsumers(endpoint.getConcurrentConsumers());
+        
jmsConfig.setOneSessionPerConnection(endpoint.isOneSessionPerConnection());
+        jmsConfig.setMessageSelector(endpoint.getMessageSelector());
 
         TransactionManager tm = getTransactionManager(bus, endpoint);
         jmsConfig.setTransactionManager(tm);
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 434cb0c..bead3e7 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -91,6 +91,7 @@ public class JMSConfiguration {
     private boolean useConduitIdSelector = true;
     private String conduitSelectorPrefix;
     private boolean jmsProviderTibcoEms;
+    private boolean oneSessionPerConnection;
 
     private TransactionManager transactionManager;
 
@@ -432,6 +433,14 @@ public class JMSConfiguration {
         this.jmsProviderTibcoEms = jmsProviderTibcoEms;
     }
 
+    public boolean isOneSessionPerConnection() {
+        return oneSessionPerConnection;
+    }
+
+    public void setOneSessionPerConnection(boolean oneSessionPerConnection) {
+        this.oneSessionPerConnection = oneSessionPerConnection;
+    }
+
     public static Destination resolveOrCreateDestination(final Session session,
                                                          final 
DestinationResolver resolver,
                                                          final String 
replyToDestinationName,
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index b85411e..c72931b 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -90,7 +90,13 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
             && !robust) {
             return null;
         }
-        return new BackChannelConduit(inMessage, jmsConfig, connection);
+
+        if (jmsConfig.isOneSessionPerConnection()) {
+            return new BackChannelConduit(inMessage, jmsConfig);
+        } else {
+            return new BackChannelConduit(inMessage, jmsConfig, connection);
+        }
+
     }
 
     /**
@@ -105,14 +111,15 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
             if (e.getCause() != null && 
InvalidClientIDException.class.isInstance(e.getCause())) {
                 throw e;
             }
-            // If first connect fails we will try to establish the connection 
in the background 
-            new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    restartConnection();
-                }
-            }).start();
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                // If first connect fails we will try to establish the 
connection in the background
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        restartConnection();
+                    }
+                }).start();
+            }
         }
     }
 
@@ -120,7 +127,6 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
     private JMSListenerContainer createTargetDestinationListener() {
         Session session = null;
         try {
-            connection = JMSFactory.createConnection(jmsConfig);
             ExceptionListener exListener = new ExceptionListener() {
                 public void onException(JMSException exception) {
                     if (!shutdown) {
@@ -129,12 +135,17 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
                     }
                 }
             };
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            Destination destination = jmsConfig.getTargetDestination(session);
+            
+            PollingMessageListenerContainer container;
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                connection = JMSFactory.createConnection(jmsConfig);
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                Destination destination = 
jmsConfig.getTargetDestination(session);
+                container = new PollingMessageListenerContainer(connection, 
destination, this, exListener);
+            } else {
+                container = new PollingMessageListenerContainer(jmsConfig, 
false, this);
+            }
 
-            PollingMessageListenerContainer container = new 
PollingMessageListenerContainer(connection,
-                                                                               
             destination, 
-                                                                               
             this, exListener);
             
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
             container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());
@@ -149,7 +160,10 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
             container.setJndiEnvironment(jmsConfig.getJndiEnvironment());
             container.start();
             suspendedContinuations.setListenerContainer(container);
-            connection.start();
+
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                connection.start();
+            }
             return container;
         } catch (JMSException e) {
             ResourceCloser.close(connection);
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index 89c12fb..dd004b8 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -88,6 +88,7 @@ public class JMSEndpoint {
     private int concurrentConsumers = 1;
     private String messageSelector;
     private int retryInterval = 5000;
+    private boolean oneSessionPerConnection;
 
     /**
      * @param uri
@@ -499,4 +500,16 @@ public class JMSEndpoint {
         this.retryInterval = Integer.valueOf(retryInterval);
     }
     
+    public boolean isOneSessionPerConnection() {
+        return oneSessionPerConnection;
+    }
+    
+    public void setOneSessionPerConnection(String oneSessionPerConnection) {
+        this.oneSessionPerConnection = 
Boolean.valueOf(oneSessionPerConnection);
+    }
+    
+    public void setOneSessionPerConnection(boolean oneSessionPerConnection) {
+        this.oneSessionPerConnection = oneSessionPerConnection;
+    }
+
 }
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9edd0da..de2c57f 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -34,11 +34,23 @@ import javax.transaction.Status;
 import javax.transaction.Transaction;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.apache.cxf.transport.jms.JMSFactory;
 
 public class PollingMessageListenerContainer extends 
AbstractMessageListenerContainer {
     private static final Logger LOG = 
LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
     private ExceptionListener exceptionListener;
 
+    private JMSConfiguration jmsConfig;
+    private boolean reply;
+
+    public PollingMessageListenerContainer(JMSConfiguration jmsConfig, boolean 
isReply,
+                                           MessageListener listenerHandler) {
+        this.jmsConfig = jmsConfig;
+        this.reply = isReply;
+        this.listenerHandler = listenerHandler;
+    }
+
     public PollingMessageListenerContainer(Connection connection, Destination 
destination,
                                            MessageListener listenerHandler, 
ExceptionListener exceptionListener) {
         this.connection = connection;
@@ -55,9 +67,16 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
-                    // Create session early to optimize performance            
    // In
+                    Connection connection;
+                    if (jmsConfig != null && 
jmsConfig.isOneSessionPerConnection()) {
+                        connection = closer.register(createConnection());
+                    } else {
+                        connection = 
PollingMessageListenerContainer.this.connection;
+                    }
+                    // Create session early to optimize performance
                     session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
-                    MessageConsumer consumer = 
closer.register(createConsumer(session));
+                    MessageConsumer consumer = 
closer.register(createConsumer(connection, session));
+
                     while (running) {
                         Message message = consumer.receive(1000);
                         try {
@@ -108,12 +127,20 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                         throw new IllegalStateException("External transactions 
are not supported in XAPoller");
                     }
                     transactionManager.begin();
+
+                    Connection connection;
+                    if (getConnection() == null) {
+                        connection = closer.register(createConnection());
+                    } else {
+                        connection = getConnection();
+                    }
+
                     /*
                      * Create session inside transaction to give it the
                      * chance to enlist itself as a resource
                      */
                     Session session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
-                    MessageConsumer consumer = 
closer.register(createConsumer(session));
+                    MessageConsumer consumer = 
closer.register(createConsumer(connection, session));
                     Message message = consumer.receive(1000);
                     try {
                         if (message != null) {
@@ -122,7 +149,7 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                         transactionManager.commit();
                     } catch (Throwable e) {
                         LOG.log(Level.WARNING, "Exception while processing jms 
message in cxf. Rolling back", e);
-                        safeRollBack(session);
+                        safeRollBack();
                     }
                 } catch (Throwable e) {
                     handleException(e);
@@ -131,7 +158,7 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
 
         }
 
-        protected void safeRollBack(Session session) {
+        private void safeRollBack() {
             try {
                 transactionManager.rollback();
             } catch (Throwable e) {
@@ -141,7 +168,31 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
 
     }
 
+    private MessageConsumer createConsumer(final Connection connection, final 
Session session)
+            throws JMSException {
+        final MessageConsumer consumer;
+        
+        if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) {
+            Destination destination;
+            if (!isReply()) {
+                destination = jmsConfig.getTargetDestination(session);
+            } else {
+                destination = jmsConfig.getReplyDestination(session);
+            }
+            consumer = createConsumer(destination, session);
+            connection.start();
+        } else {
+            consumer = createConsumer(session);
+        }
+        
+        return consumer;
+    }
+
     private MessageConsumer createConsumer(Session session) throws 
JMSException {
+        return createConsumer(this.destination, session);
+    }
+
+    private MessageConsumer createConsumer(Destination destination, Session 
session) throws JMSException {
         if (durableSubscriptionName != null && destination instanceof Topic) {
             return session.createDurableSubscriber((Topic)destination, 
durableSubscriptionName,
                                                    messageSelector, 
pubSubNoLocal);
@@ -161,6 +212,19 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
         this.exceptionListener.onException(wrapped);
     }
 
+    private boolean isReply() {
+        return reply;
+    }
+
+    private Connection createConnection() {
+        try {
+            return JMSFactory.createConnection(jmsConfig);
+        } catch (JMSException e) {
+            handleException(e);
+            throw JMSUtil.convertJmsException(e);
+        }
+    }
+
     @Override
     public void start() {
         if (running) {
diff --git 
a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
 
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
index 12beef5..7c78b6d 100644
--- 
a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
+++ 
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
@@ -107,7 +107,7 @@ public class SoapJmsSpecTest extends AbstractVmJMSTest {
         JMSMessageHeadersType responseHeader = 
(JMSMessageHeadersType)responseContext
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
         Assert.assertEquals("1.0", responseHeader.getSOAPJMSBindingVersion());
-        Assert.assertEquals(null, responseHeader.getSOAPJMSSOAPAction());
+        Assert.assertEquals("\"test\"", responseHeader.getSOAPJMSSOAPAction());
         Assert.assertEquals(DeliveryMode.PERSISTENT, 
responseHeader.getJMSDeliveryMode());
         Assert.assertEquals(7, responseHeader.getJMSPriority());
     }

-- 
To stop receiving notification emails like this one, please contact
cschnei...@apache.org.

Reply via email to