Repository: nifi
Updated Branches:
  refs/heads/master b5938062a -> 3ca7c3e7a


NIFI-4834: Updated AbstractJMSProcessor to use a separate 
SingleConnectionFactory per concurrent task instead of sharing one across the 
entire processor.

This closes #2445.

Signed-off-by: Andy LoPresto <alopre...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3ca7c3e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3ca7c3e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3ca7c3e7

Branch: refs/heads/master
Commit: 3ca7c3e7a19c7f496a214d9b93ace43b1507c9ec
Parents: b593806
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jan 31 11:50:42 2018 -0500
Committer: Andy LoPresto <alopre...@apache.org>
Committed: Fri Feb 2 16:10:07 2018 -0500

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java    | 123 ++++----
 .../apache/nifi/jms/processors/ConsumeJMS.java  |  62 ++--
 .../apache/nifi/jms/processors/JMSConsumer.java | 172 +++++------
 .../nifi/jms/processors/JMSPublisher.java       |  48 +--
 .../apache/nifi/jms/processors/JMSWorker.java   |  14 +-
 .../apache/nifi/jms/processors/PublishJMS.java  |  22 +-
 .../apache/nifi/jms/processors/CommonTest.java  |   6 +-
 .../nifi/jms/processors/ConsumeJMSTest.java     |  68 ++---
 .../processors/JMSPublisherConsumerTest.java    | 293 ++++++++++++-------
 .../nifi/jms/processors/PublishJMSTest.java     |  10 +-
 10 files changed, 415 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 39118a7..2758bfe 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -18,9 +18,13 @@ package org.apache.nifi.jms.processors;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.ConnectionFactory;
 
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
@@ -38,9 +42,7 @@ import org.springframework.jms.core.JmsTemplate;
 /**
  * Base JMS processor to support implementation of JMS producers and consumers.
  *
- * @param <T>
- *            the type of {@link JMSWorker} which could be {@link JMSPublisher}
- *            or {@link JMSConsumer}
+ * @param <T> the type of {@link JMSWorker} which could be {@link 
JMSPublisher} or {@link JMSConsumer}
  * @see PublishJMS
  * @see ConsumeJMS
  * @see JMSConnectionFactoryProviderDefinition
@@ -48,7 +50,6 @@ import org.springframework.jms.core.JmsTemplate;
 abstract class AbstractJMSProcessor<T extends JMSWorker> extends 
AbstractProcessor {
 
     static final String QUEUE = "QUEUE";
-
     static final String TOPIC = "TOPIC";
 
     static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
@@ -90,14 +91,13 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .build();
     static final PropertyDescriptor SESSION_CACHE_SIZE = new 
PropertyDescriptor.Builder()
             .name("Session Cache size")
-            .description("The maximum limit for the number of cached 
Sessions.")
-            .required(true)
+            .description("This property is deprecated and no longer has any 
effect on the Processor. It will be removed in a later version.")
+            .required(false)
             .defaultValue("1")
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
 
-    // ConnectionFactoryProvider ControllerService
     static final PropertyDescriptor CF_SERVICE = new 
PropertyDescriptor.Builder()
             .name("Connection Factory Service")
             .description("The Controller Service that is used to obtain 
ConnectionFactory")
@@ -106,11 +106,9 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .build();
 
     static final List<PropertyDescriptor> propertyDescriptors = new 
ArrayList<>();
+    private volatile BlockingQueue<T> workerPool;
+    private final AtomicInteger clientIdCounter = new AtomicInteger(1);
 
-    /*
-     * Will ensure that list of PropertyDescriptors is build only once, since
-     * all other lifecycle methods are invoked multiple times.
-     */
     static {
         propertyDescriptors.add(CF_SERVICE);
         propertyDescriptors.add(DESTINATION);
@@ -121,47 +119,35 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
         propertyDescriptors.add(SESSION_CACHE_SIZE);
     }
 
-    protected volatile T targetResource;
-
-    private volatile CachingConnectionFactory cachingConnectionFactory;
-
-    /**
-     *
-     */
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    /**
-     * Builds target resource ({@link JMSPublisher} or {@link JMSConsumer}) 
upon
-     * first invocation while delegating to the sub-classes ( {@link 
PublishJMS}
-     * or {@link ConsumeJMS}) via
-     * {@link #rendezvousWithJms(ProcessContext, ProcessSession)} method.
-     */
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        synchronized (this) {
-            this.buildTargetResource(context);
+        T worker = workerPool.poll();
+        if (worker == null) {
+            worker = buildTargetResource(context);
         }
-        this.rendezvousWithJms(context, session);
+
+        rendezvousWithJms(context, session, worker);
+        workerPool.offer(worker);
     }
 
-    /**
-     * Will destroy the instance of {@link CachingConnectionFactory} and sets
-     * 'targetResource' to null;
-     */
+    @OnScheduled
+    public void setupWorkerPool(final ProcessContext context) {
+        workerPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+    }
+
+
     @OnStopped
     public void close() {
-        if (this.cachingConnectionFactory != null) {
-            this.cachingConnectionFactory.destroy();
+        T worker;
+        while ((worker = workerPool.poll()) != null) {
+            worker.shutdown();
         }
-        this.targetResource = null;
-    }
-
-    @Override
-    public String toString() {
-        return this.getClass().getSimpleName() + " - " + this.targetResource;
     }
 
     /**
@@ -169,23 +155,16 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
      * {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is
      * implemented by sub-classes to perform {@link Processor} specific
      * functionality.
-     *
-     * @param context
-     *            instance of {@link ProcessContext}
-     * @param session
-     *            instance of {@link ProcessSession}
      */
-    protected abstract void rendezvousWithJms(ProcessContext context, 
ProcessSession session) throws ProcessException;
+    protected abstract void rendezvousWithJms(ProcessContext context, 
ProcessSession session, T jmsWorker) throws ProcessException;
 
     /**
      * Finishes building one of the {@link JMSWorker} subclasses T.
      *
-     * @param jmsTemplate instance of {@link JmsTemplate}
-     *
      * @see JMSPublisher
      * @see JMSConsumer
      */
-    protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, 
ProcessContext processContext);
+    protected abstract T finishBuildingJmsWorker(CachingConnectionFactory 
connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext);
 
     /**
      * This method essentially performs initialization of this Processor by
@@ -195,30 +174,30 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
      * in an instance of the {@link CachingConnectionFactory} used to construct
      * {@link JmsTemplate} used by this Processor.
      */
-    private void buildTargetResource(ProcessContext context) {
-        if (this.targetResource == null) {
-            JMSConnectionFactoryProviderDefinition cfProvider = 
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
-            ConnectionFactory connectionFactory = 
cfProvider.getConnectionFactory();
-
-            UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new 
UserCredentialsConnectionFactoryAdapter();
-            cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory);
-            
cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue());
-            
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
-
-            this.cachingConnectionFactory = new 
CachingConnectionFactory(cfCredentialsAdapter);
-            
this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue()));
-            String clientId = 
context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
-            if (clientId != null) {
-                this.cachingConnectionFactory.setClientId(clientId);
-            }
-            JmsTemplate jmsTemplate = new JmsTemplate();
-            jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
-            
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
-
-            // set of properties that may be good candidates for exposure via 
configuration
-            jmsTemplate.setReceiveTimeout(1000);
-
-            this.targetResource = 
this.finishBuildingTargetResource(jmsTemplate, context);
+    private T buildTargetResource(ProcessContext context) {
+        final JMSConnectionFactoryProviderDefinition cfProvider = 
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
+        final ConnectionFactory connectionFactory = 
cfProvider.getConnectionFactory();
+
+        final UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = 
new UserCredentialsConnectionFactoryAdapter();
+        cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory);
+        cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue());
+        
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
+
+        final CachingConnectionFactory cachingFactory = new 
CachingConnectionFactory(cfCredentialsAdapter);
+
+        String clientId = 
context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
+        if (clientId != null) {
+            clientId = clientId + "-" + clientIdCounter.getAndIncrement();
+            cachingFactory.setClientId(clientId);
         }
+
+        JmsTemplate jmsTemplate = new JmsTemplate();
+        jmsTemplate.setConnectionFactory(cachingFactory);
+        
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
+
+        // set of properties that may be good candidates for exposure via 
configuration
+        jmsTemplate.setReceiveTimeout(1000);
+
+        return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 8774397..a199411 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,8 +42,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 
 /**
@@ -146,35 +144,34 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      * 'success' {@link Relationship}.
      */
     @Override
-    protected void rendezvousWithJms(final ProcessContext context, final 
ProcessSession processSession) throws ProcessException {
+    protected void rendezvousWithJms(final ProcessContext context, final 
ProcessSession processSession, final JMSConsumer consumer) throws 
ProcessException {
         final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
         final Boolean durableBoolean = 
context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
         final boolean durable = durableBoolean == null ? false : 
durableBoolean;
         final Boolean sharedBoolean = 
context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = 
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
-        this.targetResource.consume(destinationName, durable, shared, 
subscriptionName, new ConsumerCallback(){
+
+        consumer.consume(destinationName, durable, shared, subscriptionName, 
new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
-                if (response != null){
-                    FlowFile flowFile = processSession.create();
-                    flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
-                        @Override
-                        public void process(final OutputStream out) throws 
IOException {
-                            out.write(response.getMessageBody());
-                        }
-                    });
-                    Map<String, Object> jmsHeaders = 
response.getMessageHeaders();
-                    Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
-                    flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
-                    flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
-                    flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
-                    processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
-                    processSession.transfer(flowFile, REL_SUCCESS);
-                    processSession.commit();
-                } else {
-                    context.yield();
+                if (response == null) {
+                    return;
                 }
+
+                FlowFile flowFile = processSession.create();
+                flowFile = processSession.write(flowFile, out -> 
out.write(response.getMessageBody()));
+
+                final Map<String, String> jmsHeaders = 
response.getMessageHeaders();
+                final Map<String, String> jmsProperties = 
response.getMessageProperties();
+
+                flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
+                flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
+                flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
+
+                processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.commit();
             }
         });
     }
@@ -183,23 +180,17 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      * Will create an instance of {@link JMSConsumer}
      */
     @Override
-    protected JMSConsumer finishBuildingTargetResource(JmsTemplate 
jmsTemplate, ProcessContext processContext) {
+    protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory 
connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
         int ackMode = 
processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
         jmsTemplate.setSessionAcknowledgeMode(ackMode);
-        return new JMSConsumer(jmsTemplate, this.getLogger());
+        return new JMSConsumer(connectionFactory, jmsTemplate, 
this.getLogger());
     }
 
-    /**
-     *
-     */
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
     }
 
-    /**
-     *
-     */
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return thisPropertyDescriptors;
@@ -211,11 +202,12 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      * copied values of JMS attributes will be "stringified" via
      * String.valueOf(attribute).
      */
-    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
Object> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
-        Map<String, String> attributes = new HashMap<String, String>();
-        for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
-            attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
+    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
String> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
+        Map<String, String> attributes = new HashMap<>();
+        for (Entry<String, String> entry : jmsAttributes.entrySet()) {
+            attributes.put(entry.getKey(), entry.getValue());
         }
+
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index a4fc47a..841b62d 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -33,6 +33,7 @@ import javax.jms.Topic;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.support.JmsHeaders;
@@ -43,63 +44,54 @@ import org.springframework.jms.support.JmsUtils;
  */
 final class JMSConsumer extends JMSWorker {
 
-    /**
-     * Creates an instance of this consumer
-     *
-     * @param jmsTemplate
-     *            instance of {@link JmsTemplate}
-     * @param processLog
-     *            instance of {@link ComponentLog}
-     */
-    JMSConsumer(JmsTemplate jmsTemplate, ComponentLog processLog) {
-        super(jmsTemplate, processLog);
-        if (this.processLog.isInfoEnabled()) {
-            this.processLog.info("Created Message Consumer for '" + 
jmsTemplate.toString() + "'.");
+    JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate 
jmsTemplate, ComponentLog logger) {
+        super(connectionFactory, jmsTemplate, logger);
+        logger.debug("Created Message Consumer for '{}'", new Object[] 
{jmsTemplate});
+    }
+
+
+    private MessageConsumer createMessageConsumer(final Session session, final 
String destinationName, final boolean durable, final boolean shared, final 
String subscriberName) throws JMSException {
+        final boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
+        final Destination destination = 
JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(session,
 destinationName, isPubSub);
+
+        if (isPubSub) {
+            if (shared) {
+                try {
+                    if (durable) {
+                        return session.createSharedDurableConsumer((Topic) 
destination, subscriberName);
+                    } else {
+                        return session.createSharedConsumer((Topic) 
destination, subscriberName);
+                    }
+                } catch (AbstractMethodError e) {
+                    throw new ProcessException("Failed to create a shared 
consumer. Make sure the target broker is JMS 2.0 compliant.", e);
+                }
+            } else {
+                if (durable) {
+                    return session.createDurableConsumer((Topic) destination, 
subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                } else {
+                    return session.createConsumer(destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                }
+            }
+        } else {
+            return session.createConsumer(destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
         }
     }
 
 
-    /**
-     *
-     */
     public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final ConsumerCallback 
consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
-            public Void doInJms(Session session) throws JMSException {
-                /*
-                 * We need to call recover to ensure that in in the event of
-                 * abrupt end or exception the current session will stop 
message
-                 * delivery and restarts with the oldest unacknowledged message
-                 */
+            public Void doInJms(final Session session) throws JMSException {
+                // We need to call recover to ensure that in in the event of
+                // abrupt end or exception the current session will stop 
message
+                // delivery and restarts with the oldest unacknowledged message
                 session.recover();
-                boolean isPubSub = 
JMSConsumer.this.jmsTemplate.isPubSubDomain();
-                Destination destination = 
JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
-                        session, destinationName, isPubSub);
-                MessageConsumer msgConsumer;
-                if (isPubSub) {
-                    if (shared) {
-                        try {
-                            if (durable) {
-                                msgConsumer = 
session.createSharedDurableConsumer((Topic)destination, subscriberName);
-                            } else {
-                                msgConsumer = 
session.createSharedConsumer((Topic)destination, subscriberName);
-                            }
-                        } catch (AbstractMethodError e) {
-                            throw new ProcessException("Failed to create a 
shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
-                        }
-                    } else {
-                        if (durable) {
-                            msgConsumer = 
session.createDurableConsumer((Topic)destination, subscriberName, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
-                        } else {
-                            msgConsumer = 
session.createConsumer((Topic)destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
-                        }
-                    }
-                } else {
-                    msgConsumer = session.createConsumer(destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
-                }
-                Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
-                JMSResponse response = null;
+
+                final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriberName);
                 try {
+                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+                    JMSResponse response = null;
+
                     if (message != null) {
                         byte[] messageBody = null;
                         if (message instanceof TextMessage) {
@@ -108,12 +100,14 @@ final class JMSConsumer extends JMSWorker {
                             messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
                         } else {
                             throw new IllegalStateException("Message type 
other then TextMessage and BytesMessage are "
-                                    + "not supported at the moment");
+                                + "not supported at the moment");
                         }
-                        Map<String, Object> messageHeaders = 
extractMessageHeaders(message);
-                        Map<String, String> messageProperties = 
extractMessageProperties(message);
+
+                        final Map<String, String> messageHeaders = 
extractMessageHeaders(message);
+                        final Map<String, String> messageProperties = 
extractMessageProperties(message);
                         response = new JMSResponse(messageBody, 
messageHeaders, messageProperties);
                     }
+
                     // invoke the processor callback (regardless if it's null,
                     // so the processor can yield) as part of this inJMS call
                     // and ACK message *only* after its successful invocation
@@ -125,19 +119,18 @@ final class JMSConsumer extends JMSWorker {
                 } finally {
                     JmsUtils.closeMessageConsumer(msgConsumer);
                 }
+
                 return null;
             }
         }, true);
     }
 
-    /**
-     *
-     */
+
     @SuppressWarnings("unchecked")
-    private Map<String, String> extractMessageProperties(Message message) {
-        Map<String, String> properties = new HashMap<>();
+    private Map<String, String> extractMessageProperties(final Message 
message) {
+        final Map<String, String> properties = new HashMap<>();
         try {
-            Enumeration<String> propertyNames = message.getPropertyNames();
+            final Enumeration<String> propertyNames = 
message.getPropertyNames();
             while (propertyNames.hasMoreElements()) {
                 String propertyName = propertyNames.nextElement();
                 properties.put(propertyName, 
String.valueOf(message.getObjectProperty(propertyName)));
@@ -148,41 +141,33 @@ final class JMSConsumer extends JMSWorker {
         return properties;
     }
 
-    /**
-     *
-     *
-     */
-    private Map<String, Object> extractMessageHeaders(Message message) {
-        // even though all values are Strings in current impl, it may change in
-        // the future, so keeping it <String, Object>
-        Map<String, Object> messageHeaders = new HashMap<>();
-        try {
-            messageHeaders.put(JmsHeaders.DELIVERY_MODE, 
String.valueOf(message.getJMSDeliveryMode()));
-            messageHeaders.put(JmsHeaders.EXPIRATION, 
String.valueOf(message.getJMSExpiration()));
-            messageHeaders.put(JmsHeaders.PRIORITY, 
String.valueOf(message.getJMSPriority()));
-            messageHeaders.put(JmsHeaders.REDELIVERED, 
String.valueOf(message.getJMSRedelivered()));
-            messageHeaders.put(JmsHeaders.TIMESTAMP, 
String.valueOf(message.getJMSTimestamp()));
-            messageHeaders.put(JmsHeaders.CORRELATION_ID, 
message.getJMSCorrelationID());
-            messageHeaders.put(JmsHeaders.MESSAGE_ID, 
message.getJMSMessageID());
-            messageHeaders.put(JmsHeaders.TYPE, message.getJMSType());
-
-            String replyToDestinationName = 
this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO);
-            if (replyToDestinationName != null) {
-                messageHeaders.put(JmsHeaders.REPLY_TO, 
replyToDestinationName);
-            }
-            String destinationName = 
this.retrieveDestinationName(message.getJMSDestination(), 
JmsHeaders.DESTINATION);
-            if (destinationName != null) {
-                messageHeaders.put(JmsHeaders.DESTINATION, destinationName);
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to extract JMS Headers", 
e);
+
+    private Map<String, String> extractMessageHeaders(final Message message) 
throws JMSException {
+        final Map<String, String> messageHeaders = new HashMap<>();
+
+        messageHeaders.put(JmsHeaders.DELIVERY_MODE, 
String.valueOf(message.getJMSDeliveryMode()));
+        messageHeaders.put(JmsHeaders.EXPIRATION, 
String.valueOf(message.getJMSExpiration()));
+        messageHeaders.put(JmsHeaders.PRIORITY, 
String.valueOf(message.getJMSPriority()));
+        messageHeaders.put(JmsHeaders.REDELIVERED, 
String.valueOf(message.getJMSRedelivered()));
+        messageHeaders.put(JmsHeaders.TIMESTAMP, 
String.valueOf(message.getJMSTimestamp()));
+        messageHeaders.put(JmsHeaders.CORRELATION_ID, 
message.getJMSCorrelationID());
+        messageHeaders.put(JmsHeaders.MESSAGE_ID, message.getJMSMessageID());
+        messageHeaders.put(JmsHeaders.TYPE, message.getJMSType());
+
+        String replyToDestinationName = 
this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO);
+        if (replyToDestinationName != null) {
+            messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName);
+        }
+
+        String destinationName = 
this.retrieveDestinationName(message.getJMSDestination(), 
JmsHeaders.DESTINATION);
+        if (destinationName != null) {
+            messageHeaders.put(JmsHeaders.DESTINATION, destinationName);
         }
+
         return messageHeaders;
     }
 
-    /**
-     *
-     */
+
     private String retrieveDestinationName(Destination destination, String 
headerName) {
         String destinationName = null;
         if (destination != null) {
@@ -196,17 +181,14 @@ final class JMSConsumer extends JMSWorker {
         return destinationName;
     }
 
-    /**
-     *
-     */
+
     static class JMSResponse {
         private final byte[] messageBody;
 
-        private final Map<String, Object> messageHeaders;
-
+        private final Map<String, String> messageHeaders;
         private final Map<String, String> messageProperties;
 
-        JMSResponse(byte[] messageBody, Map<String, Object> messageHeaders, 
Map<String, String> messageProperties) {
+        JMSResponse(byte[] messageBody, Map<String, String> messageHeaders, 
Map<String, String> messageProperties) {
             this.messageBody = messageBody;
             this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
             this.messageProperties = 
Collections.unmodifiableMap(messageProperties);
@@ -216,7 +198,7 @@ final class JMSConsumer extends JMSWorker {
             return this.messageBody;
         }
 
-        public Map<String, Object> getMessageHeaders() {
+        public Map<String, String> getMessageHeaders() {
             return this.messageHeaders;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 49e3354..671f5c9 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -27,10 +27,8 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
 
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
@@ -41,45 +39,22 @@ import org.springframework.jms.support.JmsHeaders;
  */
 final class JMSPublisher extends JMSWorker {
 
-    private final static Logger logger = 
LoggerFactory.getLogger(JMSPublisher.class);
-
-
-    /**
-     * Creates an instance of this publisher
-     *
-     * @param jmsTemplate
-     *            instance of {@link JmsTemplate}
-     * @param processLog
-     *            instance of {@link ComponentLog}
-     */
-    JMSPublisher(JmsTemplate jmsTemplate, ComponentLog processLog) {
-        super(jmsTemplate, processLog);
-        if (logger.isInfoEnabled()) {
-            logger.info("Created Message Publisher for '" + 
jmsTemplate.toString() + "'.");
-        }
+    JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate 
jmsTemplate, ComponentLog processLog) {
+        super(connectionFactory, jmsTemplate, processLog);
+        processLog.debug("Created Message Publisher for {}", new Object[] 
{jmsTemplate});
     }
 
-    /**
-     *
-     * @param messageBytes byte array representing contents of the message
-     */
     void publish(String destinationName, byte[] messageBytes) {
         this.publish(destinationName, messageBytes, null);
     }
 
-    /**
-     *
-     * @param messageBytes
-     *            byte array representing contents of the message
-     * @param flowFileAttributes
-     *            Map representing {@link FlowFile} attributes.
-     */
     void publish(final String destinationName, final byte[] messageBytes, 
final Map<String, String> flowFileAttributes) {
         this.jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 BytesMessage message = session.createBytesMessage();
                 message.writeBytes(messageBytes);
+
                 if (flowFileAttributes != null && 
!flowFileAttributes.isEmpty()) {
                     // set message headers and properties
                     for (Entry<String, String> entry : 
flowFileAttributes.entrySet()) {
@@ -121,18 +96,12 @@ final class JMSPublisher extends JMSWorker {
         });
     }
 
-    /**
-     *
-     */
+
     private void logUnbuildableDestination(String destinationName, String 
headerName) {
-        this.processLog.warn("Failed to determine destination type from 
destination name '" + destinationName
-                + "'. The '"
-                + headerName + "' will not be set.");
+        this.processLog.warn("Failed to determine destination type from 
destination name '{}'. The '{}' header will not be set.", new Object[] 
{destinationName, headerName});
     }
 
-    /**
-     *
-     */
+
     private Destination buildDestination(final String destinationName) {
         Destination destination;
         if (destinationName.toLowerCase().contains("topic")) {
@@ -152,6 +121,7 @@ final class JMSPublisher extends JMSWorker {
         } else {
             destination = null;
         }
+
         return destination;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
index b0e7087..e6fa1bb 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
@@ -21,6 +21,7 @@ import java.nio.channels.Channel;
 import javax.jms.Connection;
 
 import org.apache.nifi.logging.ComponentLog;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 
 
@@ -33,8 +34,9 @@ import org.springframework.jms.core.JmsTemplate;
 abstract class JMSWorker {
 
     protected final JmsTemplate jmsTemplate;
-
     protected final ComponentLog processLog;
+    private final CachingConnectionFactory connectionFactory;
+
 
     /**
      * Creates an instance of this worker initializing it with JMS
@@ -44,14 +46,16 @@ abstract class JMSWorker {
      * @param jmsTemplate the instance of {@link JmsTemplate}
      * @param processLog the instance of {@link ComponentLog}
      */
-    public JMSWorker(JmsTemplate jmsTemplate, ComponentLog processLog) {
+    public JMSWorker(CachingConnectionFactory connectionFactory, JmsTemplate 
jmsTemplate, ComponentLog processLog) {
+        this.connectionFactory = connectionFactory;
         this.jmsTemplate = jmsTemplate;
         this.processLog = processLog;
     }
 
-    /**
-     *
-     */
+    public void shutdown() {
+        connectionFactory.destroy();
+    }
+
     @Override
     public String toString() {
         return this.getClass().getSimpleName() + "[destination:" + 
this.jmsTemplate.getDefaultDestinationName()

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 24c3f25..80d6f3e 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.jms.processors;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -37,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
@@ -91,20 +89,19 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
      * Upon success the incoming {@link FlowFile} is transferred to 
the'success'
      * {@link Relationship} and upon failure FlowFile is penalized and
      * transferred to the 'failure' {@link Relationship}
-     *
      */
     @Override
-    protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
+    protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession, JMSPublisher publisher) throws ProcessException {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
                 String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
-                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
                 processSession.transfer(flowFile, REL_SUCCESS);
                 processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             } catch (Exception e) {
                 processSession.transfer(flowFile, REL_FAILURE);
-                this.getLogger().error("Failed while sending message to JMS 
via " + this.targetResource, e);
+                this.getLogger().error("Failed while sending message to JMS 
via " + publisher, e);
                 context.yield();
             }
         }
@@ -122,8 +119,8 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
      * Will create an instance of {@link JMSPublisher}
      */
     @Override
-    protected JMSPublisher finishBuildingTargetResource(JmsTemplate 
jmsTemplate, ProcessContext processContext) {
-        return new JMSPublisher(jmsTemplate, this.getLogger());
+    protected JMSPublisher finishBuildingJmsWorker(CachingConnectionFactory 
connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
+        return new JMSPublisher(connectionFactory, jmsTemplate, 
this.getLogger());
     }
 
     /**
@@ -131,12 +128,7 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
      */
     private byte[] extractMessageBody(FlowFile flowFile, ProcessSession 
session) {
         final byte[] messageContent = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.fillBuffer(in, messageContent, true);
-            }
-        });
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, 
messageContent, true));
         return messageContent;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
index 12ffe03..5e963d2 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -52,13 +52,13 @@ public class CommonTest {
     }
 
     static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                "vm://localhost?broker.persistent=false");
-        connectionFactory = new CachingConnectionFactory(connectionFactory);
+        ConnectionFactory activeMqConnectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        final ConnectionFactory connectionFactory = new 
CachingConnectionFactory(activeMqConnectionFactory);
 
         JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
         jmsTemplate.setPubSubDomain(pubSub);
         jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+        jmsTemplate.setReceiveTimeout(10L);
         return jmsTemplate;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
index e9364d2..23cf806 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.jms.processors;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,45 +33,43 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class ConsumeJMSTest {
 
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
         final String  destinationName = "cooQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        final Map<String, String> senderAttributes = new HashMap<>();
-        senderAttributes.put("filename", "message.txt");
-        senderAttributes.put("attribute_from_sender", "some value");
-        sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
-        TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
-        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-        when(cs.getIdentifier()).thenReturn("cfProvider");
-        
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
-        runner.addControllerService("cfProvider", cs);
-        runner.enableControllerService(cs);
-
-        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
-        runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
-        runner.run(1, false);
-        //
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
-        assertNotNull(successFF);
-        successFF.assertAttributeExists(JmsHeaders.DESTINATION);
-        successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
-        successFF.assertAttributeExists("filename");
-        successFF.assertAttributeEquals("filename", "message.txt");
-        successFF.assertAttributeExists("attribute_from_sender");
-        successFF.assertAttributeEquals("attribute_from_sender", "some value");
-        successFF.assertContentEquals("Hey dude!".getBytes());
-        String sourceDestination = 
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
-        assertNotNull(sourceDestination);
+        try {
+            JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final Map<String, String> senderAttributes = new HashMap<>();
+            senderAttributes.put("filename", "message.txt");
+            senderAttributes.put("attribute_from_sender", "some value");
+            sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
+            TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
 
-        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+            runner.run(1, false);
+            //
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+            successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
+            successFF.assertAttributeExists("filename");
+            successFF.assertAttributeEquals("filename", "message.txt");
+            successFF.assertAttributeExists("attribute_from_sender");
+            successFF.assertAttributeEquals("attribute_from_sender", "some 
value");
+            successFF.assertContentEquals("Hey dude!".getBytes());
+            String sourceDestination = 
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
+            assertNotNull(sourceDestination);
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
index 0f8dafb..1a88b29 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -48,16 +50,18 @@ public class JMSPublisherConsumerTest {
         final String destinationName = "testQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        publisher.publish(destinationName, "hellomq".getBytes());
-
-        Message receivedMessage = jmsTemplate.receive(destinationName);
-        assertTrue(receivedMessage instanceof BytesMessage);
-        byte[] bytes = new byte[7];
-        ((BytesMessage) receivedMessage).readBytes(bytes);
-        assertEquals("hellomq", new String(bytes));
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            publisher.publish(destinationName, "hellomq".getBytes());
 
-        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+            Message receivedMessage = jmsTemplate.receive(destinationName);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            byte[] bytes = new byte[7];
+            ((BytesMessage) receivedMessage).readBytes(bytes);
+            assertEquals("hellomq", new String(bytes));
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
     }
 
     @Test
@@ -65,19 +69,22 @@ public class JMSPublisherConsumerTest {
         final String destinationName = "testQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        Map<String, String> flowFileAttributes = new HashMap<>();
-        flowFileAttributes.put("foo", "foo");
-        flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
-        publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            flowFileAttributes.put("foo", "foo");
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
+            publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
 
-        Message receivedMessage = jmsTemplate.receive(destinationName);
-        assertTrue(receivedMessage instanceof BytesMessage);
-        assertEquals("foo", receivedMessage.getStringProperty("foo"));
-        assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
-        assertEquals("myTopic", ((Topic) 
receivedMessage.getJMSReplyTo()).getTopicName());
+            Message receivedMessage = jmsTemplate.receive(destinationName);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            assertEquals("foo", receivedMessage.getStringProperty("foo"));
+            assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
+            assertEquals("myTopic", ((Topic) 
receivedMessage.getJMSReplyTo()).getTopicName());
 
-        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
     }
 
     /**
@@ -91,15 +98,15 @@ public class JMSPublisherConsumerTest {
         final String destinationName = "testQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(destinationName, new MessageCreator() {
-            @Override
-            public Message createMessage(Session session) throws JMSException {
-                return session.createObjectMessage();
-            }
-        });
-
-        JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
         try {
+            jmsTemplate.send(destinationName, new MessageCreator() {
+                @Override
+                public Message createMessage(Session session) throws 
JMSException {
+                    return session.createObjectMessage();
+                }
+            });
+
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
@@ -116,102 +123,182 @@ public class JMSPublisherConsumerTest {
         final String destinationName = "testQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(destinationName, new MessageCreator() {
-            @Override
-            public Message createMessage(Session session) throws JMSException {
-                TextMessage message = session.createTextMessage("hello from 
the other side");
-                message.setStringProperty("foo", "foo");
-                message.setBooleanProperty("bar", false);
-                message.setJMSReplyTo(session.createQueue("fooQueue"));
-                return message;
-            }
-        });
-
-        JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
-        final AtomicBoolean callbackInvoked = new AtomicBoolean();
-        consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-            @Override
-            public void accept(JMSResponse response) {
-                callbackInvoked.set(true);
-                assertEquals("hello from the other side", new 
String(response.getMessageBody()));
-                assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
-                assertEquals("foo", 
response.getMessageProperties().get("foo"));
-                assertEquals("false", 
response.getMessageProperties().get("bar"));
-            }
-        });
-        assertTrue(callbackInvoked.get());
-
-        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-    }
-
-    @Test
-    public void validateMessageRedeliveryWhenNotAcked() throws Exception {
-        String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        publisher.publish(destinationName, 
"1".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(destinationName, 
"2".getBytes(StandardCharsets.UTF_8));
-
-        JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
-        final AtomicBoolean callbackInvoked = new AtomicBoolean();
         try {
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+            jmsTemplate.send(destinationName, new MessageCreator() {
                 @Override
-                public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("1", new String(response.getMessageBody()));
-                    throw new RuntimeException("intentional to avoid explicit 
ack");
+                public Message createMessage(Session session) throws 
JMSException {
+                    TextMessage message = session.createTextMessage("hello 
from the other side");
+                    message.setStringProperty("foo", "foo");
+                    message.setBooleanProperty("bar", false);
+                    message.setJMSReplyTo(session.createQueue("fooQueue"));
+                    return message;
                 }
             });
-        } catch (Exception e) {
-            // ignore
-        }
-        assertTrue(callbackInvoked.get());
-        callbackInvoked.set(false);
 
-        // should receive the same message, but will process it successfully
-        try {
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final AtomicBoolean callbackInvoked = new AtomicBoolean();
             consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
-                    assertEquals("1", new String(response.getMessageBody()));
+                    assertEquals("hello from the other side", new 
String(response.getMessageBody()));
+                    assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+                    assertEquals("foo", 
response.getMessageProperties().get("foo"));
+                    assertEquals("false", 
response.getMessageProperties().get("bar"));
                 }
             });
-        } catch (Exception e) {
-            // ignore
+            assertTrue(callbackInvoked.get());
+
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
         }
-        assertTrue(callbackInvoked.get());
-        callbackInvoked.set(false);
+    }
+
+
+    @Test(timeout = 2000000)
+    public void testMultipleThreads() throws Exception {
+        String destinationName = "testQueue";
+        JmsTemplate publishTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        final CountDownLatch consumerTemplateCloseCount = new 
CountDownLatch(4);
 
-        // receiving next message and fail again
         try {
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), 
publishTemplate, mock(ComponentLog.class));
+            for (int i = 0; i < 4000; i++) {
+                publisher.publish(destinationName, 
String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            }
+
+            final AtomicInteger msgCount = new AtomicInteger(0);
+
+            final ConsumerCallback callback = new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("2", new String(response.getMessageBody()));
-                    throw new RuntimeException("intentional to avoid explicit 
ack");
+                    msgCount.incrementAndGet();
                 }
-            });
-        } catch (Exception e) {
-            // ignore
+            };
+
+            final Thread[] threads = new Thread[4];
+            for (int i = 0; i < 4; i++) {
+                final Thread t = new Thread(() -> {
+                    JmsTemplate consumeTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+                    try {
+                        JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
+
+                        for (int j = 0; j < 1000 && msgCount.get() < 4000; 
j++) {
+                            consumer.consume(destinationName, false, false, 
null, callback);
+                        }
+                    } finally {
+                        ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
+                        consumerTemplateCloseCount.countDown();
+                    }
+                });
+
+                threads[i] = t;
+                t.start();
+            }
+
+            int iterations = 0;
+            while (msgCount.get() < 4000) {
+                Thread.sleep(10L);
+                if (++iterations % 100 == 0) {
+                    System.out.println(msgCount.get() + " messages received so 
far");
+                }
+            }
+        } finally {
+            ((CachingConnectionFactory) 
publishTemplate.getConnectionFactory()).destroy();
+
+            consumerTemplateCloseCount.await();
         }
-        assertTrue(callbackInvoked.get());
-        callbackInvoked.set(false);
+    }
+
 
-        // should receive the same message, but will process it successfully
+    @Test(timeout = 10000)
+    public void validateMessageRedeliveryWhenNotAcked() throws Exception {
+        String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
         try {
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("2", new String(response.getMessageBody()));
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            publisher.publish(destinationName, 
"1".getBytes(StandardCharsets.UTF_8));
+            publisher.publish(destinationName, 
"2".getBytes(StandardCharsets.UTF_8));
+
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final AtomicBoolean callbackInvoked = new AtomicBoolean();
+            try {
+                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    @Override
+                    public void accept(JMSResponse response) {
+                        callbackInvoked.set(true);
+                        assertEquals("1", new 
String(response.getMessageBody()));
+                        throw new RuntimeException("intentional to avoid 
explicit ack");
+                    }
+                });
+            } catch (Exception e) {
+                // expected
+            }
+
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // should receive the same message, but will process it 
successfully
+            while (!callbackInvoked.get()) {
+                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    @Override
+                    public void accept(JMSResponse response) {
+                        if (response == null) {
+                            return;
+                        }
+
+                        callbackInvoked.set(true);
+                        assertEquals("1", new 
String(response.getMessageBody()));
+                    }
+                });
+            }
+
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // receiving next message and fail again
+            try {
+                while (!callbackInvoked.get()) {
+                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                        @Override
+                        public void accept(JMSResponse response) {
+                            if (response == null) {
+                                return;
+                            }
+
+                            callbackInvoked.set(true);
+                            assertEquals("2", new 
String(response.getMessageBody()));
+                            throw new RuntimeException("intentional to avoid 
explicit ack");
+                        }
+                    });
                 }
-            });
-        } catch (Exception e) {
-            // ignore
+            } catch (Exception e) {
+                // ignore
+            }
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // should receive the same message, but will process it 
successfully
+            try {
+                while (!callbackInvoked.get()) {
+                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                        @Override
+                        public void accept(JMSResponse response) {
+                            if (response == null) {
+                                return;
+                            }
+
+                            callbackInvoked.set(true);
+                            assertEquals("2", new 
String(response.getMessageBody()));
+                        }
+                    });
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
         }
-        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3ca7c3e7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
index f7ccf17..1964ce9 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.when;
 
 public class PublishJMSTest {
 
-    @Test
+    @Test(timeout = 10000)
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
 
@@ -60,7 +60,7 @@ public class PublishJMSTest {
         attributes.put("foo", "foo");
         attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
         runner.enqueue("Hey dude!".getBytes(), attributes);
-        runner.run(1, false);
+        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
 
         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
         assertNotNull(successFF);
@@ -72,6 +72,8 @@ public class PublishJMSTest {
         assertEquals("Hey dude!", new String(messageBytes));
         assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
         assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true); // Run once just so that we can trigger the 
shutdown of the Connection Factory
     }
 
     @Test
@@ -96,7 +98,7 @@ public class PublishJMSTest {
         attributes.put("foo", "foo");
         attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
         runner.enqueue("Hey dude!".getBytes(), attributes);
-        runner.run(1, false);
+        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
 
         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
         assertNotNull(successFF);
@@ -108,6 +110,8 @@ public class PublishJMSTest {
         assertEquals("Hey dude!", new String(messageBytes));
         assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
         assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true); // Run once just so that we can trigger the 
shutdown of the Connection Factory
     }
 
     @Test

Reply via email to