This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 9e743b3b6196a748518234e7aa84a71cf011f507 Author: reta <[email protected]> AuthorDate: Sat Dec 14 10:17:33 2019 -0500 CXF-8161: fix memory leak and thread leak in JMSDestination. Putting back 'running=false' inside exception handler (PollingMessageListenerContainer), refactored tests to not use mocks but doubles. (cherry picked from commit fc4f94f663db6a27d2352a9b6424e42c89f7a522) # Conflicts: # rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java --- .../jms/util/PollingMessageListenerContainer.java | 5 +- .../cxf/transport/jms/JMSDestinationTest.java | 180 +++++++++++++++------ 2 files changed, 132 insertions(+), 53 deletions(-) diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 8bab56b..cbe8b60 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -202,6 +202,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } protected void handleException(Throwable e) { + running = false; JMSException wrapped; if (e instanceof JMSException) { wrapped = (JMSException) e; @@ -209,7 +210,9 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont wrapped = new JMSException("Wrapped exception. " + e.getMessage()); wrapped.addSuppressed(e); } - this.exceptionListener.onException(wrapped); + if (this.exceptionListener != null) { + this.exceptionListener.onException(wrapped); + } } private boolean isReply() { diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index 87cc5ec..25e4f20 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -30,12 +30,16 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import javax.jms.Connection; +import javax.jms.ConnectionConsumer; import javax.jms.ConnectionFactory; +import javax.jms.ConnectionMetaData; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.Queue; +import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.Topic; @@ -51,33 +55,102 @@ import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.transport.MultiplexDestination; import org.apache.cxf.transport.jms.util.ResourceCloser; -import org.awaitility.Awaitility; import org.junit.Ignore; import org.junit.Test; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class JMSDestinationTest extends AbstractJMSTester { + private static class FaultyConnection implements Connection { + private final Connection delegate; + + FaultyConnection(final Connection delegate) { + this.delegate = delegate; + } + + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + return delegate.createSession(transacted, acknowledgeMode); + } + + @Override + public String getClientID() throws JMSException { + return delegate.getClientID(); + } + + @Override + public void setClientID(String clientID) throws JMSException { + delegate.setClientID(clientID); + } + + @Override + public ConnectionMetaData getMetaData() throws JMSException { + return delegate.getMetaData(); + } + + @Override + public ExceptionListener getExceptionListener() throws JMSException { + return delegate.getExceptionListener(); + } + + @Override + public void setExceptionListener(ExceptionListener listener) throws JMSException { + delegate.setExceptionListener(listener); + } + + @Override + public void start() throws JMSException { + delegate.start(); + } + + @Override + public void stop() throws JMSException { + delegate.stop(); + } + + @Override + public void close() throws JMSException { + delegate.close(); + } + + @Override + public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, + ServerSessionPool sessionPool, int maxMessages) throws JMSException { + return delegate.createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages); + } + + @Override + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { + return delegate.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, + sessionPool, maxMessages); + } + } + private static final class FaultyConnectionFactory implements ConnectionFactory { private final ConnectionFactory delegate; + private final Function<Connection, Connection> wrapper; private final AtomicInteger latch; private FaultyConnectionFactory(ConnectionFactory delegate, int faults) { + this(delegate, FaultyConnection::new, faults); + } + + private FaultyConnectionFactory(ConnectionFactory delegate, + Function<Connection, Connection> wrapper, int faults) { this.delegate = delegate; + this.wrapper = wrapper; this.latch = new AtomicInteger(faults); } @Override public Connection createConnection() throws JMSException { - if (latch.getAndDecrement() == 0) { - return delegate.createConnection(); + if (latch.getAndDecrement() <= 0) { + return wrapper.apply(delegate.createConnection()); } else { throw new JMSException("createConnection() failed (simulated)"); } @@ -85,15 +158,14 @@ public class JMSDestinationTest extends AbstractJMSTester { @Override public Connection createConnection(String userName, String password) throws JMSException { - if (latch.decrementAndGet() == 0) { - return delegate.createConnection(userName, password); + if (latch.decrementAndGet() <= 0) { + return wrapper.apply(delegate.createConnection(userName, password)); } else { throw new JMSException("createConnection(userName, password) failed (simulated)"); } } - } - + @Test public void testGetConfigurationFromWSDL() throws Exception { EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); @@ -582,6 +654,52 @@ public class JMSDestinationTest extends AbstractJMSTester { conduit.close(); destination.shutdown(); } + + @Test + public void testSessionsExceptionHandling() throws Exception { + EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); + final AtomicInteger latch = new AtomicInteger(1); + + final Function<Connection, Connection> connection = c -> new FaultyConnection(c) { + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + // Fail only once + if (latch.getAndDecrement() == 0) { + throw new JMSException("createSession() failed (simulated)"); + } else { + return super.createSession(transacted, acknowledgeMode); + } + } + }; + + final Function<ConnectionFactory, ConnectionFactory> wrapper = + new Function<ConnectionFactory, ConnectionFactory>() { + @Override + public ConnectionFactory apply(ConnectionFactory cf) { + return new FaultyConnectionFactory(cf, connection, 0); + } + }; + + JMSConduit conduit = setupJMSConduitWithObserver(ei); + JMSDestination destination = setupJMSDestination(ei, wrapper); + destination.getJmsConfig().setRetryInterval(1000); + destination.setMessageObserver(createMessageObserver()); + + Message outMessage = new MessageImpl(); + setupMessageHeader(outMessage); + Thread.sleep(4000L); + + sendOneWayMessage(conduit, outMessage); + + // wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveDestMessage(); + verifyReceivedMessage(destMessage); + + conduit.close(); + destination.shutdown(); + } + private String getQueueName(String exName) { if (exName == null) { @@ -618,46 +736,4 @@ public class JMSDestinationTest extends AbstractJMSTester { assertTrue("JMS Messsage's replyTo must be named " + expectedName + " but was " + receivedName, expectedName == receivedName || receivedName.equals(expectedName)); } - - @Test - public void testRestartConnectionAfterExceptionIsOnlyCalledOnce() throws Exception { - final AtomicInteger failedPollerThreads = new AtomicInteger(); - final AtomicInteger restartConnectionCalls = new AtomicInteger(); - - final ConnectionFactory connectionFactoryMock = niceMock(ConnectionFactory.class); - final Connection connectionMock = niceMock(Connection.class); - final Session sessionMock = niceMock(Session.class); - final Queue queueMock = niceMock(Queue.class); - - expect(connectionFactoryMock.createConnection()).andReturn(connectionMock); - expect(connectionMock.createSession(false, Session.AUTO_ACKNOWLEDGE)) - .andReturn(sessionMock) - .andAnswer(() -> { - failedPollerThreads.incrementAndGet(); - throw new JMSException("session terminated for test"); - }).anyTimes(); - expect(sessionMock.createQueue("test.jmstransport.binary")).andReturn(queueMock); - - replay(connectionFactoryMock, connectionMock, sessionMock, queueMock); - - EndpointInfo ei = setupServiceInfo("HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort"); - JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); - jmsConfig.setConnectionFactory(connectionFactoryMock); - - JMSDestination destination = new JMSDestination(bus, ei, jmsConfig) { - @Override - protected synchronized void restartConnection() { - restartConnectionCalls.incrementAndGet(); - // don't call to super.restartConnection(). - // it will stop the thread pool and cause race conditions in this test. - } - }; - destination.activate(); - - Awaitility.await().until(() -> failedPollerThreads.get() > 5); - Awaitility.await().until(() -> restartConnectionCalls.get() > 0); - assertEquals("only one call to restartConnection() for all poller-threads allowed!", - 1, restartConnectionCalls.get()); - destination.shutdown(); - } }
