This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 589b860809ce50b26fde911c692118b667a23ca7
Author: steingebein <[email protected]>
AuthorDate: Sat Dec 14 02:13:46 2019 +0100

    [CXF-8161] fix memory leak and thread leak in JMSDestination - run 
restartConnection() only once if a exception in 
PollingMessageListenerContainer-Poller-Threads occurs (#603)
    
    Thanks @steingebein !
---
 .../apache/cxf/transport/jms/JMSDestination.java   | 21 +++---
 .../jms/util/PollingMessageListenerContainer.java  |  7 +-
 .../cxf/transport/jms/JMSDestinationTest.java      | 74 ++++++++++++++++++----
 .../transport/jms/util/MessageListenerTest.java    |  9 ++-
 4 files changed, 81 insertions(+), 30 deletions(-)

diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index c72931b..fb34c64 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -128,14 +128,22 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
         Session session = null;
         try {
             ExceptionListener exListener = new ExceptionListener() {
-                public void onException(JMSException exception) {
-                    if (!shutdown) {
+                private boolean restartTriggered;
+
+                public synchronized void onException(JMSException exception) {
+                    if (!shutdown && !restartTriggered) {
                         LOG.log(Level.WARNING, "Exception on JMS connection. 
Trying to reconnect", exception);
-                        restartConnection();
+                        new Thread(new Runnable() {
+                            @Override
+                            public void run() {
+                                restartConnection();
+                            }
+                        }).start();
+                        restartTriggered = true;
                     }
                 }
             };
-            
+
             PollingMessageListenerContainer container;
             if (!jmsConfig.isOneSessionPerConnection()) {
                 connection = JMSFactory.createConnection(jmsConfig);
@@ -174,7 +182,7 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
         }
     }
 
-    protected void restartConnection() {
+    protected synchronized void restartConnection() {
         int tries = 0;
         do {
             tries++;
@@ -215,14 +223,11 @@ public class JMSDestination extends 
AbstractMultiplexDestination implements Mess
     }
 
 
-
     /**
      * Convert JMS message received by ListenerThread to CXF message and 
inform incomingObserver that a
      * message was received. The observer will call the service and then send 
the response CXF message by
      * using the BackChannelConduit
      *
-     * @param message
-     * @throws IOException
      */
     public void onMessage(javax.jms.Message message) {
         ClassLoaderHolder origLoader = null;
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index de2c57f..8bab56b 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -152,6 +152,7 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
                         safeRollBack();
                     }
                 } catch (Throwable e) {
+                    safeRollBack();
                     handleException(e);
                 }
             }
@@ -201,7 +202,6 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
     }
     
     protected void handleException(Throwable e) {
-        running = false;
         JMSException wrapped;
         if (e  instanceof JMSException) {
             wrapped = (JMSException) e;
@@ -239,10 +239,7 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
 
     @Override
     public void stop() {
-        LOG.fine("Shuttting down " + this.getClass().getSimpleName());
-        if (!running) {
-            return;
-        }
+        LOG.fine("Shutting down " + this.getClass().getSimpleName());
         running = false;
         super.stop();
     }
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 1a401fb..87cc5ec 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -36,6 +36,7 @@ import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.Topic;
 
 import org.apache.activemq.util.ServiceStopper;
@@ -50,20 +51,29 @@ import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.awaitility.Awaitility;
 
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class JMSDestinationTest extends AbstractJMSTester {
     private static final class FaultyConnectionFactory implements 
ConnectionFactory {
         private final ConnectionFactory delegate;
         private final AtomicInteger latch;
-        
+
         private FaultyConnectionFactory(ConnectionFactory delegate, int 
faults) {
             this.delegate = delegate;
             this.latch = new AtomicInteger(faults);
         }
-        
+
         @Override
         public Connection createConnection() throws JMSException {
             if (latch.getAndDecrement() == 0) {
@@ -81,7 +91,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
                 throw new JMSException("createConnection(userName, password) 
failed (simulated)");
             }
         }
-        
+
     }
 
     @Test
@@ -485,7 +495,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
 
     @Test
     public void testMessageObserverExceptionHandling() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1); 
+        final CountDownLatch latch = new CountDownLatch(1);
         EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", 
"HelloWorldPubSubPort");
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
 
@@ -511,11 +521,11 @@ public class JMSDestinationTest extends AbstractJMSTester 
{
         conduit.close();
         destination.shutdown();
     }
-    
+
     @Test
     public void testConnectionFactoryExceptionHandling() throws Exception {
         EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", 
"HelloWorldPubSubPort");
-        final Function<ConnectionFactory, ConnectionFactory> wrapper = 
+        final Function<ConnectionFactory, ConnectionFactory> wrapper =
             new Function<ConnectionFactory, ConnectionFactory>() {
                 @Override
                 public ConnectionFactory apply(ConnectionFactory cf) {
@@ -530,9 +540,9 @@ public class JMSDestinationTest extends AbstractJMSTester {
         Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage);
         Thread.sleep(4000L);
-        
+
         sendOneWayMessage(conduit, outMessage);
-        
+
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
         waitForReceiveDestMessage();
@@ -551,17 +561,17 @@ public class JMSDestinationTest extends AbstractJMSTester 
{
         JMSDestination destination = setupJMSDestination(ei);
         destination.getJmsConfig().setRetryInterval(1000);
         destination.setMessageObserver(createMessageObserver());
-        
+
         Thread.sleep(500L);
         broker.stopAllConnectors(new ServiceStopper());
-        
+
         broker.startAllConnectors();
         Thread.sleep(2000L);
         
         Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage);
         sendOneWayMessage(conduit, outMessage);
-        
+
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
         waitForReceiveDestMessage();
@@ -607,7 +617,47 @@ public class JMSDestinationTest extends AbstractJMSTester {
         String receivedName = getDestinationName(jmsMsg.getJMSReplyTo());
         assertTrue("JMS Messsage's replyTo must be named " + expectedName + " 
but was " + receivedName,
                    expectedName == receivedName || 
receivedName.equals(expectedName));
-
     }
 
+    @Test
+    public void testRestartConnectionAfterExceptionIsOnlyCalledOnce() throws 
Exception {
+        final AtomicInteger failedPollerThreads = new AtomicInteger();
+        final AtomicInteger restartConnectionCalls = new AtomicInteger();
+
+        final ConnectionFactory connectionFactoryMock = 
niceMock(ConnectionFactory.class);
+        final Connection connectionMock = niceMock(Connection.class);
+        final Session sessionMock = niceMock(Session.class);
+        final Queue queueMock = niceMock(Queue.class);
+
+        
expect(connectionFactoryMock.createConnection()).andReturn(connectionMock);
+        expect(connectionMock.createSession(false, Session.AUTO_ACKNOWLEDGE))
+                .andReturn(sessionMock)
+                .andAnswer(() -> {
+                    failedPollerThreads.incrementAndGet();
+                    throw new JMSException("session terminated for test");
+                }).anyTimes();
+        
expect(sessionMock.createQueue("test.jmstransport.binary")).andReturn(queueMock);
+
+        replay(connectionFactoryMock, connectionMock, sessionMock, queueMock);
+
+        EndpointInfo ei = setupServiceInfo("HWStaticReplyQBinMsgService", 
"HWStaticReplyQBinMsgPort");
+        JMSConfiguration jmsConfig = 
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+        jmsConfig.setConnectionFactory(connectionFactoryMock);
+
+        JMSDestination destination = new JMSDestination(bus, ei, jmsConfig) {
+            @Override
+            protected synchronized void restartConnection() {
+                restartConnectionCalls.incrementAndGet();
+                // don't call to super.restartConnection().
+                // it will stop the thread pool and cause race conditions in 
this test.
+            }
+        };
+        destination.activate();
+
+        Awaitility.await().until(() -> failedPollerThreads.get() > 5);
+        Awaitility.await().until(() -> restartConnectionCalls.get() > 0);
+        assertEquals("only one call to restartConnection() for all 
poller-threads allowed!",
+                     1, restartConnectionCalls.get());
+        destination.shutdown();
+    }
 }
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index ccdb450..113e931 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -49,7 +49,6 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 
 public class MessageListenerTest {
 
@@ -74,9 +73,9 @@ public class MessageListenerTest {
             new PollingMessageListenerContainer(connection, dest, 
listenerHandler, exListener);
         connection.close(); // Simulate connection problem
         container.start();
-        Awaitility.await().until(() -> !container.isRunning());
-        verify(exListener);
+        Awaitility.await().until(() -> captured.getValue() != null);
         JMSException ex = captured.getValue();
+        Assert.assertNotNull(ex);
         Assert.assertEquals("The connection is already closed", 
ex.getMessage());
     }
     
@@ -102,9 +101,9 @@ public class MessageListenerTest {
 
         connection.close(); // Simulate connection problem
         container.start();
-        Awaitility.await().until(() -> !container.isRunning());
-        verify(exListener);
+        Awaitility.await().until(() -> captured.getValue() != null);
         JMSException ex = captured.getValue();
+        Assert.assertNotNull(ex);
         // Closing the pooled connection will result in a NPE when using it
         Assert.assertEquals("Wrapped exception. null", ex.getMessage());
     }

Reply via email to