This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 9e743b3b6196a748518234e7aa84a71cf011f507
Author: reta <[email protected]>
AuthorDate: Sat Dec 14 10:17:33 2019 -0500

    CXF-8161: fix memory leak and thread leak in JMSDestination. Putting back 
'running=false' inside exception handler (PollingMessageListenerContainer), 
refactored tests to not use mocks but doubles.
    
    (cherry picked from commit fc4f94f663db6a27d2352a9b6424e42c89f7a522)
    
    # Conflicts:
    #   
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
---
 .../jms/util/PollingMessageListenerContainer.java  |   5 +-
 .../cxf/transport/jms/JMSDestinationTest.java      | 180 +++++++++++++++------
 2 files changed, 132 insertions(+), 53 deletions(-)

diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 8bab56b..cbe8b60 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -202,6 +202,7 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
     }
     
     protected void handleException(Throwable e) {
+        running = false;
         JMSException wrapped;
         if (e  instanceof JMSException) {
             wrapped = (JMSException) e;
@@ -209,7 +210,9 @@ public class PollingMessageListenerContainer extends 
AbstractMessageListenerCont
             wrapped = new JMSException("Wrapped exception. " + e.getMessage());
             wrapped.addSuppressed(e);
         }
-        this.exceptionListener.onException(wrapped);
+        if (this.exceptionListener != null) {
+            this.exceptionListener.onException(wrapped);
+        }
     }
 
     private boolean isReply() {
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 87cc5ec..25e4f20 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -30,12 +30,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionFactory;
+import javax.jms.ConnectionMetaData;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
+import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.Topic;
 
@@ -51,33 +55,102 @@ import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
-import org.awaitility.Awaitility;
 
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class JMSDestinationTest extends AbstractJMSTester {
+    private static class FaultyConnection implements Connection {
+        private final Connection delegate;
+        
+        FaultyConnection(final Connection delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
+            return delegate.createSession(transacted, acknowledgeMode);
+        }
+
+        @Override
+        public String getClientID() throws JMSException {
+            return delegate.getClientID();
+        }
+
+        @Override
+        public void setClientID(String clientID) throws JMSException {
+            delegate.setClientID(clientID);
+        }
+
+        @Override
+        public ConnectionMetaData getMetaData() throws JMSException {
+            return delegate.getMetaData();
+        }
+
+        @Override
+        public ExceptionListener getExceptionListener() throws JMSException {
+            return delegate.getExceptionListener();
+        }
+
+        @Override
+        public void setExceptionListener(ExceptionListener listener) throws 
JMSException {
+            delegate.setExceptionListener(listener);
+        }
+
+        @Override
+        public void start() throws JMSException {
+            delegate.start();
+        }
+
+        @Override
+        public void stop() throws JMSException {
+            delegate.stop();
+        }
+
+        @Override
+        public void close() throws JMSException {
+            delegate.close();
+        }
+
+        @Override
+        public ConnectionConsumer createConnectionConsumer(Destination 
destination, String messageSelector,
+                ServerSessionPool sessionPool, int maxMessages) throws 
JMSException {
+            return delegate.createConnectionConsumer(destination, 
messageSelector, sessionPool, maxMessages);
+        }
+
+        @Override
+        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
String subscriptionName, 
+                String messageSelector, ServerSessionPool sessionPool, int 
maxMessages) throws JMSException {
+            return delegate.createDurableConnectionConsumer(topic, 
subscriptionName, messageSelector, 
+                sessionPool, maxMessages);   
+        }
+    }
+    
     private static final class FaultyConnectionFactory implements 
ConnectionFactory {
         private final ConnectionFactory delegate;
+        private final Function<Connection, Connection> wrapper;
         private final AtomicInteger latch;
 
         private FaultyConnectionFactory(ConnectionFactory delegate, int 
faults) {
+            this(delegate, FaultyConnection::new, faults);
+        }
+        
+        private FaultyConnectionFactory(ConnectionFactory delegate, 
+                Function<Connection, Connection> wrapper, int faults) {
             this.delegate = delegate;
+            this.wrapper = wrapper;
             this.latch = new AtomicInteger(faults);
         }
 
         @Override
         public Connection createConnection() throws JMSException {
-            if (latch.getAndDecrement() == 0) {
-                return delegate.createConnection();
+            if (latch.getAndDecrement() <= 0) {
+                return wrapper.apply(delegate.createConnection());
             } else {
                 throw new JMSException("createConnection() failed 
(simulated)");
             }
@@ -85,15 +158,14 @@ public class JMSDestinationTest extends AbstractJMSTester {
 
         @Override
         public Connection createConnection(String userName, String password) 
throws JMSException {
-            if (latch.decrementAndGet() == 0) {
-                return delegate.createConnection(userName, password);
+            if (latch.decrementAndGet() <= 0) {
+                return wrapper.apply(delegate.createConnection(userName, 
password));
             } else {
                 throw new JMSException("createConnection(userName, password) 
failed (simulated)");
             }
         }
-
     }
-
+    
     @Test
     public void testGetConfigurationFromWSDL() throws Exception {
         EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", 
"HelloWorldQueueBinMsgPort");
@@ -582,6 +654,52 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test
+    public void testSessionsExceptionHandling() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", 
"HelloWorldPubSubPort");
+        final AtomicInteger latch = new AtomicInteger(1);
+        
+        final Function<Connection, Connection> connection = c -> new 
FaultyConnection(c) {
+                @Override
+                public Session createSession(boolean transacted, int 
acknowledgeMode) throws JMSException {
+                    // Fail only once
+                    if (latch.getAndDecrement() == 0) {
+                        throw new JMSException("createSession() failed 
(simulated)");
+                    } else {
+                        return super.createSession(transacted, 
acknowledgeMode);
+                    }
+                }
+            };
+            
+        final Function<ConnectionFactory, ConnectionFactory> wrapper =
+            new Function<ConnectionFactory, ConnectionFactory>() {
+                @Override
+                public ConnectionFactory apply(ConnectionFactory cf) {
+                    return new FaultyConnectionFactory(cf, connection, 0);
+                }
+            };
+            
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        JMSDestination destination = setupJMSDestination(ei, wrapper);
+        destination.getJmsConfig().setRetryInterval(1000);
+        destination.setMessageObserver(createMessageObserver());
+
+        Message outMessage = new MessageImpl();
+        setupMessageHeader(outMessage);
+        Thread.sleep(4000L);
+
+        sendOneWayMessage(conduit, outMessage);
+
+        // wait for the message to be got from the destination,
+        // create the thread to handler the Destination incoming message
+        waitForReceiveDestMessage();
+        verifyReceivedMessage(destMessage);
+
+        conduit.close();
+        destination.shutdown();
+    }
+
 
     private String getQueueName(String exName) {
         if (exName == null) {
@@ -618,46 +736,4 @@ public class JMSDestinationTest extends AbstractJMSTester {
         assertTrue("JMS Messsage's replyTo must be named " + expectedName + " 
but was " + receivedName,
                    expectedName == receivedName || 
receivedName.equals(expectedName));
     }
-
-    @Test
-    public void testRestartConnectionAfterExceptionIsOnlyCalledOnce() throws 
Exception {
-        final AtomicInteger failedPollerThreads = new AtomicInteger();
-        final AtomicInteger restartConnectionCalls = new AtomicInteger();
-
-        final ConnectionFactory connectionFactoryMock = 
niceMock(ConnectionFactory.class);
-        final Connection connectionMock = niceMock(Connection.class);
-        final Session sessionMock = niceMock(Session.class);
-        final Queue queueMock = niceMock(Queue.class);
-
-        
expect(connectionFactoryMock.createConnection()).andReturn(connectionMock);
-        expect(connectionMock.createSession(false, Session.AUTO_ACKNOWLEDGE))
-                .andReturn(sessionMock)
-                .andAnswer(() -> {
-                    failedPollerThreads.incrementAndGet();
-                    throw new JMSException("session terminated for test");
-                }).anyTimes();
-        
expect(sessionMock.createQueue("test.jmstransport.binary")).andReturn(queueMock);
-
-        replay(connectionFactoryMock, connectionMock, sessionMock, queueMock);
-
-        EndpointInfo ei = setupServiceInfo("HWStaticReplyQBinMsgService", 
"HWStaticReplyQBinMsgPort");
-        JMSConfiguration jmsConfig = 
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
-        jmsConfig.setConnectionFactory(connectionFactoryMock);
-
-        JMSDestination destination = new JMSDestination(bus, ei, jmsConfig) {
-            @Override
-            protected synchronized void restartConnection() {
-                restartConnectionCalls.incrementAndGet();
-                // don't call to super.restartConnection().
-                // it will stop the thread pool and cause race conditions in 
this test.
-            }
-        };
-        destination.activate();
-
-        Awaitility.await().until(() -> failedPollerThreads.get() > 5);
-        Awaitility.await().until(() -> restartConnectionCalls.get() > 0);
-        assertEquals("only one call to restartConnection() for all 
poller-threads allowed!",
-                     1, restartConnectionCalls.get());
-        destination.shutdown();
-    }
 }

Reply via email to