http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..44893cd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,209 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses. 

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..6d315e2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+nifi-jms-processors-nar
+Copyright 2014-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2016 The Apache Software Foundation
+
+     
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
new file mode 100644
index 0000000..de35876
--- /dev/null
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/pom.xml
@@ -0,0 +1,68 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+               contributor license agreements. See the NOTICE file distributed 
with this 
+               work for additional information regarding copyright ownership. 
The ASF licenses 
+               this file to You under the Apache License, Version 2.0 (the 
"License"); you 
+               may not use this file except in compliance with the License. 
You may obtain 
+               a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless 
+               required by applicable law or agreed to in writing, software 
distributed 
+               under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
+               OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for 
+               the specific language governing permissions and limitations 
under the License. -->
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-jms-bundle</artifactId>
+               <version>0.6.0-SNAPSHOT</version>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+       <artifactId>nifi-jms-processors</artifactId>
+       <packaging>jar</packaging>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-jms-cf-service</artifactId>
+                       <version>0.6.0-SNAPSHOT</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework</groupId>
+                       <artifactId>spring-jms</artifactId>
+                       <version>4.2.4.RELEASE</version>
+               </dependency>
+               <dependency>
+                       <groupId>commons-logging</groupId>
+                       <artifactId>commons-logging</artifactId>
+                       <version>1.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.activemq</groupId>
+                       <artifactId>activemq-client</artifactId>
+                       <exclusions>
+                               <!-- -->
+                               <exclusion>
+                                       
<groupId>org.apache.geronimo.specs</groupId>
+                                       
<artifactId>geronimo-jms_1.1_spec</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.activemq</groupId>
+                       <artifactId>activemq-broker</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
new file mode 100644
index 0000000..f8030db
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import 
org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
+import org.springframework.jms.core.JmsTemplate;
+
+/**
+ * Base JMS processor to support implementation of JMS producers and consumers.
+ *
+ * @param <T>
+ *            the type of {@link JMSWorker} which could be {@link JMSPublisher}
+ *            or {@link JMSConsumer}
+ * @see PublishJMS
+ * @see ConsumeJMS
+ * @see JMSConnectionFactoryProviderDefinition
+ */
+abstract class AbstractJMSProcessor<T extends JMSWorker> extends 
AbstractProcessor {
+
+    static final String QUEUE = "QUEUE";
+
+    static final String TOPIC = "TOPIC";
+
+    static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
+            .name("User Name")
+            .description("User Name used for authentication and 
authorization.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("Password used for authentication and authorization.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
+            .name("Destination Name")
+            .description("The name of the JMS Destination. Usually provided by 
the administrator (e.g., 'topic://myTopic').")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor DESTINATION_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Destination Type")
+            .description("The type of the JMS Destination. Could be one of 
'QUEUE' or 'TOPIC'. Usually provided by the administrator. Defaults to 'QUEUE'.")
+            .required(true)
+            .allowableValues(QUEUE, TOPIC)
+            .defaultValue(QUEUE) // the description above must name this same default
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor SESSION_CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Session Cache size")
+            .description("The maximum limit for the number of cached 
Sessions.")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) // CachingConnectionFactory rejects a session cache size below 1
+            .build();
+
+    // ConnectionFactoryProvider ControllerService
+    static final PropertyDescriptor CF_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Connection Factory Service")
+            .description("The Controller Service that is used to obtain 
ConnectionFactory")
+            .required(true)
+            
.identifiesControllerService(JMSConnectionFactoryProviderDefinition.class)
+            .build();
+
+    static final List<PropertyDescriptor> propertyDescriptors = new 
ArrayList<>();
+
+    /*
+     * Will ensure that list of PropertyDescriptors is built only once, since
+     * all other lifecycle methods are invoked multiple times.
+     */
+    static {
+        propertyDescriptors.add(USER);
+        propertyDescriptors.add(PASSWORD);
+        propertyDescriptors.add(DESTINATION);
+        propertyDescriptors.add(DESTINATION_TYPE);
+        propertyDescriptors.add(SESSION_CACHE_SIZE);
+        propertyDescriptors.add(CF_SERVICE);
+    }
+
+    protected volatile T targetResource;
+
+    private volatile CachingConnectionFactory cachingConnectionFactory;
+
+    /**
+     * Returns the statically built list of supported {@link PropertyDescriptor}s.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    /**
+     * Builds target resource ({@link JMSPublisher} or {@link JMSConsumer}) 
upon
+     * first invocation while delegating to the sub-classes ( {@link 
PublishJMS}
+     * or {@link ConsumeJMS}) via
+     * {@link #rendezvousWithJms(ProcessContext, ProcessSession)} method.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        synchronized (this) {
+            this.buildTargetResource(context);
+        }
+        this.rendezvousWithJms(context, session);
+    }
+
+    /**
+     * Destroys the {@link CachingConnectionFactory}, if one was created, and
+     * sets 'targetResource' to null.
+     */
+    @OnStopped
+    public void close() {
+        if (this.cachingConnectionFactory != null) {
+            this.cachingConnectionFactory.destroy();
+        }
+        this.targetResource = null;
+    }
+
+    @Override
+    public String toString() {
+        return this.getClass().getSimpleName() + " - " + this.targetResource;
+    }
+
+    /**
+     * Delegate method to supplement
+     * {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is
+     * implemented by sub-classes to perform {@link Processor} specific
+     * functionality.
+     *
+     * @param context
+     *            instance of {@link ProcessContext}
+     * @param session
+     *            instance of {@link ProcessSession}
+     */
+    protected abstract void rendezvousWithJms(ProcessContext context, 
ProcessSession session) throws ProcessException;
+
+    /**
+     * Finishes building one of the {@link JMSWorker} subclasses T.
+     *
+     * @param jmsTemplate instance of {@link JmsTemplate}
+     *
+     * @see JMSPublisher
+     * @see JMSConsumer
+     */
+    protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate);
+
+    /**
+     * This method essentially performs initialization of this Processor by
+     * obtaining an instance of the {@link ConnectionFactory} from the
+     * {@link JMSConnectionFactoryProvider} (ControllerService) and performing 
a
+     * series of {@link ConnectionFactory} adaptations which eventually results
+     * in an instance of the {@link CachingConnectionFactory} used to construct
+     * {@link JmsTemplate} used by this Processor.
+     */
+    private void buildTargetResource(ProcessContext context) {
+        if (this.targetResource == null) {
+            JMSConnectionFactoryProviderDefinition cfProvider = 
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
+            ConnectionFactory connectionFactory = 
cfProvider.getConnectionFactory();
+
+            UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new 
UserCredentialsConnectionFactoryAdapter();
+            cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory);
+            
cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue());
+            
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
+
+            this.cachingConnectionFactory = new 
CachingConnectionFactory(cfCredentialsAdapter);
+            
this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue()));
+
+            JmsTemplate jmsTemplate = new JmsTemplate();
+            jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
+            
jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue());
+            
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
+
+            // set of properties that may be good candidates for exposure via 
configuration
+            jmsTemplate.setReceiveTimeout(10000);
+
+            this.targetResource = 
this.finishBuildingTargetResource(jmsTemplate);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
new file mode 100644
index 0000000..a4cad0d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.springframework.jms.core.JmsTemplate;
+
+/**
+ * Consuming JMS processor which upon each invocation of
+ * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
+ * {@link FlowFile} containing the body of the consumed JMS message and JMS
+ * properties that came with message which are added to a {@link FlowFile} as
+ * attributes.
+ */
+@Tags({ "jms", "get", "message", "receive", "consume" })
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Consumes JMS Message of type BytesMessage or 
TextMessage transforming its content to "
+        + "a FlowFile and transitioning it to 'success' relationship.")
+@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
+public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are received from the JMS 
Destination are routed to this relationship")
+            .build();
+
+    private final static Set<Relationship> relationships;
+
+    static {
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * Will construct a {@link FlowFile} containing the body of the consumed 
JMS
+     * message (if {@link JMSResponse} returned by {@link JMSConsumer} is not
+     * null) and JMS properties that came with message which are added to a
+     * {@link FlowFile} as attributes, transferring {@link FlowFile} to
+     * 'success' {@link Relationship}.
+     */
+    @Override
+    protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
+        final JMSResponse response = this.targetResource.consume();
+        if (response != null){
+            FlowFile flowFile = processSession.create();
+            flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                    out.write(response.getMessageBody());
+                }
+            });
+            Map<String, Object> jmsHeaders = response.getMessageHeaders();
+            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, 
flowFile, processSession);
+            processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).getValue());
+            processSession.transfer(flowFile, REL_SUCCESS);
+        } else {
+            context.yield();
+        }
+    }
+
+    /**
+     * Will create an instance of {@link JMSConsumer}
+     */
+    @Override
+    protected JMSConsumer finishBuildingTargetResource(JmsTemplate 
jmsTemplate) {
+        return new JMSConsumer(jmsTemplate, this.getLogger());
+    }
+
+    /**
+     * Returns the unmodifiable set of relationships, containing only 'success'.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Copies each JMS header, converted to a String, onto the FlowFile as an attribute.
+     */
+    private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, 
Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) {
+        Map<String, String> attributes = new HashMap<String, String>();
+        for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
+            attributes.put(headersEntry.getKey(), 
String.valueOf(headersEntry.getValue()));
+        }
+        flowFile = processSession.putAllAttributes(flowFile, attributes);
+        return flowFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
new file mode 100644
index 0000000..e69705a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
+
+/**
+ * Generic consumer of messages from a JMS compliant messaging system. Each
+ * call to {@link #consume()} asks the configured {@link JmsTemplate} for one
+ * message and converts it to a {@link JMSResponse}.
+ */
+final class JMSConsumer extends JMSWorker {
+
+    private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
+
+    /**
+     * Creates an instance of this consumer
+     *
+     * @param jmsTemplate
+     *            instance of {@link JmsTemplate}
+     * @param processLog
+     *            instance of {@link ProcessorLog}
+     */
+    JMSConsumer(JmsTemplate jmsTemplate, ProcessorLog processLog) {
+        super(jmsTemplate, processLog);
+        if (logger.isInfoEnabled()) {
+            logger.info("Created Message Consumer for '" + jmsTemplate.toString() + "'.");
+        }
+    }
+
+
+    /**
+     * Receives one message via {@link JmsTemplate#receive()} and converts it
+     * to a {@link JMSResponse} carrying the body bytes, the standard JMS
+     * headers and the message properties. Only {@link TextMessage} and
+     * {@link BytesMessage} are supported.
+     *
+     * @return the converted message, or {@code null} if the template returned
+     *         no message
+     * @throws IllegalStateException
+     *             wrapping any failure raised while converting the message,
+     *             including an unsupported message type
+     */
+    public JMSResponse consume() {
+        Message message = this.jmsTemplate.receive();
+        if (message != null) {
+            byte[] messageBody = null;
+            try {
+                if (message instanceof TextMessage) {
+                    messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message);
+                } else if (message instanceof BytesMessage) {
+                    messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
+                } else {
+                    throw new UnsupportedOperationException("Message type other then TextMessage and BytesMessage are "
+                            + "not supported at the moment");
+                }
+                Map<String, Object> messageHeaders = this.extractMessageHeaders(message);
+                Map<String, String> messageProperties = this.extractMessageProperties(message);
+                return new JMSResponse(messageBody, messageHeaders, messageProperties);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Collects the message's properties as String values. This is best
+     * effort: a {@link JMSException} is logged and whatever was collected up
+     * to that point is returned.
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, String> extractMessageProperties(Message message) {
+        Map<String, String> properties = new HashMap<>();
+        try {
+            Enumeration<String> propertyNames = message.getPropertyNames();
+            while (propertyNames.hasMoreElements()) {
+                String propertyName = propertyNames.nextElement();
+                properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
+            }
+        } catch (JMSException e) {
+            logger.warn("Failed to extract message properties", e);
+            this.processLog.warn("Failed to extract message properties", e);
+        }
+        return properties;
+    }
+
+    /**
+     * Collects the standard JMS headers of the message keyed by the
+     * {@link JmsHeaders} constants. Headers that are not set on the message
+     * (null correlation id, message id, type, reply-to or destination) are
+     * omitted rather than stored as {@code null}, so consumers of the map
+     * never turn them into the text "null".
+     *
+     * @throws IllegalStateException
+     *             if any header can not be read
+     */
+    private Map<String, Object> extractMessageHeaders(Message message) {
+        // even though all values are Strings in current impl, it may change in
+        // the future, so keeping it <String, Object>
+        Map<String, Object> messageHeaders = new HashMap<>();
+        try {
+            messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
+            messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration()));
+            messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority()));
+            messageHeaders.put(JmsHeaders.REDELIVERED, String.valueOf(message.getJMSRedelivered()));
+            messageHeaders.put(JmsHeaders.TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
+            putIfPresent(messageHeaders, JmsHeaders.CORRELATION_ID, message.getJMSCorrelationID());
+            putIfPresent(messageHeaders, JmsHeaders.MESSAGE_ID, message.getJMSMessageID());
+            putIfPresent(messageHeaders, JmsHeaders.TYPE, message.getJMSType());
+            putIfPresent(messageHeaders, JmsHeaders.REPLY_TO,
+                    this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO));
+            putIfPresent(messageHeaders, JmsHeaders.DESTINATION,
+                    this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION));
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to extract JMS Headers", e);
+        }
+        return messageHeaders;
+    }
+
+    /**
+     * Stores the header only when it actually has a value.
+     */
+    private static void putIfPresent(Map<String, Object> headers, String headerName, String headerValue) {
+        if (headerValue != null) {
+            headers.put(headerName, headerValue);
+        }
+    }
+
+    /**
+     * Resolves the name of the given destination. Returns {@code null} when
+     * the destination is {@code null}, when it is neither a {@link Queue} nor
+     * a {@link Topic}, or when the provider fails to report the name; the
+     * latter two cases are logged as warnings.
+     *
+     * @param destination
+     *            the destination to name, may be {@code null}
+     * @param headerName
+     *            name of the header being resolved, used in log messages only
+     */
+    private String retrieveDestinationName(Destination destination, String headerName) {
+        String destinationName = null;
+        if (destination != null) {
+            try {
+                if (destination instanceof Queue) {
+                    destinationName = ((Queue) destination).getQueueName();
+                } else if (destination instanceof Topic) {
+                    destinationName = ((Topic) destination).getTopicName();
+                } else {
+                    // Destination is only a marker interface; do not assume every non-Queue is a Topic
+                    String warning = "Failed to retrieve Destination name for '" + headerName
+                            + "' header; unsupported Destination type " + destination.getClass().getName();
+                    logger.warn(warning);
+                    this.processLog.warn(warning);
+                }
+            } catch (JMSException e) {
+                logger.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
+                this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
+            }
+        }
+        return destinationName;
+    }
+
+    /**
+     * Holder for a consumed message: its body bytes plus read-only views of
+     * its headers and properties.
+     */
+    static class JMSResponse {
+        private final byte[] messageBody;
+
+        private final Map<String, Object> messageHeaders;
+
+        private final Map<String, String> messageProperties;
+
+        JMSResponse(byte[] messageBody, Map<String, Object> messageHeaders, Map<String, String> messageProperties) {
+            this.messageBody = messageBody;
+            this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
+            this.messageProperties = Collections.unmodifiableMap(messageProperties);
+        }
+
+        public byte[] getMessageBody() {
+            return this.messageBody;
+        }
+
+        public Map<String, Object> getMessageHeaders() {
+            return this.messageHeaders;
+        }
+
+        public Map<String, String> getMessageProperties() {
+            return messageProperties;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
new file mode 100644
index 0000000..3643c51
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.support.JmsHeaders;
+
+/**
+ * Generic publisher of messages to a JMS compliant messaging system. Every
+ * message is sent as a {@link BytesMessage} through the configured
+ * {@link JmsTemplate}.
+ */
+final class JMSPublisher extends JMSWorker {
+
+    private final static Logger logger = LoggerFactory.getLogger(JMSPublisher.class);
+
+
+    /**
+     * Creates an instance of this publisher
+     *
+     * @param jmsTemplate
+     *            instance of {@link JmsTemplate}
+     * @param processLog
+     *            instance of {@link ProcessorLog}
+     */
+    JMSPublisher(JmsTemplate jmsTemplate, ProcessorLog processLog) {
+        super(jmsTemplate, processLog);
+        if (logger.isInfoEnabled()) {
+            logger.info("Created Message Publisher for '" + jmsTemplate.toString() + "'.");
+        }
+    }
+
+    /**
+     * Publishes the bytes as a message without any headers or properties.
+     *
+     * @param messageBytes byte array representing contents of the message
+     */
+    void publish(byte[] messageBytes) {
+        this.publish(messageBytes, null);
+    }
+
+    /**
+     * Publishes the bytes as a {@link BytesMessage}. Each attribute is mapped
+     * onto the message by {@link #applyAttribute(Message, String, String)}.
+     *
+     * @param messageBytes
+     *            byte array representing contents of the message
+     * @param flowFileAttributes
+     *            Map representing {@link FlowFile} attributes; may be
+     *            {@code null} or empty
+     */
+    void publish(final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
+        this.jmsTemplate.send(new MessageCreator() {
+            @Override
+            public Message createMessage(Session session) throws JMSException {
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(messageBytes);
+                if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
+                    // set message headers and properties
+                    for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
+                        applyAttribute(message, entry.getKey(), entry.getValue());
+                    }
+                }
+                return message;
+            }
+        });
+    }
+
+    /**
+     * Maps a single attribute onto the message. Attributes whose name does
+     * not start with {@link JmsHeaders#PREFIX} and contains no '-' become
+     * String properties; attributes named after one of the {@link JmsHeaders}
+     * constants handled below set the corresponding JMS header; everything
+     * else is ignored.
+     *
+     * @throws JMSException
+     *             if the message rejects the value
+     * @throws NumberFormatException
+     *             if a numeric header attribute does not hold a number
+     */
+    private void applyAttribute(Message message, String name, String value) throws JMSException {
+        if (!name.startsWith(JmsHeaders.PREFIX) && !name.contains("-")) {// '-' is illegal char in JMS prop names
+            message.setStringProperty(name, value);
+        } else if (name.equals(JmsHeaders.DELIVERY_MODE)) {
+            message.setJMSDeliveryMode(Integer.parseInt(value));
+        } else if (name.equals(JmsHeaders.EXPIRATION)) {
+            // JMSExpiration is a long; an int parse fails on real expiration values
+            message.setJMSExpiration(Long.parseLong(value));
+        } else if (name.equals(JmsHeaders.PRIORITY)) {
+            message.setJMSPriority(Integer.parseInt(value));
+        } else if (name.equals(JmsHeaders.REDELIVERED)) {
+            message.setJMSRedelivered(Boolean.parseBoolean(value));
+        } else if (name.equals(JmsHeaders.TIMESTAMP)) {
+            message.setJMSTimestamp(Long.parseLong(value));
+        } else if (name.equals(JmsHeaders.CORRELATION_ID)) {
+            message.setJMSCorrelationID(value);
+        } else if (name.equals(JmsHeaders.TYPE)) {
+            message.setJMSType(value);
+        } else if (name.equals(JmsHeaders.REPLY_TO)) {
+            Destination destination = buildDestination(value);
+            if (destination != null) {
+                message.setJMSReplyTo(destination);
+            } else {
+                // report the destination name (the attribute value), not the header name
+                logUnbuildableDestination(value, JmsHeaders.REPLY_TO);
+            }
+        } else if (name.equals(JmsHeaders.DESTINATION)) {
+            Destination destination = buildDestination(value);
+            if (destination != null) {
+                message.setJMSDestination(destination);
+            } else {
+                logUnbuildableDestination(value, JmsHeaders.DESTINATION);
+            }
+        }
+    }
+
+    /**
+     * Warns, on both loggers, that no destination could be built for the
+     * given name and that the header is therefore left unset.
+     */
+    private void logUnbuildableDestination(String destinationName, String headerName) {
+        logger.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '"
+                + headerName + "' will not be set.");
+        processLog.warn("Failed to determine destination type from destination name '" + destinationName + "'. The '"
+                + headerName + "' will not be set.");
+    }
+
+    /**
+     * Builds a {@link Destination} from its name. The type is guessed from
+     * the name itself: a name containing "topic" (case-insensitive) yields a
+     * {@link Topic}, otherwise one containing "queue" yields a {@link Queue}.
+     *
+     * @return the destination, or {@code null} if the name contains neither
+     *         word
+     */
+    private Destination buildDestination(final String destinationName) {
+        // Locale.ROOT keeps the match independent of the JVM's default locale
+        final String lowerCaseName = destinationName.toLowerCase(java.util.Locale.ROOT);
+        Destination destination;
+        if (lowerCaseName.contains("topic")) {
+            destination = this.jmsTemplate.execute(new SessionCallback<Topic>() {
+                @Override
+                public Topic doInJms(Session session) throws JMSException {
+                    return session.createTopic(destinationName);
+                }
+            });
+        } else if (lowerCaseName.contains("queue")) {
+            destination = this.jmsTemplate.execute(new SessionCallback<Queue>() {
+                @Override
+                public Queue doInJms(Session session) throws JMSException {
+                    return session.createQueue(destinationName);
+                }
+            });
+        } else {
+            destination = null;
+        }
+        return destination;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
new file mode 100644
index 0000000..bfb3f86
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.nio.channels.Channel;
+
+import javax.jms.Connection;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.springframework.jms.core.JmsTemplate;
+
+
+/**
+ * Common parent of the JMS workers that publish and consume messages. It
+ * simply holds the {@link JmsTemplate} and the {@link ProcessorLog} shared by
+ * the concrete workers.
+ *
+ * @see JMSPublisher
+ * @see JMSConsumer
+ */
+abstract class JMSWorker {
+
+    protected final JmsTemplate jmsTemplate;
+
+    protected final ProcessorLog processLog;
+
+    /**
+     * Stores the collaborators that sub-classes use to interact with the JMS
+     * system and to log.
+     *
+     * @param jmsTemplate the instance of {@link JmsTemplate}
+     * @param processLog the instance of {@link ProcessorLog}
+     */
+    public JMSWorker(JmsTemplate jmsTemplate, ProcessorLog processLog) {
+        this.jmsTemplate = jmsTemplate;
+        this.processLog = processLog;
+    }
+
+    /**
+     * Describes this worker by its simple class name, the template's default
+     * destination name and its pub-sub flag.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder();
+        description.append(this.getClass().getSimpleName());
+        description.append("[destination:").append(this.jmsTemplate.getDefaultDestinationName());
+        description.append("; pub-sub:").append(this.jmsTemplate.isPubSubDomain());
+        description.append(";]");
+        return description.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
new file mode 100644
index 0000000..ed212db
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Static helpers that turn the body of a {@link TextMessage} or
+ * {@link BytesMessage} into a byte array.
+ */
+abstract class MessageBodyToBytesConverter {
+
+    /**
+     * Converts the text of the message to bytes using the platform default
+     * charset.
+     *
+     * @param message instance of {@link TextMessage}
+     * @return  byte array representing the {@link TextMessage}; empty if the
+     *          message carries no text
+     * @throws MessageConversionException if the text can not be read
+     */
+    public static byte[] toBytes(TextMessage message) {
+        return toBytes(message, java.nio.charset.Charset.defaultCharset());
+    }
+
+    /**
+     * Converts the text of the message to bytes using the given charset.
+     *
+     * @param message instance of {@link TextMessage}
+     * @param charset charset used to encode the text
+     * @return  byte array representing the {@link TextMessage}; empty if the
+     *          message carries no text
+     * @throws MessageConversionException if the text can not be read
+     */
+    public static byte[] toBytes(TextMessage message, java.nio.charset.Charset charset) {
+        try {
+            String text = message.getText();
+            // a TextMessage may legitimately have no body
+            return text == null ? new byte[0] : text.getBytes(charset);
+        } catch (JMSException e) {
+            throw new MessageConversionException("Failed to convert TextMessage to byte[]", e);
+        }
+    }
+
+    /**
+     * Reads the remaining body of the message into a byte array.
+     *
+     * @param message instance of {@link BytesMessage}
+     * @return byte array representing the {@link BytesMessage}
+     * @throws MessageConversionException if the body can not be read
+     */
+    public static byte[] toBytes(BytesMessage message){
+        try {
+            InputStream is = new BytesMessageInputStream(message);
+            return IOUtils.toByteArray(is);
+        } catch (Exception e) {
+            throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e);
+        }
+    }
+
+    /**
+     * {@link InputStream} view over the body of a {@link BytesMessage}.
+     */
+    private static class BytesMessageInputStream extends InputStream {
+        private final BytesMessage message;
+
+        /**
+         * @param message the message whose body is exposed as a stream
+         */
+        public BytesMessageInputStream(BytesMessage message) {
+            this.message = message;
+        }
+
+        /**
+         * Reads one byte as a value in the range 0-255, or returns -1 once
+         * the body is exhausted, as the {@link InputStream} contract
+         * requires.
+         */
+        @Override
+        public int read() throws IOException {
+            try {
+                // readByte() is signed: 0xFF would be returned as -1 and be mistaken for end of stream
+                return this.message.readUnsignedByte();
+            } catch (javax.jms.MessageEOFException e) {
+                return -1;
+            } catch (JMSException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Reads up to {@code length} bytes. The bulk JMS read can only fill a
+         * buffer from its start, so a non-zero offset falls back to the
+         * byte-by-byte implementation of {@link InputStream}.
+         */
+        @Override
+        public int read(byte[] buffer, int offset, int length) throws IOException {
+            if (length == 0) {
+                return 0;
+            }
+            try {
+                if (offset == 0) {
+                    return this.message.readBytes(buffer, length);
+                } else {
+                    return super.read(buffer, offset, length);
+                }
+            } catch (JMSException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Fills the buffer from its start with the next bytes of the body.
+         */
+        @Override
+        public int read(byte[] buffer) throws IOException {
+            try {
+                return this.message.readBytes(buffer);
+            } catch (JMSException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /**
+     * Signals that a message body could not be converted to bytes.
+     */
+    static class MessageConversionException extends RuntimeException {
+        private static final long serialVersionUID = -1464448549601643887L;
+
+        /**
+         * @param msg description of the failure
+         */
+        public MessageConversionException(String msg) {
+            super(msg);
+        }
+
+        /**
+         * @param msg description of the failure
+         * @param cause the underlying failure
+         */
+        public MessageConversionException(String msg, Throwable cause) {
+            super(msg, cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
new file mode 100644
index 0000000..8f6c18b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
+
+/**
+ * An implementation of JMS Message publishing {@link Processor} which upon 
each
+ * invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will
+ * construct a {@link Message} from the contents of the {@link FlowFile} 
sending
+ * it to the {@link Destination} identified by the
+ * {@link AbstractJMSProcessor#DESTINATION} property while transferring the
+ * incoming {@link FlowFile} to 'success' {@link Relationship}. If message can
+ * not be constructed and/or sent the incoming {@link FlowFile} will be
+ * transitioned to 'failure' {@link Relationship}
+ */
+@Tags({ "jms", "put", "message", "send", "publish" })
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Creates a JMS Message from the contents of a FlowFile 
and sends it to a "
+        + "JMS Destination (queue or topic) as JMS BytesMessage.")
+@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
+public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are sent to the JMS destination 
are routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All FlowFiles that cannot be sent to JMS destination 
are routed to this relationship")
+            .build();
+
+    private final static Set<Relationship> relationships;
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    /**
+     * Will construct JMS {@link Message} by extracting its body from the
+     * incoming {@link FlowFile}. {@link FlowFile} attributes that represent
+     * standard JMS headers will be extracted from the {@link FlowFile} and set
+     * as JMS headers on the newly constructed message. For the list of
+     * available message headers please see {@link JmsHeaders}. <br>
+     * <br>
+     * Upon success the incoming {@link FlowFile} is transferred to 
the'success'
+     * {@link Relationship} and upon failure FlowFile is penalized and
+     * transferred to the 'failure' {@link Relationship}
+     *
+     */
+    @Override
+    protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
+        FlowFile flowFile = processSession.get();
+        if (flowFile != null) {
+            try {
+                this.targetResource.publish(this.extractMessageBody(flowFile, 
processSession),
+                        flowFile.getAttributes());
+                processSession.transfer(flowFile, REL_SUCCESS);
+                processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).getValue());
+            } catch (Exception e) {
+                processSession.transfer(flowFile, REL_FAILURE);
+                this.getLogger().error("Failed while sending message to JMS 
via " + this.targetResource, e);
+                context.yield();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Will create an instance of {@link JMSPublisher}
+     */
+    @Override
+    protected JMSPublisher finishBuildingTargetResource(JmsTemplate 
jmsTemplate) {
+        return new JMSPublisher(jmsTemplate, this.getLogger());
+    }
+
+    /**
+     * Extracts contents of the {@link FlowFile} as byte array.
+     */
+    private byte[] extractMessageBody(FlowFile flowFile, ProcessSession 
session) {
+        final byte[] messageContent = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, messageContent, true);
+            }
+        });
+        return messageContent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..8e21d4f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.jms.processors.PublishJMS
+org.apache.nifi.jms.processors.ConsumeJMS
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
new file mode 100644
index 0000000..1f8fb7e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
@@ -0,0 +1,61 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ConsumeJMS</title>
+    <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    This processor consumes messages from a JMS compliant messaging system and 
converts them to a FlowFile to be routed to the next component in the flow.
+</p>
+<p>
+    This processor does two things. It constructs a FlowFile by extracting 
information from the consumed JMS message including body, standard 
+    <a 
href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a> and Properties. 
+    The message body is written to a FlowFile while standard JMS Headers and 
Properties are set as FlowFile attributes.
+</p>
+
+<h2>Configuration Details</h2>
+<p>
+    At the time of writing this document it only defines the essential 
configuration properties which are suitable for most cases. 
+    Other properties will be defined later as this component progresses.
+    Configuring ConsumeJMS:
+</p>
+<ol>
+    <li><b>User Name</b> - [OPTIONAL] User Name used for authentication and 
authorization when this processor obtains <i>javax.jms.Connection</i> 
+    from the pre-configured <i>javax.jms.ConnectionFactory</i> (see below).
+    </li>
+    <li><b>Password</b> - [OPTIONAL] Password used in conjunction with <b>User 
Name</b>.
+    </li>
+    <li><b>Destination Name</b> - [REQUIRED] the name of the 
<i>javax.jms.Destination</i>. 
+    Usually provided by administrator (e.g., 'topic://myTopic'). 
+    </li>
+    <li><b>Destination Type</b> - [OPTIONAL] the type of the 
<i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'.
+    Usually provided by the administrator. Defaults to 'TOPIC'. 
+    </li>
+    <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for 
the JMS Session cache (per JMS Session type). This cache 
+    size is the maximum limit for the number of cached Sessions.
+    Usually provided by the administrator (e.g., '2453'). Defaults to '1'.
+    </li>
+    <li><b>Connection Factory Service</b> - [REQUIRED] link to a 
pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
+    </li>
+</ol>
+
+</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
new file mode 100644
index 0000000..b812587
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
@@ -0,0 +1,64 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PublishJMS</title>
+    <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    This processor publishes the contents of the incoming FlowFile to a JMS 
compliant messaging system.
+</p>
+<p>
+    This processor does two things. It constructs a JMS Message by extracting 
FlowFile contents (both body and attributes). 
+    Once the message is constructed it is sent to a pre-configured JMS Destination.
+    Standard <a 
href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a> 
+    will be extracted from the FlowFile and set on <i>javax.jms.Message</i> as 
JMS headers while other 
+    FlowFile attributes will be set as properties of <i>javax.jms.Message</i>. 
Upon success the incoming FlowFile is transferred 
+    to the <i>success</i> Relationship and upon failure the FlowFile is
+    penalized and transferred to the <i>failure</i> Relationship. 
+</p>
+<h2>Configuration Details</h2>
+<p>
+    At the time of writing this document it only defines the essential 
configuration properties which are suitable for most cases. 
+    Other properties will be defined later as this component progresses.
+    Configuring PublishJMS:
+</p>
+<ol>
+    <li><b>User Name</b> - [OPTIONAL] User Name used for authentication and 
authorization when this processor obtains <i>javax.jms.Connection</i> 
+    from the pre-configured <i>javax.jms.ConnectionFactory</i> (see below).
+    </li>
+    <li><b>Password</b> - [OPTIONAL] Password used in conjunction with <b>User 
Name</b>.
+    </li>
+    <li><b>Destination Name</b> - [REQUIRED] the name of the 
<i>javax.jms.Destination</i>. 
+    Usually provided by administrator (e.g., 'topic://myTopic'). 
+    </li>
+    <li><b>Destination Type</b> - [OPTIONAL] the type of the 
<i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'.
+    Usually provided by the administrator. Defaults to 'TOPIC'. 
+    </li>
+    <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for 
the JMS Session cache (per JMS Session type). This cache 
+    size is the maximum limit for the number of cached Sessions.
+    Usually provided by the administrator (e.g., '2453'). Defaults to '1'.
+    </li>
+    <li><b>Connection Factory Service</b> - [REQUIRED] link to a 
pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
+    </li>
+</ol>
+
+</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
new file mode 100644
index 0000000..6f0cf39
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.nifi.processor.Processor;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+
+/**
+ * Sanity check for processor registration plus a JMS fixture shared by the
+ * other tests in this package.
+ */
+public class CommonTest {
+
+    /**
+     * Walks every {@link Processor} exposed through {@link ServiceLoader} and
+     * verifies that implementations whose simple class names match
+     * {@code PublishJMS} and {@code ConsumeJMS} are both among them.
+     */
+    @Test
+    public void validateServiceIsLocatableViaServiceLoader() {
+        boolean pubJmsPresent = false;
+        boolean consumeJmsPresent = false;
+        for (Processor processor : ServiceLoader.<Processor> load(Processor.class)) {
+            // Matching is done on the simple class name, not on class identity.
+            String simpleName = processor.getClass().getSimpleName();
+            if (simpleName.equals(PublishJMS.class.getSimpleName())) {
+                pubJmsPresent = true;
+            } else if (simpleName.equals(ConsumeJMS.class.getSimpleName())) {
+                consumeJmsPresent = true;
+            }
+        }
+        assertTrue(pubJmsPresent);
+        assertTrue(consumeJmsPresent);
+    }
+
+    /**
+     * Creates a {@link JmsTemplate} whose default destination is
+     * {@code destinationName}. The template is backed by a
+     * {@link CachingConnectionFactory} wrapping an ActiveMQ connection factory
+     * pointed at {@code vm://localhost?broker.persistent=false}.
+     *
+     * @param destinationName name set as the template's default destination
+     * @param pubSub value passed to {@link JmsTemplate#setPubSubDomain(boolean)}
+     * @return the configured template; callers in this package destroy its
+     *         connection factory when they are done with it
+     */
+    static JmsTemplate buildJmsTemplateForDestination(String destinationName, boolean pubSub) {
+        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(
+                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
+
+        JmsTemplate template = new JmsTemplate(cachingFactory);
+        template.setPubSubDomain(pubSub);
+        template.setDefaultDestinationName(destinationName);
+        return template;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
new file mode 100644
index 0000000..48195a3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
+
+/**
+ * Exercises {@link ConsumeJMS} through the NiFi {@link TestRunner}, using a
+ * mocked connection factory provider backed by the connection factory of the
+ * shared test {@link JmsTemplate}.
+ */
+public class ConsumeJMSTest {
+
+    /**
+     * Publishes one message to the {@code cooQueue} queue, runs the processor
+     * once, and verifies that the first FlowFile routed to success carries the
+     * message body and the destination name as the
+     * {@link JmsHeaders#DESTINATION} attribute.
+     *
+     * @throws Exception if publishing, controller service setup or the
+     *         processor run fails
+     */
+    @Test
+    public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
+        JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("cooQueue", false);
+        try {
+            JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class));
+            sender.publish("Hey dude!".getBytes());
+
+            TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
+            JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            // NOTE(review): CF_SERVICE and REL_SUCCESS are referenced through PublishJMS
+            // although the processor under test is ConsumeJMS - confirm this is intended.
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue");
+            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+            runner.run(1, false);
+
+            final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION));
+            successFF.assertContentEquals("Hey dude!".getBytes());
+        } finally {
+            // Release the cached connection even when an assertion above fails.
+            ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
new file mode 100644
index 0000000..e18263e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.logging.ProcessorLog;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.support.JmsHeaders;
+
+/**
+ * Round-trip tests for {@link JMSPublisher} and {@link JMSConsumer} against
+ * the {@code testQueue} queue of the shared test {@link JmsTemplate}. Every
+ * test destroys the template's {@link CachingConnectionFactory} in a
+ * {@code finally} block so the connection is released on failure as well.
+ */
+public class JMSPublisherConsumerTest {
+
+    /**
+     * Verifies that a published byte array is received as a
+     * {@link BytesMessage} whose content equals the original payload.
+     *
+     * @throws Exception if publishing or receiving fails
+     */
+    @Test
+    public void validateByesConvertedToBytesMessageOnSend() throws Exception {
+        JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        try {
+            byte[] payload = "hellomq".getBytes();
+            JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class));
+            publisher.publish(payload);
+
+            Message receivedMessage = jmsTemplate.receive();
+            assertTrue(receivedMessage instanceof BytesMessage);
+            // Size the read buffer from the payload instead of a hard-coded length.
+            byte[] bytes = new byte[payload.length];
+            ((BytesMessage) receivedMessage).readBytes(bytes);
+            assertEquals("hellomq", new String(bytes));
+        } finally {
+            ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    /**
+     * Publishes with a custom attribute ({@code foo}) and a
+     * {@link JmsHeaders#REPLY_TO} attribute, then verifies that the received
+     * message exposes {@code foo} as a string property and a {@link Topic}
+     * named {@code myTopic} as its JMSReplyTo.
+     *
+     * @throws Exception if publishing or receiving fails
+     */
+    @Test
+    public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
+        JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        try {
+            JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ProcessorLog.class));
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            flowFileAttributes.put("foo", "foo");
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
+            publisher.publish("hellomq".getBytes(), flowFileAttributes);
+
+            Message receivedMessage = jmsTemplate.receive();
+            assertTrue(receivedMessage instanceof BytesMessage);
+            assertEquals("foo", receivedMessage.getStringProperty("foo"));
+            assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
+            assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName());
+        } finally {
+            ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    /**
+     * At the moment the only two supported message types are TextMessage and
+     * BytesMessage, which is sufficient for the type of JMS use cases NiFi is
+     * used for. This may change to the point where all message types are
+     * supported, at which point this test will no longer be required.
+     *
+     * @throws Exception an {@link IllegalStateException} is expected when the
+     *         consumer meets the ObjectMessage sent by this test
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateFailOnUnsupportedMessageType() throws Exception {
+        JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        try {
+            jmsTemplate.send(new MessageCreator() {
+                @Override
+                public Message createMessage(Session session) throws JMSException {
+                    return session.createObjectMessage();
+                }
+            });
+
+            JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ProcessorLog.class));
+            consumer.consume();
+        } finally {
+            ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    /**
+     * Sends a {@link TextMessage} carrying a string property, a boolean
+     * property and a reply-to queue, then verifies that the consumer response
+     * exposes the body, the reply-to queue name under
+     * {@link JmsHeaders#REPLY_TO}, and both properties. The boolean property
+     * is expected back as the string {@code "false"}.
+     *
+     * @throws Exception if sending or consuming fails
+     */
+    @Test
+    public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
+        JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        try {
+            jmsTemplate.send(new MessageCreator() {
+                @Override
+                public Message createMessage(Session session) throws JMSException {
+                    TextMessage message = session.createTextMessage("hello from the other side");
+                    message.setStringProperty("foo", "foo");
+                    message.setBooleanProperty("bar", false);
+                    message.setJMSReplyTo(session.createQueue("fooQueue"));
+                    return message;
+                }
+            });
+
+            JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ProcessorLog.class));
+            assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", consumer.toString());
+
+            JMSResponse response = consumer.consume();
+            assertEquals("hello from the other side", new String(response.getMessageBody()));
+            assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+            assertEquals("foo", response.getMessageProperties().get("foo"));
+            assertEquals("false", response.getMessageProperties().get("bar"));
+        } finally {
+            ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+}

Reply via email to