This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e158420 NIFI-7563: Optimize the usage of JMS sessions and message
producers
e158420 is described below
commit e1584207d114d4915cc793d633039b724f6a07cb
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Wed Jul 1 06:30:39 2020 -0300
NIFI-7563: Optimize the usage of JMS sessions and message producers
The introduced changes prevent creating unnecesary sessions and producers
in some scenarios.
This closes #4378.
Signed-off-by: Joey Frazee <[email protected]>
---
.../apache/nifi/jms/processors/JMSPublisher.java | 35 ++----
.../apache/nifi/jms/processors/PublishJMSIT.java | 128 +++++++++++++++++++++
.../ConnectionFactoryInvocationHandler.java | 112 ++++++++++++++++++
.../helpers/ConnectionInvocationHandler.java | 108 +++++++++++++++++
.../helpers/MessageProducerInvocationHandler.java | 63 ++++++++++
.../helpers/SessionInvocationHandler.java | 91 +++++++++++++++
6 files changed, 511 insertions(+), 26 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 17d5690..3a65bab 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -20,17 +20,14 @@ import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Topic;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -56,7 +53,7 @@ final class JMSPublisher extends JMSWorker {
public Message createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageBytes);
- setMessageHeaderAndProperties(message, flowFileAttributes);
+ setMessageHeaderAndProperties(session, message,
flowFileAttributes);
return message;
}
});
@@ -67,13 +64,13 @@ final class JMSPublisher extends JMSWorker {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage(messageText);
- setMessageHeaderAndProperties(message, flowFileAttributes);
+ setMessageHeaderAndProperties(session, message,
flowFileAttributes);
return message;
}
});
}
- void setMessageHeaderAndProperties(final Message message, final
Map<String, String> flowFileAttributes) throws JMSException {
+ void setMessageHeaderAndProperties(final Session session, final Message
message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
@@ -96,14 +93,14 @@ final class JMSPublisher extends JMSWorker {
} else if (entry.getKey().equals(JmsHeaders.TYPE)) {
message.setJMSType(entry.getValue());
} else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) {
- Destination destination =
buildDestination(entry.getValue());
+ Destination destination = buildDestination(session,
entry.getValue());
if (destination != null) {
message.setJMSReplyTo(destination);
} else {
logUnbuildableDestination(entry.getValue(),
JmsHeaders.REPLY_TO);
}
} else if (entry.getKey().equals(JmsHeaders.DESTINATION)) {
- Destination destination =
buildDestination(entry.getValue());
+ Destination destination = buildDestination(session,
entry.getValue());
if (destination != null) {
message.setJMSDestination(destination);
} else {
@@ -128,27 +125,13 @@ final class JMSPublisher extends JMSWorker {
}
- private Destination buildDestination(final String destinationName) {
- Destination destination;
+ private static Destination buildDestination(final Session session, final
String destinationName) throws JMSException {
if (destinationName.toLowerCase().contains("topic")) {
- destination = this.jmsTemplate.execute(new
SessionCallback<Topic>() {
- @Override
- public Topic doInJms(Session session) throws JMSException {
- return session.createTopic(destinationName);
- }
- });
+ return session.createTopic(destinationName);
} else if (destinationName.toLowerCase().contains("queue")) {
- destination = this.jmsTemplate.execute(new
SessionCallback<Queue>() {
- @Override
- public Queue doInJms(Session session) throws JMSException {
- return session.createQueue(destinationName);
- }
- });
- } else {
- destination = null;
+ return session.createQueue(destinationName);
}
-
- return destination;
+ return null;
}
/**
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index b901b73..8070194 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
+import
org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@@ -45,6 +46,7 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import java.io.IOException;
+import java.lang.reflect.Proxy;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
@@ -325,6 +327,7 @@ public class PublishJMSIT {
runner.run(1, true, false); // Run once just so that we can trigger
the shutdown of the Connection Factory
}
+
/**
* <p>
* This test validates the connection resources are closed if the
publisher is marked as invalid.
@@ -385,6 +388,131 @@ public class PublishJMSIT {
}
}
+ /**
+ * <p>
+ * This test validates the optimal resources usage. To process one message
is expected to create only one connection, one session and one message producer.
+ * </p>
+ * <p>
+ * See <a
href="NIFI-7563">https://issues.apache.org/jira/browse/NIFI-7563</a> for
details.
+ * </p>
+ * @throws Exception any error related to the broker.
+ */
+ @Test(timeout = 10000)
+ public void validateNIFI7563UsingOneThread() throws Exception {
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ TransportConnector connector =
broker.addConnector("tcp://127.0.0.1:0");
+ int port = connector.getServer().getSocketAddress().getPort();
+ broker.start();
+
+ final ActiveMQConnectionFactory innerCf = new
ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+ ConnectionFactoryInvocationHandler connectionFactoryProxy = new
ConnectionFactoryInvocationHandler(innerCf);
+
+ // Create a connection Factory proxy to catch metrics and usage.
+ ConnectionFactory cf = (ConnectionFactory)
Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] {
ConnectionFactory.class }, connectionFactoryProxy);
+
+ TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+ String destinationName = "myDestinationName";
+ // The destination option according current implementation should
contain topic or queue to infer the destination type
+ // from the name. Check
https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name
can be
+ // randomly created.
+ String topicNameInHeader = "topic-foo";
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+ runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+ int threads = 1;
+ Map<String, String> flowFileAttributes = new HashMap<>();
+ // This method will be removed once
https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+ flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+ flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+ runner.setThreadCount(threads);
+ runner.enqueue("hi".getBytes(), flowFileAttributes);
+ runner.enqueue("hi".getBytes(), flowFileAttributes);
+ runner.run(2);
+ assertTrue("It is expected at least " + threads + " Connection to
be opened.", threads == connectionFactoryProxy.openedConnections());
+ assertTrue("It is expected " + threads + " Session to be opened
and there are " + connectionFactoryProxy.openedSessions(), threads ==
connectionFactoryProxy.openedSessions());
+ assertTrue("It is expected " + threads + " MessageProducer to be
opened and there are " + connectionFactoryProxy.openedProducers(), threads ==
connectionFactoryProxy.openedProducers());
+ assertTrue("Some resources were not closed.",
connectionFactoryProxy.isAllResourcesClosed());
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
+
+ /**
+ * <p>
+ * This test validates the optimal resources usage. To process one message
is expected to create only one connection, one session and one message producer.
+ * </p>
+ * <p>
+ * See <a
href="NIFI-7563">https://issues.apache.org/jira/browse/NIFI-7563</a> for
details.
+ * </p>
+ * @throws Exception any error related to the broker.
+ */
+ @Test(timeout = 10000)
+ public void validateNIFI7563UsingMultipleThreads() throws Exception {
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ TransportConnector connector =
broker.addConnector("tcp://127.0.0.1:0");
+ int port = connector.getServer().getSocketAddress().getPort();
+ broker.start();
+
+ final ActiveMQConnectionFactory innerCf = new
ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+ ConnectionFactoryInvocationHandler connectionFactoryProxy = new
ConnectionFactoryInvocationHandler(innerCf);
+
+ // Create a connection Factory proxy to catch metrics and usage.
+ ConnectionFactory cf = (ConnectionFactory)
Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] {
ConnectionFactory.class }, connectionFactoryProxy);
+
+ TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+ JMSConnectionFactoryProviderDefinition cs =
mock(JMSConnectionFactoryProviderDefinition.class);
+ when(cs.getIdentifier()).thenReturn("cfProvider");
+ when(cs.getConnectionFactory()).thenReturn(cf);
+ runner.addControllerService("cfProvider", cs);
+ runner.enableControllerService(cs);
+
+ runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+ String destinationName = "myDestinationName";
+ // The destination option according current implementation should
contain topic or queue to infer the destination type
+ // from the name. Check
https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name
can be
+ // randomly created.
+ String topicNameInHeader = "topic-foo";
+ runner.setProperty(PublishJMS.DESTINATION, destinationName);
+ runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+ int messagesToGenerate = 1000;
+ int threads = 10;
+ runner.setThreadCount(threads);
+ Map<String, String> flowFileAttributes = new HashMap<>();
+ // This method will be removed once
https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+ flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+ flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+ byte[] messageContent = "hi".getBytes();
+ for (int i = 0; i < messagesToGenerate; i++) {
+ runner.enqueue(messageContent, flowFileAttributes);
+ }
+ runner.run(messagesToGenerate);
+ assertTrue("It is expected at least " + threads + " Connection to
be opened.", connectionFactoryProxy.openedConnections() <= threads);
+ assertTrue("It is expected " + threads + " Session to be opened
and there are " + connectionFactoryProxy.openedSessions(),
connectionFactoryProxy.openedSessions() <= threads);
+ assertTrue("It is expected " + threads + " MessageProducer to be
opened and there are " + connectionFactoryProxy.openedProducers(),
connectionFactoryProxy.openedProducers() <= threads);
+ assertTrue("Some resources were not closed.",
connectionFactoryProxy.isAllResourcesClosed());
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
+
@Test
public void
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
throws Exception {
TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
new file mode 100644
index 0000000..8510ad6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ConnectionFactory}'s invocation handler to be used to create {@link
Proxy} instances. This handler stores
+ * useful information to validate the proper resources handling of underlying
connection factory.
+ */
+@ThreadSafe
+public final class ConnectionFactoryInvocationHandler implements
InvocationHandler {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class);
+
+ private final ConnectionFactory connectionFactory;
+ private final List<ConnectionInvocationHandler> handlers = new
CopyOnWriteArrayList<>();
+ private final AtomicInteger openedConnections = new AtomicInteger();
+
+ public ConnectionFactoryInvocationHandler(ConnectionFactory
connectionFactory) {
+ this.connectionFactory = Objects.requireNonNull(connectionFactory);
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ final Object o =
connectionFactory.getClass().getMethod(method.getName(),
method.getParameterTypes()).invoke(connectionFactory, args);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Method {} called on {}", method.getName(),
connectionFactory);
+ }
+ if ("createConnection".equals(method.getName())) {
+ Connection connection = (Connection) o;
+ ConnectionInvocationHandler cp = new
ConnectionInvocationHandler(connection);
+ handlers.add(cp);
+ openedConnections.incrementAndGet();
+ LOGGER.info("Connection created {}", connection);
+ return (Connection)
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] {
Connection.class }, cp);
+ }
+ return o;
+ }
+
+ /**
+ * @return true if all opened resources were closed.
+ */
+ public boolean isAllResourcesClosed() {
+ boolean closed = true;
+ for (ConnectionInvocationHandler handler : handlers) {
+ boolean handlerClosed = handler.isClosed();
+ closed = closed && handlerClosed;
+ if (!handlerClosed) {
+ LOGGER.warn("Connection is not closed {}",
handler.getConnection());
+ }
+ }
+ return closed;
+ }
+
+ /**
+ * @return number of opened connections.
+ */
+ public int openedConnections() {
+ return openedConnections.get();
+ }
+
+ /**
+ * @return all opened producers.
+ */
+ public int openedProducers() {
+ int producers = 0;
+ for (ConnectionInvocationHandler handler : handlers) {
+ producers += handler.openedProducers();
+ }
+ return producers;
+ }
+
+ /**
+ * @return number of opened sessions.
+ */
+ public int openedSessions() {
+ int sessions = 0;
+ for (ConnectionInvocationHandler handler : handlers) {
+ sessions += handler.openedSessions();
+ }
+ return sessions;
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
new file mode 100644
index 0000000..fba5278
--- /dev/null
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link Connection} proxy utility to detect opened and unclosed
resources.
+ */
+@ThreadSafe
+final class ConnectionInvocationHandler implements InvocationHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConnectionInvocationHandler.class);
+ private final AtomicInteger closeCalled = new AtomicInteger();
+ private final Connection connection;
+ private final List<SessionInvocationHandler> handlers = new
CopyOnWriteArrayList<>();
+ private final AtomicInteger openedSessions = new AtomicInteger();
+
+ public ConnectionInvocationHandler(Connection connection) {
+ this.connection = Objects.requireNonNull(connection);
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ final Object o = connection.getClass().getMethod(method.getName(),
method.getParameterTypes()).invoke(connection, args);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Method {} called on {}", method.getName(),
connection);
+ }
+ if (method.getName().equals("createSession")) {
+ Session session = (Session) o;
+ LOGGER.info("Session created {} using connection {}", session,
connection);
+ openedSessions.incrementAndGet();
+ SessionInvocationHandler sp = new
SessionInvocationHandler(session);
+ handlers.add(sp);
+ Session sessionProxy = (Session)
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] {
Session.class }, sp);
+ return sessionProxy;
+ }
+ if ("close".equals(method.getName())) {
+ closeCalled.incrementAndGet();
+ LOGGER.info("Connection close method called {} times for {}",
closeCalled, connection);
+ }
+ return o;
+ }
+
+ /**
+ * @return true if {@link Connection#close()} method was closed to {@link
#connection} and all resources created from the connection were closed too.
+ */
+ public boolean isClosed() {
+ boolean closed = closeCalled.get() >= 1;
+ for (SessionInvocationHandler handler : handlers) {
+ boolean handlerClosed = handler.isClosed();
+ closed = closed && handlerClosed;
+ if (!handlerClosed) {
+ LOGGER.warn("Session is not closed {}", handler.getSession());
+ }
+ }
+ return closed;
+ }
+
+ /**
+ * @return number opened producers.
+ */
+ public int openedProducers() {
+ int producers = 0;
+ for (SessionInvocationHandler handler : handlers) {
+ producers += handler.openedProducers();
+ }
+ return producers;
+ }
+
+ /**
+ * @return the number of opened sessions.
+ */
+ public int openedSessions() {
+ return openedSessions.get();
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
new file mode 100644
index 0000000..5fd6f2e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MessageProducer} invocation handler.
+ */
+final class MessageProducerInvocationHandler implements InvocationHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SessionInvocationHandler.class);
+
+ private final AtomicInteger closeCalled = new AtomicInteger();
+ private final MessageProducer messageProducer;
+
+ public MessageProducerInvocationHandler(MessageProducer messageProducer) {
+ this.messageProducer = Objects.requireNonNull(messageProducer);
+ }
+
+ public MessageProducer getMessageProducer() {
+ return messageProducer;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ final Object o =
messageProducer.getClass().getMethod(method.getName(),
method.getParameterTypes()).invoke(messageProducer, args);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Method {} called on {}", method.getName(),
messageProducer);
+ }
+ if ("close".equals(method.getName())) {
+ closeCalled.incrementAndGet();
+ LOGGER.info("MessageProducer closed {}", messageProducer);
+ }
+ return o;
+ }
+
+ public boolean isClosed() {
+ return closeCalled.get() >= 1;
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
new file mode 100644
index 0000000..d73415d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Session} invocation handler for Session proxy instances.
+ * @see ConnectionFactoryInvocationHandler
+ */
+final class SessionInvocationHandler implements InvocationHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SessionInvocationHandler.class);
+
+ private final AtomicInteger closeCalled = new AtomicInteger();
+ private final List<MessageProducerInvocationHandler> handlers = new
CopyOnWriteArrayList<>();
+ private final AtomicInteger openedProducers = new AtomicInteger();
+ private final Session session;
+
+ public SessionInvocationHandler(Session session) {
+ this.session = Objects.requireNonNull(session);
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ final Object o = session.getClass().getMethod(method.getName(),
method.getParameterTypes()).invoke(session, args);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Method {} called on {}", method.getName(), session);
+ }
+ if (method.getName().equals("createProducer")) {
+ MessageProducer messageProducer = (MessageProducer) o;
+ LOGGER.info("Created a Message Producer {} using session {}",
messageProducer, session);
+ openedProducers.incrementAndGet();
+ MessageProducerInvocationHandler mp = new
MessageProducerInvocationHandler(messageProducer);
+ handlers.add(mp);
+ MessageProducer messageProducerProxy = (MessageProducer)
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] {
MessageProducer.class }, mp);
+ return messageProducerProxy;
+ }
+ if ("close".equals(method.getName())) {
+ closeCalled.incrementAndGet();
+ LOGGER.info("Session close method called {} times for {}",
closeCalled, session);
+ }
+ return o;
+ }
+
+ public boolean isClosed() {
+ boolean closed = closeCalled.get() >= 1;
+ for (MessageProducerInvocationHandler handler : handlers) {
+ boolean handlerClosed = handler.isClosed();
+ closed = closed && handlerClosed;
+ if (!handlerClosed) {
+ LOGGER.warn("MessageProducer is not closed {}",
handler.getMessageProducer());
+ }
+ }
+ return closed;
+ }
+
+ public int openedProducers() {
+ return openedProducers.get();
+ }
+
+}