This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cddaac5  NIFI-7034 This closes #4002. Connection leak with 
JMSConsumer and JMSPublisher
cddaac5 is described below

commit cddaac591bbf13ad19490bfaebf92a5b17993e18
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Sun Jan 19 13:09:49 2020 -0300

    NIFI-7034 This closes #4002. Connection leak with JMSConsumer and 
JMSPublisher
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../nifi/jms/processors/AbstractJMSProcessor.java  |   3 +
 .../apache/nifi/jms/processors/ConsumeJMSIT.java   | 115 +++++++++++++++------
 .../apache/nifi/jms/processors/PublishJMSIT.java   |  93 +++++++++++++++--
 3 files changed, 171 insertions(+), 40 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 33cc87c..74f826c 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -196,6 +196,9 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
                 getLogger().debug("Worker is invalid. Will try re-create... ");
                 final JMSConnectionFactoryProviderDefinition cfProvider = 
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
                 try {
+                    if (worker != null) {
+                        worker.shutdown();
+                    }
                     // Safe to cast. Method 
#buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
                     CachingConnectionFactory currentCF = 
(CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
                     
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index e5ca276..085db0e 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.jms.processors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -25,9 +26,16 @@ import static org.mockito.Mockito.when;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -38,9 +46,13 @@ import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.support.JmsHeaders;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -52,6 +64,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
+import javax.net.SocketFactory;
 
 public class ConsumeJMSIT {
 
@@ -148,49 +161,29 @@ public class ConsumeJMSIT {
 
     @Test
     public void testTextMessageTypeAttribute() throws Exception {
-        testMessageTypeAttribute(
-            "testTextMessage",
-            Session::createTextMessage,
-            TextMessage.class.getSimpleName()
-        );
+        testMessageTypeAttribute("testTextMessage", 
Session::createTextMessage, TextMessage.class.getSimpleName());
     }
 
     @Test
     public void testByteMessageTypeAttribute() throws Exception {
-        testMessageTypeAttribute(
-            "testByteMessage",
-            Session::createBytesMessage,
-            BytesMessage.class.getSimpleName()
-        );
+        testMessageTypeAttribute("testByteMessage", 
Session::createBytesMessage, BytesMessage.class.getSimpleName());
     }
 
     @Test
     public void testObjectMessageTypeAttribute() throws Exception {
         String destinationName = "testObjectMessage";
 
-        testMessageTypeAttribute(
-            destinationName,
-            Session::createObjectMessage,
-            ObjectMessage.class.getSimpleName()
-        );
+        testMessageTypeAttribute(destinationName, 
Session::createObjectMessage, ObjectMessage.class.getSimpleName());
     }
 
     @Test
     public void testStreamMessageTypeAttribute() throws Exception {
-        testMessageTypeAttribute(
-            "testStreamMessage",
-            Session::createStreamMessage,
-            StreamMessage.class.getSimpleName()
-        );
+        testMessageTypeAttribute("testStreamMessage", 
Session::createStreamMessage, StreamMessage.class.getSimpleName());
     }
 
     @Test
     public void testMapMessageTypeAttribute() throws Exception {
-        testMessageTypeAttribute(
-            "testMapMessage",
-            Session::createMapMessage,
-            MapMessage.class.getSimpleName()
-        );
+        testMessageTypeAttribute("testMapMessage", Session::createMapMessage, 
MapMessage.class.getSimpleName());
     }
 
     @Test
@@ -201,10 +194,7 @@ public class ConsumeJMSIT {
 
             JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
 
-            sender.jmsTemplate.send("testMapMessage", __ -> 
createUnsupportedMessage(
-                "unsupportedMessagePropertyKey",
-                "unsupportedMessagePropertyValue"
-            ));
+            sender.jmsTemplate.send("testMapMessage", __ -> 
createUnsupportedMessage("unsupportedMessagePropertyKey", 
"unsupportedMessagePropertyValue"));
 
             TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
             JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
@@ -278,7 +268,8 @@ public class ConsumeJMSIT {
      * </ul>
      * It is expected <tt>C1</tt> receives message <tt>M1</tt>.
      * </p>
-     * @throws Exception unexpected
+     * @throws Exception
+     *             unexpected
      */
     @Test(timeout = 10000)
     public void validateNifi6915() throws Exception {
@@ -338,6 +329,70 @@ public class ConsumeJMSIT {
         runner.run(1, true);
     }
 
+    /**
+     * <p>
+     * This test validates the connection resources are closed if the 
consumer is marked as invalid.
+     * </p>
+     * <p>
+     * This test validates proper resource handling for TCP connections 
using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test, 
some ActiveMQ classes are used to
+     * verify that an opened socket is closed. See <a 
href="https://issues.apache.org/jira/browse/NIFI-7034">NIFI-7034</a>.
+     * </p>
+     * @throws Exception
+     *             any error related to the broker.
+     */
+    @Test(timeout = 10000)
+    public void validateNIFI7034() throws Exception {
+        class ConsumeJMSForNifi7034 extends ConsumeJMS {
+            @Override
+            protected void rendezvousWithJms(ProcessContext context, 
ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
+                super.rendezvousWithJms(context, processSession, consumer);
+                consumer.setValid(false);
+            }
+        }
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            broker.setBrokerName("nifi7034publisher");
+            TransportConnector connector = 
broker.addConnector("tcp://127.0.0.1:0");
+            int port = connector.getServer().getSocketAddress().getPort();
+            broker.start();
+
+            ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
+            final String destinationName = "nifi7034";
+            final AtomicReference<TcpTransport> tcpTransport = new 
AtomicReference<TcpTransport>();
+            TcpTransportFactory.registerTransportFactory("validateNIFI7034", 
new TcpTransportFactory() {
+                @Override
+                protected TcpTransport createTcpTransport(WireFormat wf, 
SocketFactory socketFactory, URI location, URI localLocation) throws 
UnknownHostException, IOException {
+                    TcpTransport transport = super.createTcpTransport(wf, 
socketFactory, location, localLocation);
+                    tcpTransport.set(transport);
+                    return transport;
+                }
+            });
+
+            TestRunner runner = TestRunners.newTestRunner(new 
ConsumeJMSForNifi7034());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(cf);
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+            try {
+                runner.run();
+                fail("Unit test implemented in a way this line must not be 
called");
+            } catch (AssertionError e) {
+                assertFalse("It is expected transport be closed. ", 
tcpTransport.get().isConnected());
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
     private static void publishAMessage(ActiveMQConnectionFactory cf, final 
String destinationName, String messageContent) throws JMSException {
         // Publish a message.
         try (Connection conn = cf.createConnection();
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 1f489d5..247a3ac 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -16,8 +16,24 @@
  */
 package org.apache.nifi.jms.processors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -25,21 +41,19 @@ import org.junit.Test;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.TextMessage;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import javax.net.SocketFactory;
 
 public class PublishJMSIT {
 
@@ -308,4 +322,63 @@ public class PublishJMSIT {
 
         runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
     }
+    /**
+     * <p>
+     * This test validates the connection resources are closed if the 
publisher is marked as invalid.
+     * </p>
+     * <p>
+     * This test validates proper resource handling for TCP connections 
using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test, 
some ActiveMQ classes are used to
+     * verify that an opened socket is closed. See <a 
href="https://issues.apache.org/jira/browse/NIFI-7034">NIFI-7034</a>.
+     * </p>
+     * @throws Exception any error related to the broker.
+     */
+    @Test(timeout = 10000)
+    public void validateNIFI7034() throws Exception {
+        class PublishJmsForNifi7034 extends PublishJMS {
+            @Override
+            protected void rendezvousWithJms(ProcessContext context, 
ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+                super.rendezvousWithJms(context, processSession, publisher);
+                publisher.setValid(false);
+            }
+        }
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            broker.setBrokerName("nifi7034publisher");
+            TransportConnector connector = 
broker.addConnector("tcp://127.0.0.1:0");
+            int port = connector.getServer().getSocketAddress().getPort();
+            broker.start();
+
+            ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
+            final String destinationName = "nifi7034";
+            final AtomicReference<TcpTransport> tcpTransport = new 
AtomicReference<TcpTransport>();
+            TcpTransportFactory.registerTransportFactory("validateNIFI7034", 
new TcpTransportFactory() {
+                @Override
+                protected TcpTransport createTcpTransport(WireFormat wf, 
SocketFactory socketFactory, URI location, URI localLocation) throws 
UnknownHostException, IOException {
+                    TcpTransport transport = super.createTcpTransport(wf, 
socketFactory, location, localLocation);
+                    tcpTransport.set(transport);
+                    return transport;
+                }
+            });
+
+            TestRunner runner = TestRunners.newTestRunner(new 
PublishJmsForNifi7034());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(cf);
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(PublishJMS.DESTINATION, destinationName);
+            runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.TOPIC);
+
+            runner.enqueue("hi".getBytes());
+            runner.run();
+            assertFalse("It is expected transport be closed. ", 
tcpTransport.get().isConnected());
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
 }

Reply via email to