This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.2.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit bd98adffcf470709e1e2763326cfa2a07b4f4d54
Author: reta <[email protected]>
AuthorDate: Sun Dec 8 13:56:23 2019 -0500

    CXF-8161: fix memory leak and thread leak in JMSDestination. Adding more 
tests for JMSDestination first
    
    (cherry picked from commit dcc9c3f74c2fe9b8797d18296dee2e6270442481)
    (cherry picked from commit 53e45f515d36a31d99d0eab4e1776e5a17eeba55)
    
    # Conflicts:
    #   
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    #   
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
---
 .../cxf/transport/jms/AbstractJMSTester.java       |   9 ++
 .../cxf/transport/jms/JMSDestinationTest.java      | 124 +++++++++++++++++++++
 2 files changed, 133 insertions(+)

diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 807c4ec..5d18f50 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -26,6 +26,7 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.Writer;
 import java.net.URL;
+import java.util.function.Function;
 
 import javax.jms.ConnectionFactory;
 import javax.xml.namespace.QName;
@@ -163,6 +164,7 @@ public abstract class AbstractJMSTester extends Assert {
         return new JMSConduit(target, jmsConfig, bus);
     }
 
+
     protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws 
IOException {
         JMSConduit jmsConduit = setupJMSConduit(ei);
         MessageObserver observer = new MessageObserver() {
@@ -174,6 +176,13 @@ public abstract class AbstractJMSTester extends Assert {
         return jmsConduit;
     }
 
+    /**
+     * Creates a {@link JMSDestination} for the given endpoint whose JMS configuration uses
+     * the connection factory obtained by applying {@code wrapper} to {@code cf}.
+     *
+     * @param ei endpoint the JMS configuration is created from
+     * @param wrapper applied to {@code cf}; its result is set on the configuration
+     * @return a new destination built from the bus, the endpoint and the configuration
+     */
+    protected JMSDestination setupJMSDestination(EndpointInfo ei,
+            Function<ConnectionFactory, ConnectionFactory> wrapper) throws IOException {
+        final JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+        final ConnectionFactory wrapped = wrapper.apply(cf);
+        config.setConnectionFactory(wrapped);
+        return new JMSDestination(bus, ei, config);
+    }
+    
     protected JMSDestination setupJMSDestination(EndpointInfo ei) throws 
IOException {
         JMSConfiguration jmsConfig = 
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
         jmsConfig.setConnectionFactory(cf);
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 89e603b..1a401fb 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -24,8 +24,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
@@ -33,6 +38,7 @@ import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
+import org.apache.activemq.util.ServiceStopper;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -49,6 +55,34 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 public class JMSDestinationTest extends AbstractJMSTester {
+    /**
+     * {@link ConnectionFactory} decorator used to simulate a temporarily unavailable broker.
+     * The first {@code faults} connection attempts (counted across both
+     * {@code createConnection} overloads) throw a {@link JMSException}; every later attempt
+     * is forwarded to the wrapped factory.
+     */
+    private static final class FaultyConnectionFactory implements ConnectionFactory {
+        private final ConnectionFactory delegate;
+        /** Simulated failures still to be raised; never decremented below zero. */
+        private final AtomicInteger latch;
+
+        private FaultyConnectionFactory(ConnectionFactory delegate, int faults) {
+            this.delegate = delegate;
+            this.latch = new AtomicInteger(faults);
+        }
+
+        @Override
+        public Connection createConnection() throws JMSException {
+            if (consumeFault()) {
+                throw new JMSException("createConnection() failed (simulated)");
+            }
+            return delegate.createConnection();
+        }
+
+        @Override
+        public Connection createConnection(String userName, String password) throws JMSException {
+            if (consumeFault()) {
+                throw new JMSException("createConnection(userName, password) failed (simulated)");
+            }
+            return delegate.createConnection(userName, password);
+        }
+
+        /**
+         * Atomically uses up one pending fault, if any is left.
+         *
+         * @return {@code true} if this attempt must fail, {@code false} once all faults are spent
+         */
+        private boolean consumeFault() {
+            for (;;) {
+                final int remaining = latch.get();
+                if (remaining <= 0) {
+                    // Stop at zero so that later attempts keep succeeding instead of
+                    // failing again once the counter would have gone negative.
+                    return false;
+                }
+                if (latch.compareAndSet(remaining, remaining - 1)) {
+                    return true;
+                }
+            }
+        }
+    }
 
     @Test
     public void testGetConfigurationFromWSDL() throws Exception {
@@ -449,6 +483,96 @@ public class JMSDestinationTest extends AbstractJMSTester {
         destination.shutdown();
     }
 
+    /**
+     * Sends a one-way message to a destination whose message observer always throws and
+     * verifies that the observer was actually invoked. The conduit and the destination are
+     * released even when the test fails.
+     */
+    @Test
+    public void testMessageObserverExceptionHandling() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        JMSDestination destination = setupJMSDestination(ei);
+        try {
+            destination.setMessageObserver(new MessageObserver() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        throw new RuntimeException("Error!");
+                    } finally {
+                        // signal the test thread even though the observer fails
+                        latch.countDown();
+                    }
+                }
+            });
+
+            Message outMessage = new MessageImpl();
+            setupMessageHeader(outMessage);
+            // NOTE(review): presumably gives the destination time to start listening - confirm
+            Thread.sleep(500L);
+
+            sendOneWayMessage(conduit, outMessage);
+            // await() returns false on timeout; assert it so that a never-invoked
+            // observer fails the test instead of passing silently
+            assertTrue("message observer was not invoked within 5 seconds",
+                       latch.await(5, TimeUnit.SECONDS));
+        } finally {
+            try {
+                conduit.close();
+            } finally {
+                destination.shutdown();
+            }
+        }
+    }
+    
+    /**
+     * Starts a destination on a {@code FaultyConnectionFactory} configured with 3 faults and
+     * a retry interval of 1000, then verifies that a one-way message sent afterwards is
+     * received with the expected content and headers. The conduit and the destination are
+     * released even when a verification step fails.
+     */
+    @Test
+    public void testConnectionFactoryExceptionHandling() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+        final Function<ConnectionFactory, ConnectionFactory> wrapper =
+            new Function<ConnectionFactory, ConnectionFactory>() {
+                @Override
+                public ConnectionFactory apply(ConnectionFactory factory) {
+                    return new FaultyConnectionFactory(factory, 3);
+                }
+            };
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        JMSDestination destination = setupJMSDestination(ei, wrapper);
+        try {
+            destination.getJmsConfig().setRetryInterval(1000);
+            destination.setMessageObserver(createMessageObserver());
+
+            Message outMessage = new MessageImpl();
+            setupMessageHeader(outMessage);
+            // NOTE(review): presumably long enough for the simulated faults to be retried
+            // at the configured interval before the message is sent - confirm
+            Thread.sleep(4000L);
+
+            sendOneWayMessage(conduit, outMessage);
+
+            // wait until the destination has received the message
+            waitForReceiveDestMessage();
+
+            verifyReceivedMessage(destMessage);
+            verifyHeaders(destMessage, outMessage);
+        } finally {
+            try {
+                conduit.close();
+            } finally {
+                destination.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Stops and restarts all broker connectors while a destination is active, then verifies
+     * that a one-way message sent afterwards is received with the expected content and
+     * headers. The conduit and the destination are released even when a step fails.
+     */
+    @Test
+    public void testBrokerExceptionHandling() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        JMSDestination destination = setupJMSDestination(ei);
+        try {
+            destination.getJmsConfig().setRetryInterval(1000);
+            destination.setMessageObserver(createMessageObserver());
+
+            Thread.sleep(500L);
+            broker.stopAllConnectors(new ServiceStopper());
+
+            broker.startAllConnectors();
+            // NOTE(review): presumably lets the destination reconnect at the configured
+            // retry interval before the message is sent - confirm
+            Thread.sleep(2000L);
+
+            Message outMessage = new MessageImpl();
+            setupMessageHeader(outMessage);
+            sendOneWayMessage(conduit, outMessage);
+
+            // wait until the destination has received the message
+            waitForReceiveDestMessage();
+
+            verifyReceivedMessage(destMessage);
+            verifyHeaders(destMessage, outMessage);
+        } finally {
+            try {
+                conduit.close();
+            } finally {
+                destination.shutdown();
+            }
+        }
+    }
+
     private String getQueueName(String exName) {
         if (exName == null) {
             return null;

Reply via email to