NIFI-4902: This closes #2485. Updated ConsumeAMQP, PublishAMQP to use one 
connection per concurrent task instead of a single connection shared by all 
concurrent tasks. This offers far better throughput when the network latency is 
non-trivial. Also refactored to simplify code

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/39556e35
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/39556e35
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/39556e35

Branch: refs/heads/master
Commit: 39556e35131638bfb5795dd41736c0faae8aaf39
Parents: 5bdb7cf
Author: Mark Payne <[email protected]>
Authored: Wed Feb 21 09:31:36 2018 -0500
Committer: joewitt <[email protected]>
Committed: Mon Mar 19 11:40:49 2018 -0400

----------------------------------------------------------------------
 .../nifi-amqp-processors/pom.xml                |   2 +-
 .../nifi/amqp/processors/AMQPConsumer.java      |  11 +-
 .../nifi/amqp/processors/AMQPPublisher.java     |  36 +-
 .../nifi/amqp/processors/AMQPResource.java      |  70 ++++
 .../apache/nifi/amqp/processors/AMQPUtils.java  | 240 ------------
 .../apache/nifi/amqp/processors/AMQPWorker.java |  18 +-
 .../amqp/processors/AbstractAMQPProcessor.java  | 136 +++----
 .../nifi/amqp/processors/ConsumeAMQP.java       | 139 ++++---
 .../nifi/amqp/processors/PublishAMQP.java       | 239 ++++++------
 .../nifi/amqp/processors/AMQPPublisherTest.java |   4 +-
 .../nifi/amqp/processors/AMQPUtilsTest.java     |  52 ---
 .../processors/AbstractAMQPProcessorTest.java   |   6 +-
 .../nifi/amqp/processors/ConsumeAMQPTest.java   |  32 +-
 .../nifi/amqp/processors/PublishAMQPTest.java   |  65 +---
 .../nifi/amqp/processors/TestChannel.java       | 376 ++++++++++---------
 .../nifi/amqp/processors/TestConnection.java    |  93 +++--
 16 files changed, 632 insertions(+), 887 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
index 0511323..61f3fac 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
@@ -20,7 +20,7 @@ language governing permissions and limitations under the 
License. -->
     <packaging>jar</packaging>
 
     <properties>
-        <amqp-client.version>3.6.0</amqp-client.version>
+        <amqp-client.version>5.2.0</amqp-client.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index 96d5385..0466469 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -33,14 +33,8 @@ import com.rabbitmq.client.GetResponse;
 final class AMQPConsumer extends AMQPWorker {
 
     private final static Logger logger = 
LoggerFactory.getLogger(AMQPConsumer.class);
-
     private final String queueName;
 
-    /**
-     * Creates an instance of this consumer
-     * @param connection instance of AMQP {@link Connection}
-     * @param queueName name of the queue from which messages will be consumed.
-     */
     AMQPConsumer(Connection connection, String queueName) {
         super(connection);
         this.validateStringProperty("queueName", queueName);
@@ -60,7 +54,7 @@ final class AMQPConsumer extends AMQPWorker {
      */
     public GetResponse consume() {
         try {
-            return this.channel.basicGet(this.queueName, true);
+            return getChannel().basicGet(this.queueName, true);
         } catch (IOException e) {
             logger.error("Failed to receive message from AMQP; " + this + ". 
Possible reasons: Queue '" + this.queueName
                     + "' may not have been defined", e);
@@ -68,9 +62,6 @@ final class AMQPConsumer extends AMQPWorker {
         }
     }
 
-    /**
-     *
-     */
     @Override
     public String toString() {
         return super.toString() + ", QUEUE:" + this.queueName;

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
index 6f2fddc..553fc83 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.nifi.logging.ComponentLog;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ReturnListener;
 
@@ -31,19 +32,17 @@ import com.rabbitmq.client.ReturnListener;
 final class AMQPPublisher extends AMQPWorker {
 
     private final ComponentLog processLog;
-
     private final String connectionString;
 
     /**
      * Creates an instance of this publisher
      *
-     * @param connection
-     *            instance of AMQP {@link Connection}
+     * @param connection instance of AMQP {@link Connection}
      */
     AMQPPublisher(Connection connection, ComponentLog processLog) {
         super(connection);
         this.processLog = processLog;
-        this.channel.addReturnListener(new UndeliverableMessageLogger());
+        getChannel().addReturnListener(new UndeliverableMessageLogger());
         this.connectionString = connection.toString();
     }
 
@@ -51,15 +50,11 @@ final class AMQPPublisher extends AMQPWorker {
      * Publishes message with provided AMQP properties (see
      * {@link BasicProperties}) to a pre-defined AMQP Exchange.
      *
-     * @param bytes
-     *            bytes representing a message.
-     * @param properties
-     *            instance of {@link BasicProperties}
-     * @param exchange
-     *            the name of AMQP exchange to which messages will be 
published.
+     * @param bytes bytes representing a message.
+     * @param properties instance of {@link BasicProperties}
+     * @param exchange the name of AMQP exchange to which messages will be 
published.
      *            If not provided 'default' exchange will be used.
-     * @param routingKey
-     *            (required) the name of the routingKey to be used by 
AMQP-based
+     * @param routingKey (required) the name of the routingKey to be used by 
AMQP-based
      *            system to route messages to its final destination (queue).
      */
     void publish(byte[] bytes, BasicProperties properties, String routingKey, 
String exchange) {
@@ -71,22 +66,18 @@ final class AMQPPublisher extends AMQPWorker {
         processLog.info("Successfully connected AMQPPublisher to " + 
this.connectionString + " and '" + exchange
                 + "' exchange with '" + routingKey + "' as a routing key.");
 
-        if (this.channel.isOpen()) {
+        final Channel channel = getChannel();
+        if (channel.isOpen()) {
             try {
-                this.channel.basicPublish(exchange, routingKey, true, 
properties, bytes);
+                channel.basicPublish(exchange, routingKey, true, properties, 
bytes);
             } catch (Exception e) {
-                throw new IllegalStateException("Failed to publish to '" +
-                        exchange + "' with '" + routingKey + "'.", e);
+                throw new IllegalStateException("Failed to publish to Exchange 
'" + exchange + "' with Routing Key '" + routingKey + "'.", e);
             }
         } else {
-            throw new IllegalStateException("This instance of AMQPPublisher is 
invalid since "
-                    + "its publishingChannel is closed");
+            throw new IllegalStateException("This instance of AMQPPublisher is 
invalid since its publishingChannel is closed");
         }
     }
 
-    /**
-     *
-     */
     @Override
     public String toString() {
         return this.connectionString;
@@ -106,8 +97,7 @@ final class AMQPPublisher extends AMQPWorker {
      */
     private final class UndeliverableMessageLogger implements ReturnListener {
         @Override
-        public void handleReturn(int replyCode, String replyText, String 
exchangeName, String routingKey, BasicProperties properties, byte[] message)
-                throws IOException {
+        public void handleReturn(int replyCode, String replyText, String 
exchangeName, String routingKey, BasicProperties properties, byte[] message) 
throws IOException {
             String logMessage = "Message destined for '" + exchangeName + "' 
exchange with '" + routingKey
                     + "' as routing key came back with replyCode=" + replyCode 
+ " and replyText=" + replyText + ".";
             processLog.warn(logMessage);

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
new file mode 100644
index 0000000..2319e7a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.amqp.processors;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.Connection;
+
+public class AMQPResource<T extends AMQPWorker> implements Closeable {
+    private final Connection connection;
+    private final T worker;
+
+    public AMQPResource(final Connection connection, final T worker) {
+        this.connection = connection;
+        this.worker = worker;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public T getWorker() {
+        return worker;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException ioe = null;
+
+        try {
+            worker.close();
+        } catch (final IOException e) {
+            ioe = e;
+        } catch (final TimeoutException e) {
+            ioe = new IOException(e);
+        }
+
+        try {
+            connection.close();
+        } catch (final IOException e) {
+            if (ioe == null) {
+                ioe = e;
+            } else {
+                ioe.addSuppressed(e);
+            }
+        }
+
+        if (ioe != null) {
+            throw ioe;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
deleted file mode 100644
index 68302a2..0000000
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPUtils.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.amqp.processors;
-
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.AMQP.BasicProperties;
-
-/**
- * Utility helper class simplify interactions with target AMQP API and NIFI 
API.
- */
-abstract class AMQPUtils {
-
-    public final static String AMQP_PROP_DELIMITER = "$";
-
-    public final static String AMQP_PROP_PREFIX = "amqp" + AMQP_PROP_DELIMITER;
-
-    private final static Logger logger = 
LoggerFactory.getLogger(AMQPUtils.class);
-
-    public enum PropertyNames {
-        CONTENT_TYPE(AMQP_PROP_PREFIX + "contentType"),
-        CONTENT_ENCODING(AMQP_PROP_PREFIX + "contentEncoding"),
-        HEADERS(AMQP_PROP_PREFIX + "headers"),
-        DELIVERY_MODE(AMQP_PROP_PREFIX + "deliveryMode"),
-        PRIORITY(AMQP_PROP_PREFIX + "priority"),
-        CORRELATION_ID(AMQP_PROP_PREFIX + "correlationId"),
-        REPLY_TO(AMQP_PROP_PREFIX + "replyTo"),
-        EXPIRATION(AMQP_PROP_PREFIX + "expiration"),
-        MESSAGE_ID(AMQP_PROP_PREFIX + "messageId"),
-        TIMESTAMP(AMQP_PROP_PREFIX + "timestamp"),
-        TYPE(AMQP_PROP_PREFIX + "type"),
-        USER_ID(AMQP_PROP_PREFIX + "userId"),
-        APP_ID(AMQP_PROP_PREFIX + "appId"),
-        CLUSTER_ID(AMQP_PROP_PREFIX + "clusterId");
-
-        PropertyNames(String value) {
-            this.value = value;
-        }
-
-        private final String value;
-
-        private static final Map<String, PropertyNames> lookup = new 
HashMap<>();
-
-        public static PropertyNames fromValue(String s) {
-            return lookup.get(s);
-        }
-
-        static {
-            for (PropertyNames propertyNames : PropertyNames.values()) {
-                lookup.put(propertyNames.getValue(), propertyNames);
-            }
-        }
-
-        public String getValue() {
-            return value;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-    }
-
-    /**
-     * Updates {@link FlowFile} with attributes representing AMQP properties
-     *
-     * @param amqpProperties instance of {@link BasicProperties}
-     * @param flowFile       instance of target {@link FlowFile}
-     * @param processSession instance of {@link ProcessSession}
-     */
-    public static FlowFile 
updateFlowFileAttributesWithAmqpProperties(BasicProperties amqpProperties, 
FlowFile flowFile, ProcessSession processSession) {
-        if (amqpProperties != null) {
-            try {
-                Method[] methods = BasicProperties.class.getDeclaredMethods();
-                Map<String, String> attributes = new HashMap<>();
-                for (Method method : methods) {
-                    if (Modifier.isPublic(method.getModifiers()) && 
method.getName().startsWith("get")) {
-                        Object amqpPropertyValue = 
method.invoke(amqpProperties);
-                        if (amqpPropertyValue != null) {
-                            String propertyName = 
extractPropertyNameFromMethod(method);
-                            if (isValidAmqpPropertyName(propertyName)) {
-                                if 
(propertyName.equals(PropertyNames.CONTENT_TYPE.getValue())) {
-                                    
attributes.put(CoreAttributes.MIME_TYPE.key(), amqpPropertyValue.toString());
-                                }
-                                attributes.put(propertyName, 
amqpPropertyValue.toString());
-                            }
-                        }
-                    }
-                }
-                flowFile = processSession.putAllAttributes(flowFile, 
attributes);
-            } catch (Exception e) {
-                logger.warn("Failed to update FlowFile with AMQP attributes", 
e);
-            }
-        }
-        return flowFile;
-    }
-
-    /**
-     * Will validate if provided name corresponds to valid AMQP property.
-     *
-     * @param name the name of the property
-     * @return 'true' if valid otherwise 'false'
-     */
-    public static boolean isValidAmqpPropertyName(String name) {
-        return PropertyNames.fromValue(name) != null;
-    }
-
-    /**
-     *
-     */
-    private static String extractPropertyNameFromMethod(Method method) {
-        char c[] = method.getName().substring(3).toCharArray();
-        c[0] = Character.toLowerCase(c[0]);
-        return AMQP_PROP_PREFIX + new String(c);
-    }
-
-    /**
-     * Will validate if provided amqpPropValue can be converted to a {@link 
Map}.
-     * Should be passed in the format: amqp$headers=key=value,key=value etc.
-     *
-     * @param amqpPropValue the value of the property
-     * @return {@link Map} if valid otherwise null
-     */
-    public static Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue) {
-        String[] strEntries = amqpPropValue.split(",");
-        Map<String, Object> headers = new HashMap<>();
-        for (String strEntry : strEntries) {
-            String[] kv = strEntry.split("=");
-            if (kv.length == 2) {
-                headers.put(kv[0].trim(), kv[1].trim());
-            } else {
-                logger.warn("Malformed key value pair for AMQP header 
property: " + amqpPropValue);
-            }
-        }
-
-        return headers;
-    }
-
-    /**
-     * Will validate if provided amqpPropValue can be converted to an {@link 
Integer}, and that its
-     * value is 1 or 2.
-     *
-     * @param amqpPropValue the value of the property
-     * @return {@link Integer} if valid otherwise null
-     */
-    public static Integer validateAMQPDeliveryModeProperty(String 
amqpPropValue) {
-        Integer deliveryMode = toInt(amqpPropValue);
-
-        if (deliveryMode == null || !(deliveryMode == 1 || deliveryMode == 2)) 
{
-            logger.warn("Invalid value for AMQP deliveryMode property: " + 
amqpPropValue);
-        }
-        return deliveryMode;
-    }
-
-    /**
-     * Will validate if provided amqpPropValue can be converted to an {@link 
Integer} and that its
-     * value is between 0 and 9 (inclusive).
-     *
-     * @param amqpPropValue the value of the property
-     * @return {@link Integer} if valid otherwise null
-     */
-    public static Integer validateAMQPPriorityProperty(String amqpPropValue) {
-        Integer priority = toInt(amqpPropValue);
-
-        if (priority == null || !(priority >= 0 && priority <= 9)) {
-            logger.warn("Invalid value for AMQP priority property: " + 
amqpPropValue);
-        }
-        return priority;
-    }
-
-    /**
-     * Will validate if provided amqpPropValue can be converted to a {@link 
Date}.
-     *
-     * @param amqpPropValue the value of the property
-     * @return {@link Date} if valid otherwise null
-     */
-    public static Date validateAMQPTimestampProperty(String amqpPropValue) {
-        Long timestamp = toLong(amqpPropValue);
-
-        if (timestamp == null) {
-            logger.warn("Invalid value for AMQP timestamp property: " + 
amqpPropValue);
-            return null;
-        }
-
-        //milliseconds are lost when sending to AMQP
-        return new Date(timestamp);
-    }
-
-    /**
-     * Takes a {@link String} and tries to convert to an {@link Integer}.
-     *
-     * @param strVal the value to be converted
-     * @return {@link Integer} if valid otherwise null
-     */
-    private static Integer toInt(String strVal) {
-        try {
-            return Integer.parseInt(strVal);
-        } catch (NumberFormatException aE) {
-            return null;
-        }
-    }
-
-    /**
-     * Takes a {@link String} and tries to convert to a {@link Long}.
-     *
-     * @param strVal the value to be converted
-     * @return {@link Long} if valid otherwise null
-     */
-    private static Long toLong(String strVal) {
-        try {
-            return Long.parseLong(strVal);
-        } catch (NumberFormatException aE) {
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
index a4de05e..990ed0b 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
@@ -34,19 +34,18 @@ import com.rabbitmq.client.Connection;
 abstract class AMQPWorker implements AutoCloseable {
 
     private final static Logger logger = 
LoggerFactory.getLogger(AMQPWorker.class);
-
-    protected final Channel channel;
+    private final Channel channel;
 
     /**
      * Creates an instance of this worker initializing it with AMQP
      * {@link Connection} and creating a target {@link Channel} used by
      * sub-classes to interact with AMQP-based messaging system.
      *
-     * @param connection
-     *            instance of {@link Connection}
+     * @param connection instance of {@link Connection}
      */
-    public AMQPWorker(Connection connection) {
-        this.validateConnection(connection);
+    public AMQPWorker(final Connection connection) {
+        validateConnection(connection);
+
         try {
             this.channel = connection.createChannel();
         } catch (IOException e) {
@@ -55,6 +54,10 @@ abstract class AMQPWorker implements AutoCloseable {
         }
     }
 
+    protected Channel getChannel() {
+        return channel;
+    }
+
     /**
      * Closes {@link Channel} created when instance of this class was created.
      */
@@ -91,8 +94,7 @@ abstract class AMQPWorker implements AutoCloseable {
     /**
      * Validates that {@link Connection} is not null and open.
      *
-     * @param connection
-     *            instance of {@link Connection}
+     * @param connection instance of {@link Connection}
      */
     private void validateConnection(Connection connection) {
         if (connection == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index efd1be5..3e55283 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -16,9 +16,11 @@
  */
 package org.apache.nifi.amqp.processors;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.net.ssl.SSLContext;
 
@@ -29,7 +31,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.util.SslContextFactory;
@@ -119,112 +120,95 @@ abstract class AbstractAMQPProcessor<T extends 
AMQPWorker> extends AbstractProce
             .defaultValue("REQUIRED")
             .build();
 
-    static List<PropertyDescriptor> descriptors = new ArrayList<>();
+    private static final List<PropertyDescriptor> propertyDescriptors;
 
-    /*
-     * Will ensure that list of PropertyDescriptors is build only once, since
-     * all other lifecycle methods are invoked multiple times.
-     */
     static {
-        descriptors.add(HOST);
-        descriptors.add(PORT);
-        descriptors.add(V_HOST);
-        descriptors.add(USER);
-        descriptors.add(PASSWORD);
-        descriptors.add(AMQP_VERSION);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(USE_CERT_AUTHENTICATION);
-        descriptors.add(CLIENT_AUTH);
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HOST);
+        properties.add(PORT);
+        properties.add(V_HOST);
+        properties.add(USER);
+        properties.add(PASSWORD);
+        properties.add(AMQP_VERSION);
+        properties.add(SSL_CONTEXT_SERVICE);
+        properties.add(USE_CERT_AUTHENTICATION);
+        properties.add(CLIENT_AUTH);
+        propertyDescriptors = Collections.unmodifiableList(properties);
     }
 
-    protected volatile Connection amqpConnection;
+    protected static List<PropertyDescriptor> getCommonPropertyDescriptors() {
+        return propertyDescriptors;
+    }
 
-    protected volatile T targetResource;
+    private final BlockingQueue<AMQPResource<T>> resourceQueue = new 
LinkedBlockingQueue<>();
 
     /**
-     * Will builds target resource ({@link AMQPPublisher} or
-     * {@link AMQPConsumer}) upon first invocation and will delegate to the
-     * implementation of
-     * {@link #rendezvousWithAmqp(ProcessContext, ProcessSession)} method for
-     * further processing.
+     * Will builds target resource ({@link AMQPPublisher} or {@link 
AMQPConsumer}) upon first invocation and will delegate to the
+     * implementation of {@link #processResource(ProcessContext, 
ProcessSession)} method for further processing.
      */
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        synchronized (this) {
-            this.buildTargetResource(context);
+    public final void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+        AMQPResource<T> resource = resourceQueue.poll();
+        if (resource == null) {
+            resource = createResource(context);
         }
-        this.rendezvousWithAmqp(context, session);
-    }
 
-    /**
-     * Will close current AMQP connection.
-     */
-    @OnStopped
-    public void close() {
         try {
-            if (this.targetResource != null) {
-                this.targetResource.close();
+            processResource(resource.getConnection(), resource.getWorker(), 
context, session);
+            resourceQueue.offer(resource);
+        } catch (final Exception e) {
+            try {
+                resource.close();
+            } catch (final Exception e2) {
+                e.addSuppressed(e2);
             }
-        } catch (Exception e) {
-            this.getLogger().warn("Failure while closing target resource " + 
this.targetResource, e);
+
+            throw e;
         }
-        try {
-            if (this.amqpConnection != null) {
-                this.amqpConnection.close();
+    }
+
+
+    @OnStopped
+    public void close() {
+        AMQPResource<T> resource;
+        while ((resource = resourceQueue.poll()) != null) {
+            try {
+                resource.close();
+            } catch (final Exception e) {
+                getLogger().warn("Failed to close AMQP Connection", e);
             }
-        } catch (IOException e) {
-            this.getLogger().warn("Failure while closing connection", e);
         }
-        this.amqpConnection = null;
     }
 
     /**
-     * Delegate method to supplement
-     * {@link #onTrigger(ProcessContext, ProcessSession)}. It is implemented by
-     * sub-classes to perform {@link Processor} specific functionality.
-     *
-     * @param context
-     *            instance of {@link ProcessContext}
-     * @param session
-     *            instance of {@link ProcessSession}
+     * Performs functionality of the Processor, given the appropriate 
connection and worker
      */
-    protected abstract void rendezvousWithAmqp(ProcessContext context, 
ProcessSession session) throws ProcessException;
+    protected abstract void processResource(Connection connection, T worker, 
ProcessContext context, ProcessSession session) throws ProcessException;
 
     /**
-     * Delegate method to supplement building of target {@link AMQPWorker} (see
-     * {@link AMQPPublisher} or {@link AMQPConsumer}) and is implemented by
-     * sub-classes.
+     * Builds the appropriate AMQP Worker for the implementing processor
      *
-     * @param context
-     *            instance of {@link ProcessContext}
+     * @param context instance of {@link ProcessContext}
      * @return new instance of {@link AMQPWorker}
      */
-    protected abstract T finishBuildingTargetResource(ProcessContext context);
+    protected abstract T createAMQPWorker(ProcessContext context, Connection 
connection);
 
-    /**
-     * Builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}).
-     * It does so by making a {@link Connection} and then delegating to the
-     * implementation of {@link #finishBuildingTargetResource(ProcessContext)}
-     * which will build {@link AMQPWorker} (see {@link AMQPPublisher} or
-     * {@link AMQPConsumer}).
-     */
-    private void buildTargetResource(ProcessContext context) {
-        if (this.amqpConnection == null || !this.amqpConnection.isOpen()) {
-            this.amqpConnection = this.createConnection(context);
-            this.targetResource = this.finishBuildingTargetResource(context);
-        }
+
+    private AMQPResource<T> createResource(final ProcessContext context) {
+        final Connection connection = createConnection(context);
+        final T worker = createAMQPWorker(context, connection);
+        return new AMQPResource<>(connection, worker);
     }
 
-    /**
-     * Creates {@link Connection} to AMQP system.
-     */
-    private Connection createConnection(ProcessContext context) {
-        ConnectionFactory cf = new ConnectionFactory();
+
+    protected Connection createConnection(ProcessContext context) {
+        final ConnectionFactory cf = new ConnectionFactory();
         cf.setHost(context.getProperty(HOST).getValue());
         cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue()));
         cf.setUsername(context.getProperty(USER).getValue());
         cf.setPassword(context.getProperty(PASSWORD).getValue());
-        String vHost = context.getProperty(V_HOST).getValue();
+
+        final String vHost = context.getProperty(V_HOST).getValue();
         if (vHost != null) {
             cf.setVirtualHost(vHost);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index db1f29a..1b0ee52 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -16,16 +16,18 @@
  */
 package org.apache.nifi.amqp.processors;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -33,24 +35,34 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.GetResponse;
 
-/**
- * Consuming AMQP processor which upon each invocation of
- * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a
- * {@link FlowFile} containing the body of the consumed AMQP message and AMQP
- * properties that came with message which are added to a {@link FlowFile} as
- * attributes.
- */
 @Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Consumes AMQP Message transforming its content to a 
FlowFile and transitioning it to 'success' relationship")
+@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the 
AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be 
"
+    + "emitted as its own FlowFile to the 'success' relationship.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "amqp$appId", description = "The App ID field 
from the AMQP Message"),
+    @WritesAttribute(attribute = "amqp$contentEncoding", description = "The 
Content Encoding reported by the AMQP Message"),
+    @WritesAttribute(attribute = "amqp$contentType", description = "The 
Content Type reported by the AMQP Message"),
+    @WritesAttribute(attribute = "amqp$headers", description = "The headers 
present on the AMQP Message"),
+    @WritesAttribute(attribute = "amqp$deliveryMode", description = "The 
numeric indicator for the Message's Delivery Mode"),
+    @WritesAttribute(attribute = "amqp$priority", description = "The Message 
priority"),
+    @WritesAttribute(attribute = "amqp$correlationId", description = "The 
Message's Correlation ID"),
+    @WritesAttribute(attribute = "amqp$replyTo", description = "The value of 
the Message's Reply-To field"),
+    @WritesAttribute(attribute = "amqp$expiration", description = "The Message 
Expiration"),
+    @WritesAttribute(attribute = "amqp$messageId", description = "The unique 
ID of the Message"),
+    @WritesAttribute(attribute = "amqp$timestamp", description = "The 
timestamp of the Message, as the number of milliseconds since epoch"),
+    @WritesAttribute(attribute = "amqp$type", description = "The type of 
message"),
+    @WritesAttribute(attribute = "amqp$userId", description = "The ID of the 
user"),
+    @WritesAttribute(attribute = "amqp$clusterId", description = "The ID of 
the AMQP Cluster"),
+})
 public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
+    private static final String ATTRIBUTES_PREFIX = "amqp$";
 
     public static final PropertyDescriptor QUEUE = new 
PropertyDescriptor.Builder()
             .name("Queue")
@@ -64,73 +76,82 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
             .description("All FlowFiles that are received from the AMQP queue 
are routed to this relationship")
             .build();
 
-    private final static List<PropertyDescriptor> propertyDescriptors;
-
-    private final static Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    private static final Set<Relationship> relationships;
 
-    /*
-     * Will ensure that the list of property descriptors is build only once.
-     * Will also create a Set of relationships
-     */
     static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
-        _propertyDescriptors.add(QUEUE);
-        _propertyDescriptors.addAll(descriptors);
-        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
-
-        Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        relationships = Collections.unmodifiableSet(_relationships);
+        List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(QUEUE);
+        properties.addAll(getCommonPropertyDescriptors());
+        propertyDescriptors = Collections.unmodifiableList(properties);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(rels);
     }
 
     /**
-     * Will construct a {@link FlowFile} containing the body of the consumed
-     * AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
-     * not null) and AMQP properties that came with message which are added to 
a
-     * {@link FlowFile} as attributes, transferring {@link FlowFile} to
+     * Will construct a {@link FlowFile} containing the body of the consumed 
AMQP message (if {@link GetResponse} returned by {@link AMQPConsumer} is
+     * not null) and AMQP properties that came with message which are added to 
a {@link FlowFile} as attributes, transferring {@link FlowFile} to
      * 'success' {@link Relationship}.
      */
     @Override
-    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-        final GetResponse response = this.targetResource.consume();
-        if (response != null){
-            FlowFile flowFile = processSession.create();
-            flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    out.write(response.getBody());
-                }
-            });
-            BasicProperties amqpProperties = response.getProps();
-            flowFile = 
AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, flowFile, 
processSession);
-            processSession.getProvenanceReporter().receive(flowFile,
-                    this.amqpConnection.toString() + "/" + 
context.getProperty(QUEUE).getValue());
-            processSession.transfer(flowFile, REL_SUCCESS);
-        } else {
+    protected void processResource(final Connection connection, final 
AMQPConsumer consumer, final ProcessContext context, final ProcessSession 
session) {
+        final GetResponse response = consumer.consume();
+        if (response == null) {
             context.yield();
+            return;
         }
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, out -> 
out.write(response.getBody()));
+
+        final BasicProperties amqpProperties = response.getProps();
+        final Map<String, String> attributes = buildAttributes(amqpProperties);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.getProvenanceReporter().receive(flowFile, 
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private Map<String, String> buildAttributes(final BasicProperties 
properties) {
+        final Map<String, String> attributes = new HashMap<>();
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", 
properties.getAppId());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", 
properties.getContentEncoding());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "contentType", 
properties.getContentType());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "headers", 
properties.getHeaders());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "deliveryMode", 
properties.getDeliveryMode());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "priority", 
properties.getPriority());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "correlationId", 
properties.getCorrelationId());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "replyTo", 
properties.getReplyTo());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "expiration", 
properties.getExpiration());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "messageId", 
properties.getMessageId());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "timestamp", 
properties.getTimestamp() == null ? null : properties.getTimestamp().getTime());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "type", 
properties.getType());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", 
properties.getUserId());
+        addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", 
properties.getClusterId());
+        return attributes;
+    }
+
+    private void addAttribute(final Map<String, String> attributes, final 
String attributeName, final Object value) {
+        if (value == null) {
+            return;
+        }
+
+        attributes.put(attributeName, value.toString());
     }
 
-    /**
-     * Will create an instance of {@link AMQPConsumer}
-     */
     @Override
-    protected AMQPConsumer finishBuildingTargetResource(ProcessContext 
context) {
-        String queueName = context.getProperty(QUEUE).getValue();
-        return new AMQPConsumer(this.amqpConnection, queueName);
+    protected AMQPConsumer createAMQPWorker(final ProcessContext context, 
final Connection connection) {
+        final String queueName = context.getProperty(QUEUE).getValue();
+        return new AMQPConsumer(connection, queueName);
     }
 
-    /**
-     *
-     */
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    /**
-     *
-     */
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 857e591..7dce05e 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -20,16 +20,20 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -45,27 +49,34 @@ import org.apache.nifi.stream.io.StreamUtils;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Connection;
 
-/**
- * Publishing AMQP processor which upon each invocation of
- * {@link #onTrigger(ProcessContext, ProcessSession)} method will construct an
- * AMQP message sending it to an exchange identified during construction of 
this
- * class while transferring the incoming {@link FlowFile} to 'success'
- * {@link Relationship}.
- *
- * Expects that queues, exchanges and bindings are pre-defined by an AMQP
- * administrator
- */
 @Tags({ "amqp", "rabbit", "put", "message", "send", "publish" })
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Creates a AMQP Message from the contents of a FlowFile 
and sends the message to an AMQP Exchange."
-        + "In a typical AMQP exchange model, the message that is sent to the 
AMQP Exchange will be routed based on the 'Routing Key' "
-        + "to its final destination in the queue (the binding). If due to some 
misconfiguration the binding between the Exchange, Routing Key "
-        + "and Queue is not set up, the message will have no final destination 
and will return (i.e., the data will not make it to the queue). If "
-        + "that happens you will see a log in both app-log and bulletin 
stating to that effect. Fixing the binding "
-        + "(normally done by AMQP administrator) will resolve the issue.")
+@CapabilityDescription("Creates an AMQP Message from the contents of a 
FlowFile and sends the message to an AMQP Exchange. "
+    + "In a typical AMQP exchange model, the message that is sent to the AMQP 
Exchange will be routed based on the 'Routing Key' "
+    + "to its final destination in the queue (the binding). If due to some 
misconfiguration the binding between the Exchange, Routing Key "
+    + "and Queue is not set up, the message will have no final destination and 
will return (i.e., the data will not make it to the queue). If "
+    + "that happens you will see a log in both app-log and bulletin stating to 
that effect, and the FlowFile will be routed to the 'failure' relationship.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "amqp$appId", description = "The App ID field 
to set on the AMQP Message"),
+    @ReadsAttribute(attribute = "amqp$contentEncoding", description = "The 
Content Encoding to set on the AMQP Message"),
+    @ReadsAttribute(attribute = "amqp$contentType", description = "The Content 
Type to set on the AMQP Message"),
+    @ReadsAttribute(attribute = "amqp$headers", description = "The headers to 
set on the AMQP Message"),
+    @ReadsAttribute(attribute = "amqp$deliveryMode", description = "The 
numeric indicator for the Message's Delivery Mode"),
+    @ReadsAttribute(attribute = "amqp$priority", description = "The Message 
priority"),
+    @ReadsAttribute(attribute = "amqp$correlationId", description = "The 
Message's Correlation ID"),
+    @ReadsAttribute(attribute = "amqp$replyTo", description = "The value of 
the Message's Reply-To field"),
+    @ReadsAttribute(attribute = "amqp$expiration", description = "The Message 
Expiration"),
+    @ReadsAttribute(attribute = "amqp$messageId", description = "The unique ID 
of the Message"),
+    @ReadsAttribute(attribute = "amqp$timestamp", description = "The timestamp 
of the Message, as the number of milliseconds since epoch"),
+    @ReadsAttribute(attribute = "amqp$type", description = "The type of 
message"),
+    @ReadsAttribute(attribute = "amqp$userId", description = "The ID of the 
user"),
+    @ReadsAttribute(attribute = "amqp$clusterId", description = "The ID of the 
AMQP Cluster"),
+})
 public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+    private static final String ATTRIBUTES_PREFIX = "amqp$";
 
     public static final PropertyDescriptor EXCHANGE = new 
PropertyDescriptor.Builder()
             .name("Exchange Name")
@@ -100,84 +111,71 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
 
     private final static Set<Relationship> relationships;
 
-    /*
-     * Will ensure that the list of property descriptors is build only once.
-     * Will also create a Set of relationships
-     */
     static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
-        _propertyDescriptors.add(EXCHANGE);
-        _propertyDescriptors.add(ROUTING_KEY);
-        _propertyDescriptors.addAll(descriptors);
-        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+        List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(EXCHANGE);
+        properties.add(ROUTING_KEY);
+        properties.addAll(getCommonPropertyDescriptors());
+        propertyDescriptors = Collections.unmodifiableList(properties);
 
-        Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        _relationships.add(REL_FAILURE);
-        relationships = Collections.unmodifiableSet(_relationships);
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
     }
 
+
     /**
-     * Will construct AMQP message by extracting its body from the incoming
-     * {@link FlowFile}. AMQP Properties will be extracted from the
-     * {@link FlowFile} and converted to {@link BasicProperties} to be sent
-     * along with the message. Upon success the incoming {@link FlowFile} is
-     * transferred to 'success' {@link Relationship} and upon failure FlowFile 
is
-     * penalized and transferred to the 'failure' {@link Relationship}
+     * Will construct AMQP message by extracting its body from the incoming 
{@link FlowFile}. AMQP Properties will be extracted from the
+     * {@link FlowFile} and converted to {@link BasicProperties} to be sent 
along with the message. Upon success the incoming {@link FlowFile} is
+     * transferred to 'success' {@link Relationship} and upon failure FlowFile 
is penalized and transferred to the 'failure' {@link Relationship}
      * <br>
+     *
      * NOTE: Attributes extracted from {@link FlowFile} are considered
      * candidates for AMQP properties if their names are prefixed with
      * {@link AMQPUtils#AMQP_PROP_PREFIX} (e.g., amqp$contentType=text/xml)
-     *
      */
     @Override
-    protected void rendezvousWithAmqp(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-        FlowFile flowFile = processSession.get();
-        if (flowFile != null) {
-            BasicProperties amqpProperties = 
this.extractAmqpPropertiesFromFlowFile(flowFile);
-            String routingKey = 
context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
-            if (routingKey == null){
-                throw new IllegalArgumentException("Failed to determine 
'routing key' with provided value '"
-                        + context.getProperty(ROUTING_KEY) + "' after 
evaluating it as expression against incoming FlowFile.");
-            }
-            String exchange = 
context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
+    protected void processResource(final Connection connection, final 
AMQPPublisher publisher, ProcessContext context, ProcessSession session) throws 
ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
 
-            byte[] messageContent = this.extractMessage(flowFile, 
processSession);
+        final BasicProperties amqpProperties = 
extractAmqpPropertiesFromFlowFile(flowFile);
+        final String routingKey = 
context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        if (routingKey == null) {
+            throw new IllegalArgumentException("Failed to determine 'routing 
key' with provided value '"
+                + context.getProperty(ROUTING_KEY) + "' after evaluating it as 
expression against incoming FlowFile.");
+        }
 
-            try {
-                this.targetResource.publish(messageContent, amqpProperties, 
routingKey, exchange);
-                processSession.transfer(flowFile, REL_SUCCESS);
-                processSession.getProvenanceReporter().send(flowFile, 
this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
-            } catch (Exception e) {
-                processSession.transfer(processSession.penalize(flowFile), 
REL_FAILURE);
-                this.getLogger().error("Failed while sending message to AMQP 
via " + this.targetResource, e);
-                context.yield();
-            }
+        final String exchange = 
context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
+        final byte[] messageContent = extractMessage(flowFile, session);
+
+        try {
+            publisher.publish(messageContent, amqpProperties, routingKey, 
exchange);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, 
connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
+        } catch (Exception e) {
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            getLogger().error("Failed while sending message to AMQP via " + 
publisher, e);
         }
     }
 
-    /**
-     *
-     */
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
     }
 
-    /**
-     *
-     */
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
     }
 
-    /**
-     * Will create an instance of {@link AMQPPublisher}
-     */
     @Override
-    protected AMQPPublisher finishBuildingTargetResource(ProcessContext 
context) {
-        return new AMQPPublisher(this.amqpConnection, this.getLogger());
+    protected AMQPPublisher createAMQPWorker(final ProcessContext context, 
final Connection connection) {
+        return new AMQPPublisher(connection, getLogger());
     }
 
     /**
@@ -194,6 +192,20 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
         return messageContent;
     }
 
+
+    private void updateBuilderFromAttribute(final FlowFile flowFile, final 
String attribute, final Consumer<String> updater) {
+        final String attributeValue = flowFile.getAttribute(ATTRIBUTES_PREFIX 
+ attribute);
+        if (attributeValue == null) {
+            return;
+        }
+
+        try {
+            updater.accept(attributeValue);
+        } catch (final Exception e) {
+            getLogger().warn("Failed to update AMQP Message Property " + 
attribute, e);
+        }
+    }
+
     /**
      * Extracts AMQP properties from the {@link FlowFile} attributes. 
Attributes
      * extracted from {@link FlowFile} are considered candidates for AMQP
@@ -208,66 +220,45 @@ public class PublishAMQP extends 
AbstractAMQPProcessor<AMQPPublisher> {
      * {@link AMQPUtils#validateAMQPTimestampProperty}
      */
     private BasicProperties extractAmqpPropertiesFromFlowFile(FlowFile 
flowFile) {
-        Map<String, String> attributes = flowFile.getAttributes();
-        AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
-        for (Entry<String, String> attributeEntry : attributes.entrySet()) {
-            if 
(attributeEntry.getKey().startsWith(AMQPUtils.AMQP_PROP_PREFIX)) {
-                String amqpPropName = attributeEntry.getKey();
-                String amqpPropValue = attributeEntry.getValue();
+        final AMQP.BasicProperties.Builder builder = new 
AMQP.BasicProperties.Builder();
 
-                AMQPUtils.PropertyNames propertyNames = 
AMQPUtils.PropertyNames.fromValue(amqpPropName);
+        updateBuilderFromAttribute(flowFile, "contentType", 
builder::contentType);
+        updateBuilderFromAttribute(flowFile, "contentEncoding", 
builder::contentEncoding);
+        updateBuilderFromAttribute(flowFile, "deliveryMode", mode -> 
builder.deliveryMode(Integer.parseInt(mode)));
+        updateBuilderFromAttribute(flowFile, "priority", pri -> 
builder.priority(Integer.parseInt(pri)));
+        updateBuilderFromAttribute(flowFile, "correlationId", 
builder::correlationId);
+        updateBuilderFromAttribute(flowFile, "replyTo", builder::replyTo);
+        updateBuilderFromAttribute(flowFile, "expiration", 
builder::expiration);
+        updateBuilderFromAttribute(flowFile, "messageId", builder::messageId);
+        updateBuilderFromAttribute(flowFile, "timestamp", ts -> 
builder.timestamp(new Date(Long.parseLong(ts))));
+        updateBuilderFromAttribute(flowFile, "type", builder::type);
+        updateBuilderFromAttribute(flowFile, "userId", builder::userId);
+        updateBuilderFromAttribute(flowFile, "appId", builder::appId);
+        updateBuilderFromAttribute(flowFile, "clusterId", builder::clusterId);
+        updateBuilderFromAttribute(flowFile, "headers", headers -> 
builder.headers(validateAMQPHeaderProperty(headers)));
 
-                if (propertyNames != null) {
-                    switch (propertyNames){
-                        case CONTENT_TYPE:
-                            builder.contentType(amqpPropValue);
-                            break;
-                        case CONTENT_ENCODING:
-                            builder.contentEncoding(amqpPropValue);
-                            break;
-                        case HEADERS:
-                            
builder.headers(AMQPUtils.validateAMQPHeaderProperty(amqpPropValue));
-                            break;
-                        case DELIVERY_MODE:
-                            
builder.deliveryMode(AMQPUtils.validateAMQPDeliveryModeProperty(amqpPropValue));
-                            break;
-                        case PRIORITY:
-                            
builder.priority(AMQPUtils.validateAMQPPriorityProperty(amqpPropValue));
-                            break;
-                        case CORRELATION_ID:
-                            builder.correlationId(amqpPropValue);
-                            break;
-                        case REPLY_TO:
-                            builder.replyTo(amqpPropValue);
-                            break;
-                        case EXPIRATION:
-                            builder.expiration(amqpPropValue);
-                            break;
-                        case MESSAGE_ID:
-                            builder.messageId(amqpPropValue);
-                            break;
-                        case TIMESTAMP:
-                            
builder.timestamp(AMQPUtils.validateAMQPTimestampProperty(amqpPropValue));
-                            break;
-                        case TYPE:
-                            builder.type(amqpPropValue);
-                            break;
-                        case USER_ID:
-                            builder.userId(amqpPropValue);
-                            break;
-                        case APP_ID:
-                            builder.appId(amqpPropValue);
-                            break;
-                        case CLUSTER_ID:
-                            builder.clusterId(amqpPropValue);
-                            break;
-                    }
+        return builder.build();
+    }
 
-                } else {
-                    getLogger().warn("Unrecognised AMQP property '" + 
amqpPropName + "', will ignore.");
-                }
+    /**
+     * Will validate if provided amqpPropValue can be converted to a {@link 
Map}.
+     * Should be passed in the format: amqp$headers=key=value,key=value etc.
+     *
+     * @param amqpPropValue the value of the property
+     * @return {@link Map} if valid otherwise null
+     */
+    private Map<String, Object> validateAMQPHeaderProperty(String 
amqpPropValue) {
+        String[] strEntries = amqpPropValue.split(",");
+        Map<String, Object> headers = new HashMap<>();
+        for (String strEntry : strEntries) {
+            String[] kv = strEntry.split("=");
+            if (kv.length == 2) {
+                headers.put(kv[0].trim(), kv[1].trim());
+            } else {
+                getLogger().warn("Malformed key value pair for AMQP header 
property: " + amqpPropValue);
             }
         }
-        return builder.build();
+
+        return headers;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
index 33844c3..51bd59f 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
@@ -72,7 +72,6 @@ public class AMQPPublisherTest {
 
         try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
             sender.publish("hello".getBytes(), null, "key1", "myExchange");
-            Thread.sleep(200);
         }
 
         assertNotNull(connection.createChannel().basicGet("queue1", true));
@@ -95,9 +94,8 @@ public class AMQPPublisherTest {
 
         try (AMQPPublisher sender = new AMQPPublisher(connection, new 
MockComponentLog("foo", ""))) {
             sender.publish("hello".getBytes(), null, "key1", "myExchange");
-            Thread.sleep(1000);
         }
-        Thread.sleep(200);
+
         verify(retListener, atMost(1)).handleReturn(Mockito.anyInt(), 
Mockito.anyString(), Mockito.anyString(),
                 Mockito.anyString(), Mockito.any(BasicProperties.class), 
(byte[]) Mockito.any());
         connection.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
deleted file mode 100644
index 5452809..0000000
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.amqp.processors;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.util.MockProcessSession;
-import org.apache.nifi.util.SharedSessionState;
-import org.junit.Test;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.AMQP.BasicProperties;
-
-public class AMQPUtilsTest {
-
-
-    @Test
-    public void validateUpdateFlowFileAttributesWithAmqpProperties() {
-        PublishAMQP processor = new PublishAMQP();
-        ProcessSession processSession = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong()),
-                processor);
-        FlowFile sourceFlowFile = processSession.create();
-        BasicProperties amqpProperties = new AMQP.BasicProperties.Builder()
-                .contentType("text/plain").deliveryMode(2)
-                .priority(1).userId("joe")
-                .build();
-        FlowFile f2 = 
AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, 
sourceFlowFile,
-                processSession);
-
-        assertEquals("text/plain", 
f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType"));
-        assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX 
+ "userId"));
-        assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + 
"deliveryMode"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
index 0657a65..bc4c32d 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
@@ -29,6 +29,8 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.rabbitmq.client.Connection;
+
 
 /**
  * Unit tests for the AbstractAMQPProcessor class
@@ -77,12 +79,12 @@ public class AbstractAMQPProcessorTest {
      */
     public static class MockAbstractAMQPProcessor extends 
AbstractAMQPProcessor<AMQPConsumer> {
         @Override
-        protected void rendezvousWithAmqp(ProcessContext context, 
ProcessSession session) throws ProcessException {
+        protected void processResource(Connection connection, AMQPConsumer 
consumer, ProcessContext context, ProcessSession session) throws 
ProcessException {
             // nothing to do
         }
 
         @Override
-        protected AMQPConsumer finishBuildingTargetResource(ProcessContext 
context) {
+        protected AMQPConsumer createAMQPWorker(ProcessContext context, 
Connection connection) {
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 52b48d8..66abb2d 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -20,14 +20,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -41,12 +39,10 @@ public class ConsumeAMQPTest {
 
     @Test
     public void validateSuccessfullConsumeAndTransferToSuccess() throws 
Exception {
-        Map<String, List<String>> routingMap = new HashMap<>();
-        routingMap.put("key1", Arrays.asList("queue1", "queue2"));
-        Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
-        exchangeToRoutingKeymap.put("myExchange", "key1");
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
-        Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
 
         try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
             sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
@@ -57,7 +53,6 @@ public class ConsumeAMQPTest {
             runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
 
             runner.run();
-            Thread.sleep(200);
             final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
         }
@@ -65,25 +60,20 @@ public class ConsumeAMQPTest {
     }
 
     public static class LocalConsumeAMQP extends ConsumeAMQP {
+        private final Connection connection;
 
-        private final Connection conection;
         public LocalConsumeAMQP(Connection connection) {
-            this.conection = connection;
+            this.connection = connection;
         }
 
         @Override
-        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-            synchronized (this) {
-                if (this.amqpConnection == null || 
!this.amqpConnection.isOpen()) {
-                    this.amqpConnection = this.conection;
-                    this.targetResource = 
this.finishBuildingTargetResource(context);
-                }
-            }
-            this.rendezvousWithAmqp(context, session);
+        protected AMQPConsumer createAMQPWorker(ProcessContext context, 
Connection connection) {
+            return new AMQPConsumer(connection, 
context.getProperty(QUEUE).getValue());
         }
 
-        public Connection getConnection() {
-            return this.amqpConnection;
+        @Override
+        protected Connection createConnection(ProcessContext context) {
+            return connection;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39556e35/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index fec3d50..cee44a1 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -22,15 +22,13 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -44,13 +42,13 @@ public class PublishAMQPTest {
 
     @Test
     public void validateSuccessfullPublishAndTransferToSuccess() throws 
Exception {
-        PublishAMQP pubProc = new LocalPublishAMQP(false);
-        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        final PublishAMQP pubProc = new LocalPublishAMQP();
+        final TestRunner runner = TestRunners.newTestRunner(pubProc);
         runner.setProperty(PublishAMQP.HOST, "injvm");
         runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
         runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
 
-        Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "bar");
         attributes.put("amqp$contentType", "foo/bar");
         attributes.put("amqp$contentEncoding", "foobar123");
@@ -70,20 +68,21 @@ public class PublishAMQPTest {
         runner.enqueue("Hello Joe".getBytes(), attributes);
 
         runner.run();
+
         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
         assertNotNull(successFF);
-        Channel channel = ((LocalPublishAMQP) 
pubProc).getConnection().createChannel();
-        GetResponse msg1 = channel.basicGet("queue1", true);
+
+        final Channel channel = ((LocalPublishAMQP) 
pubProc).getConnection().createChannel();
+        final GetResponse msg1 = channel.basicGet("queue1", true);
         assertNotNull(msg1);
         assertEquals("foo/bar", msg1.getProps().getContentType());
-
         assertEquals("foobar123", msg1.getProps().getContentEncoding());
 
-        Map<String, Object> headerMap = msg1.getProps().getHeaders();
+        final Map<String, Object> headerMap = msg1.getProps().getHeaders();
 
-        Object foo = headerMap.get("foo");
-        Object foo2 = headerMap.get("foo2");
-        Object foo3 = headerMap.get("foo3");
+        final Object foo = headerMap.get("foo");
+        final Object foo2 = headerMap.get("foo2");
+        final Object foo3 = headerMap.get("foo3");
 
         assertEquals("bar", foo.toString());
         assertEquals("bar2", foo2.toString());
@@ -115,53 +114,29 @@ public class PublishAMQPTest {
         runner.enqueue("Hello Joe".getBytes());
 
         runner.run();
-        Thread.sleep(200);
 
         
assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
         
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
     }
 
-    public static class LocalPublishAMQP extends PublishAMQP {
 
-        private final boolean closeConnection;
+    public static class LocalPublishAMQP extends PublishAMQP {
+        private TestConnection connection;
 
         public LocalPublishAMQP() {
-            this(true);
-        }
+            final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+            final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
 
-        public LocalPublishAMQP(boolean closeConection) {
-            this.closeConnection = closeConection;
+            connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
         }
 
         @Override
-        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-            synchronized (this) {
-                if (this.amqpConnection == null || 
!this.amqpConnection.isOpen()) {
-                    Map<String, List<String>> routingMap = new HashMap<>();
-                    routingMap.put("key1", Arrays.asList("queue1", "queue2"));
-                    Map<String, String> exchangeToRoutingKeymap = new 
HashMap<>();
-                    exchangeToRoutingKeymap.put("myExchange", "key1");
-                    this.amqpConnection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
-                    this.targetResource = 
this.finishBuildingTargetResource(context);
-                }
-            }
-            this.rendezvousWithAmqp(context, session);
+        protected Connection createConnection(ProcessContext context) {
+            return connection;
         }
 
         public Connection getConnection() {
-            this.close();
-            return this.amqpConnection;
-        }
-
-        // since we really don't have any real connection (rather emulated 
one), the override is
-        // needed here so the call to close from TestRunner does nothing since 
we are
-        // grabbing the emulated connection later to do the assertions in some 
tests.
-        @Override
-        @OnStopped
-        public void close() {
-            if (this.closeConnection) {
-                super.close();
-            }
+            return connection;
         }
     }
 }

Reply via email to