This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 589b860809ce50b26fde911c692118b667a23ca7 Author: steingebein <[email protected]> AuthorDate: Sat Dec 14 02:13:46 2019 +0100 [CXF-8161] fix memory leak and thread leak in JMSDestination - run restartConnection() only once if an exception in PollingMessageListenerContainer-Poller-Threads occurs (#603) Thanks @steingebein ! --- .../apache/cxf/transport/jms/JMSDestination.java | 21 +++--- .../jms/util/PollingMessageListenerContainer.java | 7 +- .../cxf/transport/jms/JMSDestinationTest.java | 74 ++++++++++++++++++---- .../transport/jms/util/MessageListenerTest.java | 9 ++- 4 files changed, 81 insertions(+), 30 deletions(-) diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index c72931b..fb34c64 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -128,14 +128,22 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess Session session = null; try { ExceptionListener exListener = new ExceptionListener() { - public void onException(JMSException exception) { - if (!shutdown) { + private boolean restartTriggered; + + public synchronized void onException(JMSException exception) { + if (!shutdown && !restartTriggered) { LOG.log(Level.WARNING, "Exception on JMS connection. 
Trying to reconnect", exception); - restartConnection(); + new Thread(new Runnable() { + @Override + public void run() { + restartConnection(); + } + }).start(); + restartTriggered = true; } } }; - + PollingMessageListenerContainer container; if (!jmsConfig.isOneSessionPerConnection()) { connection = JMSFactory.createConnection(jmsConfig); @@ -174,7 +182,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess } } - protected void restartConnection() { + protected synchronized void restartConnection() { int tries = 0; do { tries++; @@ -215,14 +223,11 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess } - /** * Convert JMS message received by ListenerThread to CXF message and inform incomingObserver that a * message was received. The observer will call the service and then send the response CXF message by * using the BackChannelConduit * - * @param message - * @throws IOException */ public void onMessage(javax.jms.Message message) { ClassLoaderHolder origLoader = null; diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index de2c57f..8bab56b 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -152,6 +152,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont safeRollBack(); } } catch (Throwable e) { + safeRollBack(); handleException(e); } } @@ -201,7 +202,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } protected void handleException(Throwable e) { - running = false; JMSException wrapped; if (e instanceof JMSException) { wrapped = (JMSException) e; @@ -239,10 +239,7 @@ public class 
PollingMessageListenerContainer extends AbstractMessageListenerCont @Override public void stop() { - LOG.fine("Shuttting down " + this.getClass().getSimpleName()); - if (!running) { - return; - } + LOG.fine("Shutting down " + this.getClass().getSimpleName()); running = false; super.stop(); } diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index 1a401fb..87cc5ec 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -36,6 +36,7 @@ import javax.jms.Destination; import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.util.ServiceStopper; @@ -50,20 +51,29 @@ import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.transport.MultiplexDestination; import org.apache.cxf.transport.jms.util.ResourceCloser; +import org.awaitility.Awaitility; import org.junit.Ignore; import org.junit.Test; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.niceMock; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class JMSDestinationTest extends AbstractJMSTester { private static final class FaultyConnectionFactory implements ConnectionFactory { private final ConnectionFactory delegate; private final AtomicInteger latch; - + private FaultyConnectionFactory(ConnectionFactory delegate, int faults) { this.delegate = delegate; this.latch = new AtomicInteger(faults); } - + @Override public Connection createConnection() throws JMSException { if 
(latch.getAndDecrement() == 0) { @@ -81,7 +91,7 @@ public class JMSDestinationTest extends AbstractJMSTester { throw new JMSException("createConnection(userName, password) failed (simulated)"); } } - + } @Test @@ -485,7 +495,7 @@ public class JMSDestinationTest extends AbstractJMSTester { @Test public void testMessageObserverExceptionHandling() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); JMSConduit conduit = setupJMSConduitWithObserver(ei); @@ -511,11 +521,11 @@ public class JMSDestinationTest extends AbstractJMSTester { conduit.close(); destination.shutdown(); } - + @Test public void testConnectionFactoryExceptionHandling() throws Exception { EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); - final Function<ConnectionFactory, ConnectionFactory> wrapper = + final Function<ConnectionFactory, ConnectionFactory> wrapper = new Function<ConnectionFactory, ConnectionFactory>() { @Override public ConnectionFactory apply(ConnectionFactory cf) { @@ -530,9 +540,9 @@ public class JMSDestinationTest extends AbstractJMSTester { Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); Thread.sleep(4000L); - + sendOneWayMessage(conduit, outMessage); - + // wait for the message to be got from the destination, // create the thread to handler the Destination incoming message waitForReceiveDestMessage(); @@ -551,17 +561,17 @@ public class JMSDestinationTest extends AbstractJMSTester { JMSDestination destination = setupJMSDestination(ei); destination.getJmsConfig().setRetryInterval(1000); destination.setMessageObserver(createMessageObserver()); - + Thread.sleep(500L); broker.stopAllConnectors(new ServiceStopper()); - + broker.startAllConnectors(); Thread.sleep(2000L); Message outMessage = new MessageImpl(); setupMessageHeader(outMessage); sendOneWayMessage(conduit, 
outMessage); - + // wait for the message to be got from the destination, // create the thread to handler the Destination incoming message waitForReceiveDestMessage(); @@ -607,7 +617,47 @@ public class JMSDestinationTest extends AbstractJMSTester { String receivedName = getDestinationName(jmsMsg.getJMSReplyTo()); assertTrue("JMS Messsage's replyTo must be named " + expectedName + " but was " + receivedName, expectedName == receivedName || receivedName.equals(expectedName)); - } + @Test + public void testRestartConnectionAfterExceptionIsOnlyCalledOnce() throws Exception { + final AtomicInteger failedPollerThreads = new AtomicInteger(); + final AtomicInteger restartConnectionCalls = new AtomicInteger(); + + final ConnectionFactory connectionFactoryMock = niceMock(ConnectionFactory.class); + final Connection connectionMock = niceMock(Connection.class); + final Session sessionMock = niceMock(Session.class); + final Queue queueMock = niceMock(Queue.class); + + expect(connectionFactoryMock.createConnection()).andReturn(connectionMock); + expect(connectionMock.createSession(false, Session.AUTO_ACKNOWLEDGE)) + .andReturn(sessionMock) + .andAnswer(() -> { + failedPollerThreads.incrementAndGet(); + throw new JMSException("session terminated for test"); + }).anyTimes(); + expect(sessionMock.createQueue("test.jmstransport.binary")).andReturn(queueMock); + + replay(connectionFactoryMock, connectionMock, sessionMock, queueMock); + + EndpointInfo ei = setupServiceInfo("HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort"); + JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); + jmsConfig.setConnectionFactory(connectionFactoryMock); + + JMSDestination destination = new JMSDestination(bus, ei, jmsConfig) { + @Override + protected synchronized void restartConnection() { + restartConnectionCalls.incrementAndGet(); + // don't call to super.restartConnection(). + // it will stop the thread pool and cause race conditions in this test. 
+ } + }; + destination.activate(); + + Awaitility.await().until(() -> failedPollerThreads.get() > 5); + Awaitility.await().until(() -> restartConnectionCalls.get() > 0); + assertEquals("only one call to restartConnection() for all poller-threads allowed!", + 1, restartConnectionCalls.get()); + destination.shutdown(); + } } diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index ccdb450..113e931 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -49,7 +49,6 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; public class MessageListenerTest { @@ -74,9 +73,9 @@ public class MessageListenerTest { new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener); connection.close(); // Simulate connection problem container.start(); - Awaitility.await().until(() -> !container.isRunning()); - verify(exListener); + Awaitility.await().until(() -> captured.getValue() != null); JMSException ex = captured.getValue(); + Assert.assertNotNull(ex); Assert.assertEquals("The connection is already closed", ex.getMessage()); } @@ -102,9 +101,9 @@ public class MessageListenerTest { connection.close(); // Simulate connection problem container.start(); - Awaitility.await().until(() -> !container.isRunning()); - verify(exListener); + Awaitility.await().until(() -> captured.getValue() != null); JMSException ex = captured.getValue(); + Assert.assertNotNull(ex); // Closing the pooled connection will result in a NPE when using it Assert.assertEquals("Wrapped exception. null", ex.getMessage()); }
