Repository: nifi
Updated Branches:
  refs/heads/master 75af3a2eb -> dc8b62c3a


http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
new file mode 100644
index 0000000..9f01c04
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockProcessorLog;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ReturnListener;
+
+public class AMQPPublisherTest {
+
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void failOnNullConnection() {
+        new AMQPPublisher(null, null, null, null);
+    }
+
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void failOnMissingRoutingKey() throws Exception {
+        Connection conn = new TestConnection(null, null);
+        new AMQPPublisher(conn, null, "", null);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void failPublishIfChannelClosed() throws Exception {
+        Connection conn = new TestConnection(null, null);
+        try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", 
null)) {
+            conn.close();
+            sender.publish("oleg".getBytes());
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void failPublishIfChannelFails() throws Exception {
+        TestConnection conn = new TestConnection(null, null);
+        try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", 
null)) {
+            ((TestChannel) conn.createChannel()).corruptChannel();
+            sender.publish("oleg".getBytes());
+        }
+    }
+
+    @Test
+    public void validateSuccessfullPublishingAndRouting() throws Exception {
+        Map<String, List<String>> routingMap = new HashMap<>();
+        routingMap.put("key1", Arrays.asList("queue1", "queue2"));
+        Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
+        exchangeToRoutingKeymap.put("myExchange", "key1");
+
+        Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key1", null)) {
+            sender.publish("hello".getBytes());
+            Thread.sleep(200);
+        }
+
+        assertNotNull(connection.createChannel().basicGet("queue1", true));
+        assertNotNull(connection.createChannel().basicGet("queue2", true));
+
+        connection.close();
+    }
+
+    @Test
+    public void validateSuccessfullPublishingAndUndeliverableRoutingKey() 
throws Exception {
+        Map<String, List<String>> routingMap = new HashMap<>();
+        routingMap.put("key1", Arrays.asList("queue1", "queue2"));
+        Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
+        exchangeToRoutingKeymap.put("myExchange", "key1");
+
+        Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
+
+        ReturnListener retListener = mock(ReturnListener.class);
+        connection.createChannel().addReturnListener(retListener);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key2",
+                new MockProcessorLog("foo", ""))) {
+            sender.publish("hello".getBytes());
+            Thread.sleep(1000);
+        }
+        Thread.sleep(200);
+        verify(retListener, atMost(1)).handleReturn(Mockito.anyInt(), 
Mockito.anyString(), Mockito.anyString(),
+                Mockito.anyString(), Mockito.any(BasicProperties.class), 
(byte[]) Mockito.any());
+        connection.close();
+    }
+
+    @Test
+    public void validateToString() throws Exception {
+        TestConnection conn = new TestConnection(null, null);
+        try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", 
"key1", null)) {
+            String toString = sender.toString();
+            assertTrue(toString.contains("EXCHANGE:myExchange, 
ROUTING_KEY:key1"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
new file mode 100644
index 0000000..5452809
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Test;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+
+public class AMQPUtilsTest {
+
+
+    @Test
+    public void validateUpdateFlowFileAttributesWithAmqpProperties() {
+        PublishAMQP processor = new PublishAMQP();
+        ProcessSession processSession = new MockProcessSession(new 
SharedSessionState(processor, new AtomicLong()),
+                processor);
+        FlowFile sourceFlowFile = processSession.create();
+        BasicProperties amqpProperties = new AMQP.BasicProperties.Builder()
+                .contentType("text/plain").deliveryMode(2)
+                .priority(1).userId("joe")
+                .build();
+        FlowFile f2 = 
AMQPUtils.updateFlowFileAttributesWithAmqpProperties(amqpProperties, 
sourceFlowFile,
+                processSession);
+
+        assertEquals("text/plain", 
f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + "contentType"));
+        assertEquals("joe", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX 
+ "userId"));
+        assertEquals("2", f2.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + 
"deliveryMode"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
new file mode 100644
index 0000000..3a2754d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MessageProperties;
+
+public class ConsumeAMQPTest {
+
+
+    @Test
+    public void validateSuccessfullConsumeAndTransferToSuccess() throws 
Exception {
+        Map<String, List<String>> routingMap = new HashMap<>();
+        routingMap.put("key1", Arrays.asList("queue1", "queue2"));
+        Map<String, String> exchangeToRoutingKeymap = new HashMap<>();
+        exchangeToRoutingKeymap.put("myExchange", "key1");
+
+        Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key1", null)) {
+            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN);
+
+            ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
+            TestRunner runner = TestRunners.newTestRunner(pubProc);
+            runner.setProperty(ConsumeAMQP.HOST, "injvm");
+            runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
+
+            runner.run();
+            Thread.sleep(200);
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+        }
+
+    }
+
+    public static class LocalConsumeAMQP extends ConsumeAMQP {
+
+        private final Connection conection;
+        public LocalConsumeAMQP(Connection connection) {
+            this.conection = connection;
+        }
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+            synchronized (this) {
+                if (this.amqpConnection == null || 
!this.amqpConnection.isOpen()) {
+                    this.amqpConnection = this.conection;
+                    this.targetResource = 
this.finishBuildingTargetResource(context);
+                }
+            }
+            this.rendezvousWithAmqp(context, session);
+        }
+
+        public Connection getConnection() {
+            return this.amqpConnection;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
new file mode 100644
index 0000000..3a9b8d1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.GetResponse;
+
+public class PublishAMQPTest {
+
+    @Test
+    public void validateSuccessfullPublishAndTransferToSuccess() throws 
Exception {
+        PublishAMQP pubProc = new LocalPublishAMQP(false);
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.HOST, "injvm");
+        runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
+        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "bar");
+        attributes.put("amqp$contentType", "foo/bar");
+        runner.enqueue("Hello Joe".getBytes(), attributes);
+
+        runner.run();
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+        Channel channel = ((LocalPublishAMQP) 
pubProc).getConnection().createChannel();
+        GetResponse msg1 = channel.basicGet("queue1", true);
+        assertNotNull(msg1);
+        assertEquals("foo/bar", msg1.getProps().getContentType());
+        assertNotNull(channel.basicGet("queue2", true));
+    }
+
+    @Test
+    public void validateFailedPublishAndTransferToFailure() throws Exception {
+        PublishAMQP pubProc = new LocalPublishAMQP();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        runner.setProperty(PublishAMQP.HOST, "injvm");
+        runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
+        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
+
+        runner.enqueue("Hello Joe".getBytes());
+
+        runner.run();
+        Thread.sleep(200);
+
+        
assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
+        
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
+    }
+
+    public static class LocalPublishAMQP extends PublishAMQP {
+
+        private final boolean closeConnection;
+
+        public LocalPublishAMQP() {
+            this(true);
+        }
+
+        public LocalPublishAMQP(boolean closeConection) {
+            this.closeConnection = closeConection;
+        }
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+            synchronized (this) {
+                if (this.amqpConnection == null || 
!this.amqpConnection.isOpen()) {
+                    Map<String, List<String>> routingMap = new HashMap<>();
+                    routingMap.put("key1", Arrays.asList("queue1", "queue2"));
+                    Map<String, String> exchangeToRoutingKeymap = new 
HashMap<>();
+                    exchangeToRoutingKeymap.put("myExchange", "key1");
+                    this.amqpConnection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+                    this.targetResource = 
this.finishBuildingTargetResource(context);
+                }
+            }
+            this.rendezvousWithAmqp(context, session);
+        }
+
+        public Connection getConnection() {
+            this.close();
+            return this.amqpConnection;
+        }
+
+        // since we really don't have any real connection (rather emulated 
one), the override is
+        // needed here so the call to close from TestRunner does nothing since 
we are
+        // grabbing the emulated connection later to do the assertions in some 
tests.
+        @Override
+        @OnStopped
+        public void close() {
+            if (this.closeConnection) {
+                super.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
new file mode 100644
index 0000000..f793084
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.AMQP.Basic.RecoverOk;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.AMQP.Exchange.BindOk;
+import com.rabbitmq.client.AMQP.Exchange.DeclareOk;
+import com.rabbitmq.client.AMQP.Exchange.DeleteOk;
+import com.rabbitmq.client.AMQP.Exchange.UnbindOk;
+import com.rabbitmq.client.AMQP.Queue.PurgeOk;
+import com.rabbitmq.client.AMQP.Tx.CommitOk;
+import com.rabbitmq.client.AMQP.Tx.RollbackOk;
+import com.rabbitmq.client.AMQP.Tx.SelectOk;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Command;
+import com.rabbitmq.client.ConfirmListener;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.FlowListener;
+import com.rabbitmq.client.GetResponse;
+import com.rabbitmq.client.Method;
+import com.rabbitmq.client.ReturnListener;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+
+/**
+ * Implementation of {@link Channel} to be used during testing
+ */
+class TestChannel implements Channel {
+
+    private final ExecutorService executorService;
+
+    private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages;
+
+    private final Map<String, List<String>> routingKeyToQueueMappings;
+
+    private final Map<String, String> exchangeToRoutingKeyMappings;
+
+    private final List<ReturnListener> returnListeners;
+
+    private boolean open;
+
+    private boolean corrupted;
+
+    private Connection connection;
+
+    public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
+            Map<String, List<String>> routingKeyToQueueMappings) {
+        this.enqueuedMessages = new HashMap<>();
+        this.routingKeyToQueueMappings = routingKeyToQueueMappings;
+        if (this.routingKeyToQueueMappings != null) {
+            for (List<String> queues : routingKeyToQueueMappings.values()) {
+                for (String queue : queues) {
+                    this.enqueuedMessages.put(queue, new 
ArrayBlockingQueue<GetResponse>(100));
+                }
+            }
+        }
+        this.exchangeToRoutingKeyMappings = exchangeToRoutingKeyMappings;
+        this.executorService = Executors.newCachedThreadPool();
+        this.returnListeners = new ArrayList<>();
+        this.open = true;
+    }
+
+    void corruptChannel() {
+        this.corrupted = true;
+    }
+
+    void setConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    @Override
+    public void addShutdownListener(ShutdownListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void removeShutdownListener(ShutdownListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public ShutdownSignalException getCloseReason() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void notifyListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public int getChannelNumber() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public Connection getConnection() {
+        return this.connection;
+    }
+
+    @Override
+    public void close() throws IOException, TimeoutException {
+        this.open = false;
+    }
+
+    @Override
+    public void close(int closeCode, String closeMessage) throws IOException, 
TimeoutException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public boolean flowBlocked() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void abort() throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void abort(int closeCode, String closeMessage) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void addReturnListener(ReturnListener listener) {
+        this.returnListeners.add(listener);
+    }
+
+    @Override
+    public boolean removeReturnListener(ReturnListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void clearReturnListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void addFlowListener(FlowListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public boolean removeFlowListener(FlowListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void clearFlowListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void addConfirmListener(ConfirmListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public boolean removeConfirmListener(ConfirmListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void clearConfirmListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public Consumer getDefaultConsumer() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void setDefaultConsumer(Consumer consumer) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicQos(int prefetchSize, int prefetchCount, boolean global) 
throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicQos(int prefetchCount, boolean global) throws IOException 
{
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicQos(int prefetchCount) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicPublish(String exchange, String routingKey, 
BasicProperties props, byte[] body)
+            throws IOException {
+        this.basicPublish(exchange, routingKey, true, props, body);
+    }
+
+    @Override
+    public void basicPublish(final String exchange, final String routingKey, 
boolean mandatory,
+            final BasicProperties props, final byte[] body) throws IOException 
{
+        if (this.corrupted) {
+            throw new IOException("Channel is corrupted");
+        }
+
+        if (exchange.equals("")){ // default exchange; routingKey corresponds 
to a queue.
+            BlockingQueue<GetResponse> messages = 
this.getMessageQueue(routingKey);
+            GetResponse response = new GetResponse(null, props, body, 
messages.size());
+            messages.offer(response);
+        } else {
+            String rKey = this.exchangeToRoutingKeyMappings.get(exchange);
+
+            if (rKey.equals(routingKey)) {
+                List<String> queueNames = 
this.routingKeyToQueueMappings.get(routingKey);
+                if (queueNames == null || queueNames.isEmpty()) {
+                    this.discard(exchange, routingKey, mandatory, props, body);
+                } else {
+                    for (String queueName : queueNames) {
+                        BlockingQueue<GetResponse> messages = 
this.getMessageQueue(queueName);
+                        GetResponse response = new GetResponse(null, props, 
body, messages.size());
+                        messages.offer(response);
+                    }
+                }
+            } else {
+                this.discard(exchange, routingKey, mandatory, props, body);
+            }
+
+        }
+    }
+
+    private void discard(final String exchange, final String routingKey, 
boolean mandatory, final BasicProperties props,
+            final byte[] body) {
+        // NO ROUTE. Invoke return listener async
+        for (final ReturnListener listener : returnListeners) {
+            this.executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        listener.handleReturn(-9, "Rejecting", exchange, 
routingKey, props, body);
+                    } catch (Exception e) {
+                        throw new IllegalStateException("Failed to send return 
message", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private BlockingQueue<GetResponse> getMessageQueue(String name) {
+        BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(name);
+        if (messages == null) {
+            messages = new ArrayBlockingQueue<>(100);
+            this.enqueuedMessages.put(name, messages);
+        }
+        return messages;
+    }
+
+    @Override
+    public void basicPublish(String exchange, String routingKey, boolean 
mandatory, boolean immediate,
+            BasicProperties props, byte[] body) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public DeclareOk exchangeDeclare(String exchange, String type) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public DeclareOk exchangeDeclare(String exchange, String type, boolean 
durable) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public DeclareOk exchangeDeclare(String exchange, String type, boolean 
durable, boolean autoDelete,
+            Map<String, Object> arguments) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public DeclareOk exchangeDeclare(String exchange, String type, boolean 
durable, boolean autoDelete,
+            boolean internal, Map<String, Object> arguments) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void exchangeDeclareNoWait(String exchange, String type, boolean 
durable, boolean autoDelete,
+            boolean internal, Map<String, Object> arguments) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public DeclareOk exchangeDeclarePassive(String name) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public DeleteOk exchangeDelete(String exchange) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public BindOk exchangeBind(String destination, String source, String 
routingKey) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public BindOk exchangeBind(String destination, String source, String 
routingKey, Map<String, Object> arguments)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void exchangeBindNoWait(String destination, String source, String 
routingKey, Map<String, Object> arguments)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public UnbindOk exchangeUnbind(String destination, String source, String 
routingKey) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public UnbindOk exchangeUnbind(String destination, String source, String 
routingKey, Map<String, Object> arguments)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void exchangeUnbindNoWait(String destination, String source, String 
routingKey,
+            Map<String, Object> arguments) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, 
boolean durable, boolean exclusive,
+            boolean autoDelete, Map<String, Object> arguments) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void queueDeclareNoWait(String queue, boolean durable, boolean 
exclusive, boolean autoDelete,
+            Map<String, Object> arguments) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclarePassive(String 
queue) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) 
throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, 
boolean ifUnused, boolean ifEmpty)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void queueDeleteNoWait(String queue, boolean ifUnused, boolean 
ifEmpty) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, 
String exchange, String routingKey)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, 
String exchange, String routingKey,
+            Map<String, Object> arguments) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void queueBindNoWait(String queue, String exchange, String 
routingKey, Map<String, Object> arguments)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, 
String exchange, String routingKey)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, 
String exchange, String routingKey,
+            Map<String, Object> arguments) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public PurgeOk queuePurge(String queue) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public GetResponse basicGet(String queue, boolean autoAck) throws 
IOException {
+        BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(queue);
+        if (messages == null) {
+            throw new IOException("Queue is not defined");
+        } else {
+            return messages.poll();
+        }
+    }
+
+    @Override
+    public void basicAck(long deliveryTag, boolean multiple) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 
throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void basicReject(long deliveryTag, boolean requeue) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public String basicConsume(String queue, Consumer callback) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public String basicConsume(String queue, boolean autoAck, Consumer 
callback) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public String basicConsume(String queue, boolean autoAck, Map<String, 
Object> arguments, Consumer callback)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public String basicConsume(String queue, boolean autoAck, String 
consumerTag, Consumer callback)
+            throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public String basicConsume(String queue, boolean autoAck, String 
consumerTag, boolean noLocal, boolean exclusive,
+            Map<String, Object> arguments, Consumer callback) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void basicCancel(String consumerTag) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public RecoverOk basicRecover() throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public RecoverOk basicRecover(boolean requeue) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public SelectOk txSelect() throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public CommitOk txCommit() throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public RollbackOk txRollback() throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public long getNextPublishSeqNo() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public boolean waitForConfirms() throws InterruptedException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public boolean waitForConfirms(long timeout) throws InterruptedException, 
TimeoutException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void waitForConfirmsOrDie() throws IOException, 
InterruptedException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void waitForConfirmsOrDie(long timeout) throws IOException, 
InterruptedException, TimeoutException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public void asyncRpc(Method method) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+
+    }
+
+    @Override
+    public Command rpc(Method method) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public long messageCount(String queue) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public long consumerCount(String queue) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java
new file mode 100644
index 0000000..cb29478
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.rabbitmq.client.BlockedListener;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ExceptionHandler;
+import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownSignalException;
+
+/**
+ * Implementation of {@link Connection} to be used for testing. Will return the
+ * same instance of {@link Channel} when {@link #createChannel()} is called.
+ *
+ * This class essentially emulates AMQP system and attempts to ensure the same
+ * behavior on publish/subscribe and other core operations used by the NIFI 
AMQP
+ * component.
+ *
+ * NOTE: Only methods that are used by the framework are implemented. More
+ * could/should be added later
+ */
+class TestConnection implements Connection {
+
+    private final TestChannel channel;
+
+    private boolean open;
+
+    public TestConnection(Map<String, String> exchangeToRoutingKeyMappings,
+            Map<String, List<String>> routingKeyToQueueMappings) {
+        this.channel = new TestChannel(exchangeToRoutingKeyMappings, 
routingKeyToQueueMappings);
+        this.channel.setConnection(this);
+        this.open = true;
+    }
+
+    @Override
+    public void addShutdownListener(ShutdownListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void removeShutdownListener(ShutdownListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public ShutdownSignalException getCloseReason() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void notifyListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.open;
+    }
+
+    @Override
+    public InetAddress getAddress() {
+        try {
+            return InetAddress.getByName("localhost");
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public int getPort() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public int getChannelMax() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public int getFrameMax() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public int getHeartbeat() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public Map<String, Object> getClientProperties() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public Map<String, Object> getServerProperties() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public Channel createChannel() throws IOException {
+        return this.channel;
+    }
+
+    @Override
+    public Channel createChannel(int channelNumber) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.open = false;
+        try {
+            this.channel.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void close(int closeCode, String closeMessage) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void close(int timeout) throws IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void close(int closeCode, String closeMessage, int timeout) throws 
IOException {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void abort() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void abort(int closeCode, String closeMessage) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void abort(int timeout) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void abort(int closeCode, String closeMessage, int timeout) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void addBlockedListener(BlockedListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public boolean removeBlockedListener(BlockedListener listener) {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public void clearBlockedListeners() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+
+    @Override
+    public ExceptionHandler getExceptionHandler() {
+        throw new UnsupportedOperationException(
+                "This method is not currently supported as it is not used by 
current API in testing");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8c502ec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 
%m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml 
b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
new file mode 100644
index 0000000..7faae8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.4.2-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-amqp-bundle</artifactId>
+    <version>0.4.2-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <description>A bundle of processors that run Flume 
sources/sinks</description>
+    <modules>
+        <module>nifi-amqp-processors</module>
+        <module>nifi-amqp-nar</module>
+    </modules>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-amqp-processors</artifactId>
+                <version>0.4.2-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 0cdc917..3947d18 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -53,6 +53,7 @@
         <module>nifi-html-bundle</module>
         <module>nifi-scripting-bundle</module>
         <module>nifi-elasticsearch-bundle</module>
+        <module>nifi-amqp-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7eb3532..d535ae2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1004,6 +1004,12 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-amqp-nar</artifactId>
+                <version>0.4.2-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
                 <version>0.4.2-SNAPSHOT</version>
                 <type>nar</type>

Reply via email to