This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit bd98adffcf470709e1e2763326cfa2a07b4f4d54 Author: reta <[email protected]> AuthorDate: Sun Dec 8 13:56:23 2019 -0500 CXF-8161: fix memory leak and thread leak in JMSDestination. Adding more tests for JMSDestination first (cherry picked from commit dcc9c3f74c2fe9b8797d18296dee2e6270442481) (cherry picked from commit 53e45f515d36a31d99d0eab4e1776e5a17eeba55) # Conflicts: # rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java # rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java --- .../cxf/transport/jms/AbstractJMSTester.java | 9 ++ .../cxf/transport/jms/JMSDestinationTest.java | 124 +++++++++++++++++++++ 2 files changed, 133 insertions(+) diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java index 807c4ec..5d18f50 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java @@ -26,6 +26,7 @@ import java.io.Reader; import java.io.StringReader; import java.io.Writer; import java.net.URL; +import java.util.function.Function; import javax.jms.ConnectionFactory; import javax.xml.namespace.QName; @@ -163,6 +164,7 @@ public abstract class AbstractJMSTester extends Assert { return new JMSConduit(target, jmsConfig, bus); } + protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws IOException { JMSConduit jmsConduit = setupJMSConduit(ei); MessageObserver observer = new MessageObserver() { @@ -174,6 +176,13 @@ public abstract class AbstractJMSTester extends Assert { return jmsConduit; } + protected JMSDestination setupJMSDestination(EndpointInfo ei, + Function<ConnectionFactory, ConnectionFactory> wrapper) throws IOException { + JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); + jmsConfig.setConnectionFactory(wrapper.apply(cf)); + return new JMSDestination(bus, ei, jmsConfig); + } + protected JMSDestination setupJMSDestination(EndpointInfo ei) throws IOException { JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); jmsConfig.setConnectionFactory(cf); diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index 89e603b..1a401fb 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -24,8 +24,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.Reader; import java.io.StringReader; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.InvalidClientIDException; @@ -33,6 +38,7 @@ import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Topic; +import org.apache.activemq.util.ServiceStopper; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; @@ -49,6 +55,34 @@ import org.junit.Ignore; import org.junit.Test; public class JMSDestinationTest extends AbstractJMSTester { + private static final class FaultyConnectionFactory implements ConnectionFactory { + private final ConnectionFactory delegate; + private final AtomicInteger latch; + + private FaultyConnectionFactory(ConnectionFactory delegate, int faults) { + this.delegate = delegate; + this.latch = new AtomicInteger(faults); + } + + @Override + public Connection createConnection() throws JMSException { + if (latch.getAndDecrement() == 0) { + return delegate.createConnection(); + } else { + throw new JMSException("createConnection() failed (simulated)"); + } + } + + @Override + public Connection createConnection(String userName, String password) throws JMSException { + if (latch.decrementAndGet() == 0) { + return delegate.createConnection(userName, password); + } else { + throw new JMSException("createConnection(userName, password) failed (simulated)"); + } + } + + } @Test public void testGetConfigurationFromWSDL() throws Exception { @@ -449,6 +483,96 @@ public class JMSDestinationTest extends AbstractJMSTester { destination.shutdown(); } + @Test + public void testMessageObserverExceptionHandling() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); + JMSConduit conduit = setupJMSConduitWithObserver(ei); + + JMSDestination destination = setupJMSDestination(ei); + destination.setMessageObserver(new MessageObserver() { + @Override + public void onMessage(Message message) { + try { + throw new RuntimeException("Error!"); + } finally { + latch.countDown(); + } + } + }); + + Message outMessage = new MessageImpl(); + setupMessageHeader(outMessage); + Thread.sleep(500L); + + sendOneWayMessage(conduit, outMessage); + latch.await(5, TimeUnit.SECONDS); + + conduit.close(); + destination.shutdown(); + } + + @Test + public void testConnectionFactoryExceptionHandling() throws Exception { + EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); + final Function<ConnectionFactory, ConnectionFactory> wrapper = + new Function<ConnectionFactory, ConnectionFactory>() { + @Override + public ConnectionFactory apply(ConnectionFactory cf) { + return new FaultyConnectionFactory(cf, 3); + } + }; + JMSConduit conduit = setupJMSConduitWithObserver(ei); + JMSDestination destination = setupJMSDestination(ei, wrapper); + destination.getJmsConfig().setRetryInterval(1000); + destination.setMessageObserver(createMessageObserver()); + + Message outMessage = new MessageImpl(); + setupMessageHeader(outMessage); + Thread.sleep(4000L); + + sendOneWayMessage(conduit, outMessage); + + // wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveDestMessage(); + + verifyReceivedMessage(destMessage); + verifyHeaders(destMessage, outMessage); + + conduit.close(); + destination.shutdown(); + } + + @Test + public void testBrokerExceptionHandling() throws Exception { + EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); + JMSConduit conduit = setupJMSConduitWithObserver(ei); + JMSDestination destination = setupJMSDestination(ei); + destination.getJmsConfig().setRetryInterval(1000); + destination.setMessageObserver(createMessageObserver()); + + Thread.sleep(500L); + broker.stopAllConnectors(new ServiceStopper()); + + broker.startAllConnectors(); + Thread.sleep(2000L); + + Message outMessage = new MessageImpl(); + setupMessageHeader(outMessage); + sendOneWayMessage(conduit, outMessage); + + // wait for the message to be got from the destination, + // create the thread to handler the Destination incoming message + waitForReceiveDestMessage(); + + verifyReceivedMessage(destMessage); + verifyHeaders(destMessage, outMessage); + + conduit.close(); + destination.shutdown(); + } + private String getQueueName(String exName) { if (exName == null) { return null;
