This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cddaac5 NIFI-7034 Thise closes #4002. Connection leak with
JMSConsumer and JMSPublisher
cddaac5 is described below
commit cddaac591bbf13ad19490bfaebf92a5b17993e18
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Sun Jan 19 13:09:49 2020 -0300
NIFI-7034 Thise closes #4002. Connection leak with JMSConsumer and
JMSPublisher
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi/jms/processors/AbstractJMSProcessor.java | 3 +
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 115 +++++++++++++++------
.../apache/nifi/jms/processors/PublishJMSIT.java | 93 +++++++++++++++--
3 files changed, 171 insertions(+), 40 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 33cc87c..74f826c 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -196,6 +196,9 @@ public abstract class AbstractJMSProcessor<T extends
JMSWorker> extends Abstract
getLogger().debug("Worker is invalid. Will try re-create... ");
final JMSConnectionFactoryProviderDefinition cfProvider =
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
try {
+ if (worker != null) {
+ worker.shutdown();
+ }
// Safe to cast. Method
#buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
CachingConnectionFactory currentCF =
(CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index e5ca276..085db0e 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -17,6 +17,7 @@
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -25,9 +26,16 @@ import static org.mockito.Mockito.when;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -38,9 +46,13 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -52,6 +64,7 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
+import javax.net.SocketFactory;
public class ConsumeJMSIT {
@@ -148,49 +161,29 @@ public class ConsumeJMSIT {
@Test
public void testTextMessageTypeAttribute() throws Exception {
- testMessageTypeAttribute(
- "testTextMessage",
- Session::createTextMessage,
- TextMessage.class.getSimpleName()
- );
+ testMessageTypeAttribute("testTextMessage",
Session::createTextMessage, TextMessage.class.getSimpleName());
}
@Test
public void testByteMessageTypeAttribute() throws Exception {
- testMessageTypeAttribute(
- "testByteMessage",
- Session::createBytesMessage,
- BytesMessage.class.getSimpleName()
- );
+ testMessageTypeAttribute("testByteMessage",
Session::createBytesMessage, BytesMessage.class.getSimpleName());
}
@Test
public void testObjectMessageTypeAttribute() throws Exception {
String destinationName = "testObjectMessage";
- testMessageTypeAttribute(
- destinationName,
- Session::createObjectMessage,
- ObjectMessage.class.getSimpleName()
- );
+ testMessageTypeAttribute(destinationName,
Session::createObjectMessage, ObjectMessage.class.getSimpleName());
}
@Test
public void testStreamMessageTypeAttribute() throws Exception {
- testMessageTypeAttribute(
- "testStreamMessage",
- Session::createStreamMessage,
- StreamMessage.class.getSimpleName()
- );
+ testMessageTypeAttribute("testStreamMessage",
Session::createStreamMessage, StreamMessage.class.getSimpleName());
}
@Test
public void testMapMessageTypeAttribute() throws Exception {
- testMessageTypeAttribute(
- "testMapMessage",
- Session::createMapMessage,
- MapMessage.class.getSimpleName()
- );
+ testMessageTypeAttribute("testMapMessage", Session::createMapMessage,
MapMessage.class.getSimpleName());
}
@Test
@@ -201,10 +194,7 @@ public class ConsumeJMSIT {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory)
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
- sender.jmsTemplate.send("testMapMessage", __ ->
createUnsupportedMessage(
- "unsupportedMessagePropertyKey",
- "unsupportedMessagePropertyValue"
- ));
+ sender.jmsTemplate.send("testMapMessage", __ ->
createUnsupportedMessage("unsupportedMessagePropertyKey",
"unsupportedMessagePropertyValue"));
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
@@ -278,7 +268,8 @@ public class ConsumeJMSIT {
* </ul>
* It is expected <tt>C1</tt> receives message <tt>M1</tt>.
* </p>
- * @throws Exception unexpected
+ * @throws Exception
+ * unexpected
*/
@Test(timeout = 10000)
public void validateNifi6915() throws Exception {
@@ -338,6 +329,70 @@ public class ConsumeJMSIT {
runner.run(1, true);
}
+ /**
+ * <p>
+ * This test validates the connection resources are closed if the
publisher is marked as invalid.
+ * </p>
+ * <p>
+ * This tests validates the proper resources handling for TCP connections
using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test,
using some ActiveMQ's classes is possible to
+ * verify if an opened socket is closed. See <a
href="https://issues.apache.org/jira/browse/NIFI-7034">NIFI-7034</a>.
+ * </p>
+ * @throws Exception
+ * any error related to the broker.
+ */
+ @Test(timeout = 10000)
+ public void validateNIFI7034() throws Exception {
+ class ConsumeJMSForNifi7034 extends ConsumeJMS {
+ @Override
+ protected void rendezvousWithJms(ProcessContext context,
ProcessSession processSession, JMSConsumer consumer) throws ProcessException {
+ super.rendezvousWithJms(context, processSession, consumer);
+ consumer.setValid(false);
+ }
+ }
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ broker.setBrokerName("nifi7034publisher");
+ TransportConnector connector =
broker.addConnector("tcp://127.0.0.1:0");
+ int port = connector.getServer().getSocketAddress().getPort();
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
+ final String destinationName = "nifi7034";
+ final AtomicReference<TcpTransport> tcpTransport = new
AtomicReference<TcpTransport>();
+ TcpTransportFactory.registerTransportFactory("validateNIFI7034",
new TcpTransportFactory() {
+ @Override
+ protected TcpTransport createTcpTransport(WireFormat wf,
SocketFactory socketFactory, URI location, URI localLocation) throws
UnknownHostException, IOException {
+ TcpTransport transport = super.createTcpTransport(wf,
socketFactory, location, localLocation);
+ tcpTransport.set(transport);
+ return transport;
+ }
+ });
+
+ TestRunner runner = TestRunners.newTestRunner(new
ConsumeJMSForNifi7034());
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+ runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+
+ try {
+ runner.run();
+ fail("Unit test implemented in a way this line must not be
called");
+ } catch (AssertionError e) {
+ assertFalse("It is expected transport be closed. ",
tcpTransport.get().isConnected());
+ }
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
+
private static void publishAMessage(ActiveMQConnectionFactory cf, final
String destinationName, String messageContent) throws JMSException {
// Publish a message.
try (Connection conn = cf.createConnection();
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index 1f489d5..247a3ac 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -16,8 +16,24 @@
*/
package org.apache.nifi.jms.processors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -25,21 +41,19 @@ import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import javax.net.SocketFactory;
public class PublishJMSIT {
@@ -308,4 +322,63 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger
the shutdown of the Connection Factory
}
+ /**
+ * <p>
+ * This test validates the connection resources are closed if the
publisher is marked as invalid.
+ * </p>
+ * <p>
+ * This tests validates the proper resources handling for TCP connections
using ActiveMQ (the bug was discovered against ActiveMQ 5.x). In this test,
using some ActiveMQ's classes is possible to
+ * verify if an opened socket is closed. See <a
href="NIFI-7034">https://issues.apache.org/jira/browse/NIFI-7034</a>.
+ * </p>
+ * @throws Exception any error related to the broker.
+ */
+ @Test(timeout = 10000)
+ public void validateNIFI7034() throws Exception {
+ class PublishJmsForNifi7034 extends PublishJMS {
+ @Override
+ protected void rendezvousWithJms(ProcessContext context,
ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
+ super.rendezvousWithJms(context, processSession, publisher);
+ publisher.setValid(false);
+ }
+ }
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ broker.setBrokerName("nifi7034publisher");
+ TransportConnector connector =
broker.addConnector("tcp://127.0.0.1:0");
+ int port = connector.getServer().getSocketAddress().getPort();
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("validateNIFI7034://127.0.0.1:" + port);
+ final String destinationName = "nifi7034";
+ final AtomicReference<TcpTransport> tcpTransport = new
AtomicReference<TcpTransport>();
+ TcpTransportFactory.registerTransportFactory("validateNIFI7034",
new TcpTransportFactory() {
+ @Override
+ protected TcpTransport createTcpTransport(WireFormat wf,
SocketFactory socketFactory, URI location, URI localLocation) throws
UnknownHostException, IOException {
+ TcpTransport transport = super.createTcpTransport(wf,
socketFactory, location, localLocation);
+ tcpTransport.set(transport);
+ return transport;
+ }
+ });
+
+ TestRunner runner = TestRunners.newTestRunner(new
PublishJmsForNifi7034());
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+ runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.TOPIC);
+
+ runner.enqueue("hi".getBytes());
+ runner.run();
+ assertFalse("It is expected transport be closed. ",
tcpTransport.get().isConnected());
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
}