This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e158420  NIFI-7563: Optimize the usage of JMS sessions and message 
producers
e158420 is described below

commit e1584207d114d4915cc793d633039b724f6a07cb
Author: Gardella Juan Pablo <[email protected]>
AuthorDate: Wed Jul 1 06:30:39 2020 -0300

    NIFI-7563: Optimize the usage of JMS sessions and message producers
    
    The introduced changes prevent creating unnecessary sessions and producers
    in some scenarios.
    
    This closes #4378.
    
    Signed-off-by: Joey Frazee <[email protected]>
---
 .../apache/nifi/jms/processors/JMSPublisher.java   |  35 ++----
 .../apache/nifi/jms/processors/PublishJMSIT.java   | 128 +++++++++++++++++++++
 .../ConnectionFactoryInvocationHandler.java        | 112 ++++++++++++++++++
 .../helpers/ConnectionInvocationHandler.java       | 108 +++++++++++++++++
 .../helpers/MessageProducerInvocationHandler.java  |  63 ++++++++++
 .../helpers/SessionInvocationHandler.java          |  91 +++++++++++++++
 6 files changed, 511 insertions(+), 26 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 17d5690..3a65bab 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -20,17 +20,14 @@ import org.apache.nifi.logging.ComponentLog;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.support.JmsHeaders;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -56,7 +53,7 @@ final class JMSPublisher extends JMSWorker {
             public Message createMessage(Session session) throws JMSException {
                 BytesMessage message = session.createBytesMessage();
                 message.writeBytes(messageBytes);
-                setMessageHeaderAndProperties(message, flowFileAttributes);
+                setMessageHeaderAndProperties(session, message, 
flowFileAttributes);
                 return message;
             }
         });
@@ -67,13 +64,13 @@ final class JMSPublisher extends JMSWorker {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 TextMessage message = session.createTextMessage(messageText);
-                setMessageHeaderAndProperties(message, flowFileAttributes);
+                setMessageHeaderAndProperties(session, message, 
flowFileAttributes);
                 return message;
             }
         });
     }
 
-    void setMessageHeaderAndProperties(final Message message, final 
Map<String, String> flowFileAttributes) throws JMSException {
+    void setMessageHeaderAndProperties(final Session session, final Message 
message, final Map<String, String> flowFileAttributes) throws JMSException {
         if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
 
             for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
@@ -96,14 +93,14 @@ final class JMSPublisher extends JMSWorker {
                     } else if (entry.getKey().equals(JmsHeaders.TYPE)) {
                         message.setJMSType(entry.getValue());
                     } else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) {
-                        Destination destination = 
buildDestination(entry.getValue());
+                        Destination destination = buildDestination(session, 
entry.getValue());
                         if (destination != null) {
                             message.setJMSReplyTo(destination);
                         } else {
                             logUnbuildableDestination(entry.getValue(), 
JmsHeaders.REPLY_TO);
                         }
                     } else if (entry.getKey().equals(JmsHeaders.DESTINATION)) {
-                        Destination destination = 
buildDestination(entry.getValue());
+                        Destination destination = buildDestination(session, 
entry.getValue());
                         if (destination != null) {
                             message.setJMSDestination(destination);
                         } else {
@@ -128,27 +125,13 @@ final class JMSPublisher extends JMSWorker {
     }
 
 
-    private Destination buildDestination(final String destinationName) {
-        Destination destination;
+    private static Destination buildDestination(final Session session, final 
String destinationName) throws JMSException {
         if (destinationName.toLowerCase().contains("topic")) {
-            destination = this.jmsTemplate.execute(new 
SessionCallback<Topic>() {
-                @Override
-                public Topic doInJms(Session session) throws JMSException {
-                    return session.createTopic(destinationName);
-                }
-            });
+            return session.createTopic(destinationName);
         } else if (destinationName.toLowerCase().contains("queue")) {
-            destination = this.jmsTemplate.execute(new 
SessionCallback<Queue>() {
-                @Override
-                public Queue doInJms(Session session) throws JMSException {
-                    return session.createQueue(destinationName);
-                }
-            });
-        } else {
-            destination = null;
+            return session.createQueue(destinationName);
         }
-
-        return destination;
+        return null;
     }
 
     /**
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
index b901b73..8070194 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
+import 
org.apache.nifi.jms.processors.helpers.ConnectionFactoryInvocationHandler;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -45,6 +46,7 @@ import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
 import java.io.IOException;
+import java.lang.reflect.Proxy;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.HashMap;
@@ -325,6 +327,7 @@ public class PublishJMSIT {
 
         runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
     }
+
     /**
      * <p>
      * This test validates the connection resources are closed if the 
publisher is marked as invalid.
@@ -385,6 +388,131 @@ public class PublishJMSIT {
         }
     }
 
+    /**
+     * <p>
+     * This test validates the optimal resources usage. Processing one message 
is expected to create only one connection, one session and one message producer.
+     * </p>
+     * <p>
+     * See <a 
href="https://issues.apache.org/jira/browse/NIFI-7563">NIFI-7563</a> for 
details.
+     * </p>
+     * @throws Exception any error related to the broker.
+     */
+    @Test(timeout = 10000)
+    public void validateNIFI7563UsingOneThread() throws Exception {
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            TransportConnector connector = 
broker.addConnector("tcp://127.0.0.1:0");
+            int port = connector.getServer().getSocketAddress().getPort();
+            broker.start();
+
+            final ActiveMQConnectionFactory innerCf = new 
ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+            ConnectionFactoryInvocationHandler connectionFactoryProxy = new 
ConnectionFactoryInvocationHandler(innerCf);
+
+            // Create a connection Factory proxy to catch metrics and usage.
+            ConnectionFactory cf = (ConnectionFactory) 
Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { 
ConnectionFactory.class }, connectionFactoryProxy);
+
+            TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(cf);
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+            String destinationName = "myDestinationName";
+            // The destination option, according to the current implementation, should 
contain topic or queue to infer the destination type
+            // from the name. Check 
https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name 
can be
+            // randomly created.
+            String topicNameInHeader = "topic-foo";
+            runner.setProperty(PublishJMS.DESTINATION, destinationName);
+            runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+            int threads = 1;
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            // This method will be removed once 
https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+            flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+            runner.setThreadCount(threads);
+            runner.enqueue("hi".getBytes(), flowFileAttributes);
+            runner.enqueue("hi".getBytes(), flowFileAttributes);
+            runner.run(2);
+            assertTrue("It is expected at least " + threads + " Connection to 
be opened.", threads == connectionFactoryProxy.openedConnections());
+            assertTrue("It is expected " + threads + " Session to be opened 
and there are " + connectionFactoryProxy.openedSessions(), threads == 
connectionFactoryProxy.openedSessions());
+            assertTrue("It is expected " + threads + " MessageProducer to be 
opened and there are " + connectionFactoryProxy.openedProducers(), threads == 
connectionFactoryProxy.openedProducers());
+            assertTrue("Some resources were not closed.", 
connectionFactoryProxy.isAllResourcesClosed());
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    /**
+     * <p>
+     * This test validates the optimal resources usage. Processing one message 
is expected to create only one connection, one session and one message producer.
+     * </p>
+     * <p>
+     * See <a 
href="https://issues.apache.org/jira/browse/NIFI-7563">NIFI-7563</a> for 
details.
+     * </p>
+     * @throws Exception any error related to the broker.
+     */
+    @Test(timeout = 10000)
+    public void validateNIFI7563UsingMultipleThreads() throws Exception {
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            TransportConnector connector = 
broker.addConnector("tcp://127.0.0.1:0");
+            int port = connector.getServer().getSocketAddress().getPort();
+            broker.start();
+
+            final ActiveMQConnectionFactory innerCf = new 
ActiveMQConnectionFactory("tcp://127.0.0.1:" + port);
+            ConnectionFactoryInvocationHandler connectionFactoryProxy = new 
ConnectionFactoryInvocationHandler(innerCf);
+
+            // Create a connection Factory proxy to catch metrics and usage.
+            ConnectionFactory cf = (ConnectionFactory) 
Proxy.newProxyInstance(ConnectionFactory.class.getClassLoader(), new Class[] { 
ConnectionFactory.class }, connectionFactoryProxy);
+
+            TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(cf);
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+
+            String destinationName = "myDestinationName";
+            // The destination option, according to the current implementation, should 
contain topic or queue to infer the destination type
+            // from the name. Check 
https://issues.apache.org/jira/browse/NIFI-7561. Once that is fixed, the name 
can be
+            // randomly created.
+            String topicNameInHeader = "topic-foo";
+            runner.setProperty(PublishJMS.DESTINATION, destinationName);
+            runner.setProperty(PublishJMS.DESTINATION_TYPE, PublishJMS.QUEUE);
+
+            int messagesToGenerate = 1000;
+            int threads = 10;
+            runner.setThreadCount(threads);
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            // This method will be removed once 
https://issues.apache.org/jira/browse/NIFI-7564 is fixed.
+            flowFileAttributes.put(JmsHeaders.DESTINATION, topicNameInHeader);
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, topicNameInHeader);
+            byte[] messageContent = "hi".getBytes();
+            for (int i = 0; i < messagesToGenerate; i++) {
+                runner.enqueue(messageContent, flowFileAttributes);
+            }
+            runner.run(messagesToGenerate);
+            assertTrue("It is expected at least " + threads + " Connection to 
be opened.", connectionFactoryProxy.openedConnections() <= threads);
+            assertTrue("It is expected " + threads + " Session to be opened 
and there are " + connectionFactoryProxy.openedSessions(), 
connectionFactoryProxy.openedSessions() <= threads);
+            assertTrue("It is expected " + threads + " MessageProducer to be 
opened and there are " + connectionFactoryProxy.openedProducers(), 
connectionFactoryProxy.openedProducers() <= threads);
+            assertTrue("Some resources were not closed.", 
connectionFactoryProxy.isAllResourcesClosed());
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
     @Test
     public void 
whenExceptionIsRaisedDuringConnectionFactoryInitializationTheProcessorShouldBeYielded()
 throws Exception {
         TestRunner runner = TestRunners.newTestRunner(PublishJMS.class);
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
new file mode 100644
index 0000000..8510ad6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionFactoryInvocationHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ConnectionFactory}'s invocation handler to be used to create {@link 
Proxy} instances. This handler stores
+ * useful information to validate the proper resources handling of underlying 
connection factory.
+ */
+@ThreadSafe
+public final class ConnectionFactoryInvocationHandler implements 
InvocationHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConnectionFactoryInvocationHandler.class);
+
+    private final ConnectionFactory connectionFactory;
+    private final List<ConnectionInvocationHandler> handlers = new 
CopyOnWriteArrayList<>();
+    private final AtomicInteger openedConnections = new AtomicInteger();
+
+    public ConnectionFactoryInvocationHandler(ConnectionFactory 
connectionFactory) {
+        this.connectionFactory = Objects.requireNonNull(connectionFactory);
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        final Object o = 
connectionFactory.getClass().getMethod(method.getName(), 
method.getParameterTypes()).invoke(connectionFactory, args);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Method {} called on {}", method.getName(), 
connectionFactory);
+        }
+        if ("createConnection".equals(method.getName())) {
+            Connection connection = (Connection) o;
+            ConnectionInvocationHandler cp = new 
ConnectionInvocationHandler(connection);
+            handlers.add(cp);
+            openedConnections.incrementAndGet();
+            LOGGER.info("Connection created {}", connection);
+            return (Connection) 
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { 
Connection.class }, cp);
+        }
+        return o;
+    }
+
+    /**
+     * @return true if all opened resources were closed.
+     */
+    public boolean isAllResourcesClosed() {
+        boolean closed = true;
+        for (ConnectionInvocationHandler handler : handlers) {
+            boolean handlerClosed = handler.isClosed();
+            closed = closed && handlerClosed;
+            if (!handlerClosed) {
+                LOGGER.warn("Connection is not closed {}", 
handler.getConnection());
+            }
+        }
+        return closed;
+    }
+
+    /**
+     * @return number of opened connections.
+     */
+    public int openedConnections() {
+        return openedConnections.get();
+    }
+
+    /**
+     * @return all opened producers.
+     */
+    public int openedProducers() {
+        int producers = 0;
+        for (ConnectionInvocationHandler handler : handlers) {
+            producers += handler.openedProducers();
+        }
+        return producers;
+    }
+
+    /**
+     * @return number of opened sessions.
+     */
+    public int openedSessions() {
+        int sessions = 0;
+        for (ConnectionInvocationHandler handler : handlers) {
+            sessions += handler.openedSessions();
+        }
+        return sessions;
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
new file mode 100644
index 0000000..fba5278
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/ConnectionInvocationHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple {@link Connection} proxy utility to detect opened and unclosed 
resources.
+ */
+@ThreadSafe
+final class ConnectionInvocationHandler implements InvocationHandler {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConnectionInvocationHandler.class);
+    private final AtomicInteger closeCalled = new AtomicInteger();
+    private final Connection connection;
+    private final List<SessionInvocationHandler> handlers = new 
CopyOnWriteArrayList<>();
+    private final AtomicInteger openedSessions = new AtomicInteger();
+
+    public ConnectionInvocationHandler(Connection connection) {
+        this.connection = Objects.requireNonNull(connection);
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        final Object o = connection.getClass().getMethod(method.getName(), 
method.getParameterTypes()).invoke(connection, args);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Method {} called on {}", method.getName(), 
connection);
+        }
+        if (method.getName().equals("createSession")) {
+            Session session = (Session) o;
+            LOGGER.info("Session created {} using connection {}", session, 
connection);
+            openedSessions.incrementAndGet();
+            SessionInvocationHandler sp = new 
SessionInvocationHandler(session);
+            handlers.add(sp);
+            Session sessionProxy = (Session) 
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { 
Session.class }, sp);
+            return sessionProxy;
+        }
+        if ("close".equals(method.getName())) {
+            closeCalled.incrementAndGet();
+            LOGGER.info("Connection close method called {} times for {}", 
closeCalled, connection);
+        }
+        return o;
+    }
+
+    /**
+     * @return true if the {@link Connection#close()} method was called on {@link 
#connection} and all resources created from the connection were closed too.
+     */
+    public boolean isClosed() {
+        boolean closed = closeCalled.get() >= 1;
+        for (SessionInvocationHandler handler : handlers) {
+            boolean handlerClosed = handler.isClosed();
+            closed = closed && handlerClosed;
+            if (!handlerClosed) {
+                LOGGER.warn("Session is not closed {}", handler.getSession());
+            }
+        }
+        return closed;
+    }
+
+    /**
+     * @return number of opened producers.
+     */
+    public int openedProducers() {
+        int producers = 0;
+        for (SessionInvocationHandler handler : handlers) {
+            producers += handler.openedProducers();
+        }
+        return producers;
+    }
+
+    /**
+     * @return the number of opened sessions.
+     */
+    public int openedSessions() {
+        return openedSessions.get();
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
new file mode 100644
index 0000000..5fd6f2e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/MessageProducerInvocationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link MessageProducer} invocation handler.
+ */
+final class MessageProducerInvocationHandler implements InvocationHandler {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionInvocationHandler.class);
+
+    private final AtomicInteger closeCalled = new AtomicInteger();
+    private final MessageProducer messageProducer;
+
+    public MessageProducerInvocationHandler(MessageProducer messageProducer) {
+        this.messageProducer = Objects.requireNonNull(messageProducer);
+    }
+
+    public MessageProducer getMessageProducer() {
+        return messageProducer;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        final Object o = 
messageProducer.getClass().getMethod(method.getName(), 
method.getParameterTypes()).invoke(messageProducer, args);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Method {} called on {}", method.getName(), 
messageProducer);
+        }
+        if ("close".equals(method.getName())) {
+            closeCalled.incrementAndGet();
+            LOGGER.info("MessageProducer closed {}", messageProducer);
+        }
+        return o;
+    }
+
+    public boolean isClosed() {
+        return closeCalled.get() >= 1;
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
new file mode 100644
index 0000000..d73415d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/helpers/SessionInvocationHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors.helpers;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Session} invocation handler for Session proxy instances.
+ * @see ConnectionFactoryInvocationHandler
+ */
+final class SessionInvocationHandler implements InvocationHandler {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionInvocationHandler.class);
+
+    private final AtomicInteger closeCalled = new AtomicInteger();
+    private final List<MessageProducerInvocationHandler> handlers = new 
CopyOnWriteArrayList<>();
+    private final AtomicInteger openedProducers = new AtomicInteger();
+    private final Session session;
+
+    public SessionInvocationHandler(Session session) {
+        this.session = Objects.requireNonNull(session);
+    }
+
+    public Session getSession() {
+        return session;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        final Object o = session.getClass().getMethod(method.getName(), 
method.getParameterTypes()).invoke(session, args);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Method {} called on {}", method.getName(), session);
+        }
+        if (method.getName().equals("createProducer")) {
+            MessageProducer messageProducer = (MessageProducer) o;
+            LOGGER.info("Created a Message Producer {} using session {}", 
messageProducer, session);
+            openedProducers.incrementAndGet();
+            MessageProducerInvocationHandler mp = new 
MessageProducerInvocationHandler(messageProducer);
+            handlers.add(mp);
+            MessageProducer messageProducerProxy = (MessageProducer) 
Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[] { 
MessageProducer.class }, mp);
+            return messageProducerProxy;
+        }
+        if ("close".equals(method.getName())) {
+            closeCalled.incrementAndGet();
+            LOGGER.info("Session close method called {} times for {}", 
closeCalled, session);
+        }
+        return o;
+    }
+
+    public boolean isClosed() {
+        boolean closed = closeCalled.get() >= 1;
+        for (MessageProducerInvocationHandler handler : handlers) {
+            boolean handlerClosed = handler.isClosed();
+            closed = closed && handlerClosed;
+            if (!handlerClosed) {
+                LOGGER.warn("MessageProducer is not closed {}", 
handler.getMessageProducer());
+            }
+        }
+        return closed;
+    }
+
+    public int openedProducers() {
+        return openedProducers.get();
+    }
+
+}

Reply via email to