Repository: nifi Updated Branches: refs/heads/master 75af3a2eb -> dc8b62c3a
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java new file mode 100644 index 0000000..9f01c04 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.amqp.processors;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.util.MockProcessorLog;
import org.junit.Test;
import org.mockito.Mockito;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;

/**
 * Tests for {@link AMQPPublisher}, run against the emulated {@link TestConnection}
 * rather than a real AMQP broker.
 */
public class AMQPPublisherTest {

    /** Constructing a publisher with every argument {@code null} must be rejected. */
    @SuppressWarnings("resource")
    @Test(expected = IllegalArgumentException.class)
    public void failOnNullConnection() {
        new AMQPPublisher(null, null, null, null);
    }

    /** Constructing a publisher with an empty routing key must be rejected. */
    @SuppressWarnings("resource")
    @Test(expected = IllegalArgumentException.class)
    public void failOnMissingRoutingKey() throws Exception {
        final Connection connection = new TestConnection(null, null);
        new AMQPPublisher(connection, null, "", null);
    }

    /** Publishing after the connection has been closed must raise {@link IllegalStateException}. */
    @Test(expected = IllegalStateException.class)
    public void failPublishIfChannelClosed() throws Exception {
        final Connection connection = new TestConnection(null, null);
        try (AMQPPublisher publisher = new AMQPPublisher(connection, null, "foo", null)) {
            // Close the connection while the publisher is still in use.
            connection.close();
            publisher.publish("oleg".getBytes());
        }
    }

    /** Publishing through a channel marked as corrupted must raise {@link IllegalStateException}. */
    @Test(expected = IllegalStateException.class)
    public void failPublishIfChannelFails() throws Exception {
        final TestConnection connection = new TestConnection(null, null);
        try (AMQPPublisher publisher = new AMQPPublisher(connection, null, "foo", null)) {
            final TestChannel channel = (TestChannel) connection.createChannel();
            channel.corruptChannel();
            publisher.publish("oleg".getBytes());
        }
    }

    /** A message published with the mapped routing key must be retrievable from both queues. */
    @Test
    public void validateSuccessfullPublishingAndRouting() throws Exception {
        final Connection connection = newRoutedConnection();

        try (AMQPPublisher publisher = new AMQPPublisher(connection, "myExchange", "key1", null)) {
            publisher.publish("hello".getBytes());
            Thread.sleep(200);
        }

        for (final String queueName : Arrays.asList("queue1", "queue2")) {
            assertNotNull(connection.createChannel().basicGet(queueName, true));
        }

        connection.close();
    }

    /**
     * Publishes with a routing key ("key2") that the emulated exchange is not mapped to and
     * checks how often the registered {@link ReturnListener} was notified.
     */
    @Test
    public void validateSuccessfullPublishingAndUndeliverableRoutingKey() throws Exception {
        final Connection connection = newRoutedConnection();

        final ReturnListener returnListener = mock(ReturnListener.class);
        connection.createChannel().addReturnListener(returnListener);

        try (AMQPPublisher publisher = new AMQPPublisher(connection, "myExchange", "key2",
                new MockProcessorLog("foo", ""))) {
            publisher.publish("hello".getBytes());
            Thread.sleep(1000);
        }
        Thread.sleep(200);

        // NOTE(review): atMost(1) is also satisfied when the listener is never invoked, so this
        // only guards against duplicate notifications - confirm whether a stricter mode is intended.
        verify(returnListener, atMost(1)).handleReturn(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString(),
                Mockito.anyString(), Mockito.any(BasicProperties.class), (byte[]) Mockito.any());
        connection.close();
    }

    /** {@code toString()} must mention the configured exchange and routing key. */
    @Test
    public void validateToString() throws Exception {
        final TestConnection connection = new TestConnection(null, null);
        try (AMQPPublisher publisher = new AMQPPublisher(connection, "myExchange", "key1", null)) {
            assertTrue(publisher.toString().contains("EXCHANGE:myExchange, ROUTING_KEY:key1"));
        }
    }

    /**
     * Builds the emulated connection shared by the routing tests: "myExchange" is associated
     * with routing key "key1", and "key1" with the queues "queue1" and "queue2".
     */
    private static Connection newRoutedConnection() {
        final Map<String, List<String>> routingKeyToQueues = new HashMap<>();
        routingKeyToQueues.put("key1", Arrays.asList("queue1", "queue2"));

        final Map<String, String> exchangeToRoutingKey = new HashMap<>();
        exchangeToRoutingKey.put("myExchange", "key1");

        return new TestConnection(exchangeToRoutingKey, routingKeyToQueues);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java new file mode 100644 index 0000000..5452809 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPUtilsTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.amqp.processors;

import static org.junit.Assert.assertEquals;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.Test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Tests for the static helpers in {@link AMQPUtils}.
 */
public class AMQPUtilsTest {

    /**
     * Checks that the content type, user id and delivery mode set on the AMQP properties are
     * exposed as FlowFile attributes named with the {@code AMQPUtils.AMQP_PROP_PREFIX} prefix.
     */
    @Test
    public void validateUpdateFlowFileAttributesWithAmqpProperties() {
        final PublishAMQP processor = new PublishAMQP();
        final SharedSessionState sessionState = new SharedSessionState(processor, new AtomicLong());
        final ProcessSession session = new MockProcessSession(sessionState, processor);
        final FlowFile original = session.create();

        final BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2)
                .priority(1)
                .userId("joe")
                .build();

        final FlowFile updated = AMQPUtils.updateFlowFileAttributesWithAmqpProperties(properties, original, session);

        assertEquals("text/plain", amqpAttribute(updated, "contentType"));
        assertEquals("joe", amqpAttribute(updated, "userId"));
        // The numeric delivery mode is compared against its string form.
        assertEquals("2", amqpAttribute(updated, "deliveryMode"));
    }

    /** Reads the FlowFile attribute stored under the AMQP property prefix plus {@code propertyName}. */
    private static String amqpAttribute(FlowFile flowFile, String propertyName) {
        return flowFile.getAttributes().get(AMQPUtils.AMQP_PROP_PREFIX + propertyName);
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java new file mode 100644 index 0000000..3a2754d --- /dev/null +++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.amqp.processors; + +import static org.junit.Assert.assertNotNull; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.MessageProperties; + +public class ConsumeAMQPTest { + + + @Test + public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception { + Map<String, List<String>> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1", "queue2")); + Map<String, String> exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); 
+ + try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) { + sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN); + + ConsumeAMQP pubProc = new LocalConsumeAMQP(connection); + TestRunner runner = TestRunners.newTestRunner(pubProc); + runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + + runner.run(); + Thread.sleep(200); + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + assertNotNull(successFF); + } + + } + + public static class LocalConsumeAMQP extends ConsumeAMQP { + + private final Connection conection; + public LocalConsumeAMQP(Connection connection) { + this.conection = connection; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + synchronized (this) { + if (this.amqpConnection == null || !this.amqpConnection.isOpen()) { + this.amqpConnection = this.conection; + this.targetResource = this.finishBuildingTargetResource(context); + } + } + this.rendezvousWithAmqp(context, session); + } + + public Connection getConnection() { + return this.amqpConnection; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java new file mode 100644 index 0000000..3a9b8d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.amqp.processors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;

/**
 * Tests for {@link PublishAMQP}, run against the emulated {@link TestConnection}.
 */
public class PublishAMQPTest {

    /**
     * Publishes a FlowFile to "myExchange" and expects it on the success relationship, with the
     * message readable from "queue1" (content type taken from the "amqp$contentType" attribute)
     * and from "queue2".
     */
    @Test
    public void validateSuccessfullPublishAndTransferToSuccess() throws Exception {
        // Keep the emulated connection open after the run so the queues can be inspected.
        final LocalPublishAMQP processor = new LocalPublishAMQP(false);
        final TestRunner runner = configuredRunner(processor, "myExchange");

        final Map<String, String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put("foo", "bar");
        flowFileAttributes.put("amqp$contentType", "foo/bar");
        runner.enqueue("Hello Joe".getBytes(), flowFileAttributes);

        runner.run();

        final List<MockFlowFile> published = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS);
        final MockFlowFile first = published.get(0);
        assertNotNull(first);

        final Channel channel = processor.getConnection().createChannel();
        final GetResponse fromQueue1 = channel.basicGet("queue1", true);
        assertNotNull(fromQueue1);
        assertEquals("foo/bar", fromQueue1.getProps().getContentType());
        assertNotNull(channel.basicGet("queue2", true));
    }

    /**
     * Publishes to an exchange name ("badToTheBone") that the emulated connection is not set up
     * with and expects the FlowFile on the failure relationship only.
     */
    @Test
    public void validateFailedPublishAndTransferToFailure() throws Exception {
        final PublishAMQP processor = new LocalPublishAMQP();
        final TestRunner runner = configuredRunner(processor, "badToTheBone");

        runner.enqueue("Hello Joe".getBytes());

        runner.run();
        Thread.sleep(200);

        assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
        assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).get(0));
    }

    /** Creates a runner for {@code processor} with host "injvm", the given exchange and routing key "key1". */
    private static TestRunner configuredRunner(PublishAMQP processor, String exchange) {
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PublishAMQP.HOST, "injvm");
        runner.setProperty(PublishAMQP.EXCHANGE, exchange);
        runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
        return runner;
    }

    /**
     * {@link PublishAMQP} variant that creates an emulated {@link TestConnection} on demand and
     * can be told not to close it when the processor is stopped.
     */
    public static class LocalPublishAMQP extends PublishAMQP {

        private final boolean closeConnection;

        /** Creates a processor whose {@link #close()} delegates to the parent implementation. */
        public LocalPublishAMQP() {
            this(true);
        }

        /**
         * @param closeConection whether {@link #close()} should delegate to the parent
         *        implementation ({@code true}) or do nothing ({@code false})
         */
        public LocalPublishAMQP(boolean closeConection) {
            this.closeConnection = closeConection;
        }

        /**
         * Creates the emulated connection (and rebuilds the target resource) whenever the
         * processor has no open connection, then delegates to {@code rendezvousWithAmqp}.
         */
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            synchronized (this) {
                final boolean connected = this.amqpConnection != null && this.amqpConnection.isOpen();
                if (!connected) {
                    final Map<String, List<String>> routingKeyToQueues = new HashMap<>();
                    routingKeyToQueues.put("key1", Arrays.asList("queue1", "queue2"));

                    final Map<String, String> exchangeToRoutingKey = new HashMap<>();
                    exchangeToRoutingKey.put("myExchange", "key1");

                    this.amqpConnection = new TestConnection(exchangeToRoutingKey, routingKeyToQueues);
                    this.targetResource = this.finishBuildingTargetResource(context);
                }
            }
            this.rendezvousWithAmqp(context, session);
        }

        /** Invokes {@link #close()} and then returns the connection held by the processor. */
        public Connection getConnection() {
            this.close();
            return this.amqpConnection;
        }

        // The connection here is emulated, not real. Some tests fetch it after the run to make
        // assertions against its queues, so the close() triggered by the TestRunner has to be
        // a no-op for them; closing is therefore controlled by the constructor flag.
        @Override
        @OnStopped
        public void close() {
            if (this.closeConnection) {
                super.close();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java new file mode 100644 index 0000000..f793084 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.amqp.processors; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import com.rabbitmq.client.AMQP.Basic.RecoverOk; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.AMQP.Exchange.BindOk; +import com.rabbitmq.client.AMQP.Exchange.DeclareOk; +import com.rabbitmq.client.AMQP.Exchange.DeleteOk; +import com.rabbitmq.client.AMQP.Exchange.UnbindOk; +import com.rabbitmq.client.AMQP.Queue.PurgeOk; +import com.rabbitmq.client.AMQP.Tx.CommitOk; +import com.rabbitmq.client.AMQP.Tx.RollbackOk; +import com.rabbitmq.client.AMQP.Tx.SelectOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.ConfirmListener; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.FlowListener; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.Method; +import com.rabbitmq.client.ReturnListener; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; + +/** + * Implementation of {@link Channel} to be used during testing + */ +class TestChannel implements Channel { + + private final ExecutorService executorService; + + private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages; + + private final Map<String, List<String>> routingKeyToQueueMappings; + + private final Map<String, String> exchangeToRoutingKeyMappings; + + private final List<ReturnListener> returnListeners; + + private boolean open; + + private boolean corrupted; + + private Connection connection; 
+ + public TestChannel(Map<String, String> exchangeToRoutingKeyMappings, + Map<String, List<String>> routingKeyToQueueMappings) { + this.enqueuedMessages = new HashMap<>(); + this.routingKeyToQueueMappings = routingKeyToQueueMappings; + if (this.routingKeyToQueueMappings != null) { + for (List<String> queues : routingKeyToQueueMappings.values()) { + for (String queue : queues) { + this.enqueuedMessages.put(queue, new ArrayBlockingQueue<GetResponse>(100)); + } + } + } + this.exchangeToRoutingKeyMappings = exchangeToRoutingKeyMappings; + this.executorService = Executors.newCachedThreadPool(); + this.returnListeners = new ArrayList<>(); + this.open = true; + } + + void corruptChannel() { + this.corrupted = true; + } + + void setConnection(Connection connection) { + this.connection = connection; + } + + @Override + public void addShutdownListener(ShutdownListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void removeShutdownListener(ShutdownListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public ShutdownSignalException getCloseReason() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void notifyListeners() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public int getChannelNumber() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public Connection getConnection() { + return this.connection; + } + + @Override + public void close() 
throws IOException, TimeoutException { + this.open = false; + } + + @Override + public void close(int closeCode, String closeMessage) throws IOException, TimeoutException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public boolean flowBlocked() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void abort() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void abort(int closeCode, String closeMessage) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void addReturnListener(ReturnListener listener) { + this.returnListeners.add(listener); + } + + @Override + public boolean removeReturnListener(ReturnListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void clearReturnListeners() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void addFlowListener(FlowListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public boolean removeFlowListener(FlowListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void clearFlowListeners() { + throw new UnsupportedOperationException( + "This method is not currently supported as it 
is not used by current API in testing"); + + } + + @Override + public void addConfirmListener(ConfirmListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public boolean removeConfirmListener(ConfirmListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void clearConfirmListeners() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public Consumer getDefaultConsumer() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void setDefaultConsumer(Consumer consumer) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicQos(int prefetchCount, boolean global) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicQos(int prefetchCount) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) + throws IOException { + this.basicPublish(exchange, routingKey, true, props, body); + } + + @Override + public void 
basicPublish(final String exchange, final String routingKey, boolean mandatory, + final BasicProperties props, final byte[] body) throws IOException { + if (this.corrupted) { + throw new IOException("Channel is corrupted"); + } + + if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue. + BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey); + GetResponse response = new GetResponse(null, props, body, messages.size()); + messages.offer(response); + } else { + String rKey = this.exchangeToRoutingKeyMappings.get(exchange); + + if (rKey.equals(routingKey)) { + List<String> queueNames = this.routingKeyToQueueMappings.get(routingKey); + if (queueNames == null || queueNames.isEmpty()) { + this.discard(exchange, routingKey, mandatory, props, body); + } else { + for (String queueName : queueNames) { + BlockingQueue<GetResponse> messages = this.getMessageQueue(queueName); + GetResponse response = new GetResponse(null, props, body, messages.size()); + messages.offer(response); + } + } + } else { + this.discard(exchange, routingKey, mandatory, props, body); + } + + } + } + + private void discard(final String exchange, final String routingKey, boolean mandatory, final BasicProperties props, + final byte[] body) { + // NO ROUTE. 
Invoke return listener async + for (final ReturnListener listener : returnListeners) { + this.executorService.execute(new Runnable() { + @Override + public void run() { + try { + listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); + } catch (Exception e) { + throw new IllegalStateException("Failed to send return message", e); + } + } + }); + } + } + + private BlockingQueue<GetResponse> getMessageQueue(String name) { + BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(name); + if (messages == null) { + messages = new ArrayBlockingQueue<>(100); + this.enqueuedMessages.put(name, messages); + } + return messages; + } + + @Override + public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, + BasicProperties props, byte[] body) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, String type) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, + Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, + boolean internal, Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + 
"This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, + boolean internal, Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public DeclareOk exchangeDeclarePassive(String name) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public DeleteOk exchangeDelete(String exchange) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public BindOk exchangeBind(String destination, String source, String routingKey) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void exchangeBindNoWait(String destination, String source, String routingKey, 
Map<String, Object> arguments) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void exchangeUnbindNoWait(String destination, String source, String routingKey, + Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, + boolean autoDelete, Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, + Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public com.rabbitmq.client.AMQP.Queue.DeclareOk 
queueDeclarePassive(String queue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, + Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) + throws IOException { 
+ throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, + Map<String, Object> arguments) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public PurgeOk queuePurge(String queue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public GetResponse basicGet(String queue, boolean autoAck) throws IOException { + BlockingQueue<GetResponse> messages = this.enqueuedMessages.get(queue); + if (messages == null) { + throw new IOException("Queue is not defined"); + } else { + return messages.poll(); + } + } + + @Override + public void basicAck(long deliveryTag, boolean multiple) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void basicReject(long deliveryTag, boolean requeue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public String basicConsume(String queue, Consumer callback) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Consumer callback) 
throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) + throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, + Map<String, Object> arguments, Consumer callback) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void basicCancel(String consumerTag) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public RecoverOk basicRecover() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public RecoverOk basicRecover(boolean requeue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public SelectOk txSelect() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public CommitOk txCommit() throws IOException { + throw new 
UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public RollbackOk txRollback() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public long getNextPublishSeqNo() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public boolean waitForConfirms() throws InterruptedException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void waitForConfirmsOrDie() throws IOException, InterruptedException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public void asyncRpc(Method method) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + + } + + @Override + public Command rpc(Method method) throws IOException { + throw 
new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public long messageCount(String queue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public long consumerCount(String queue) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java new file mode 100644 index 0000000..cb29478 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.amqp.processors; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.List; +import java.util.Map; + +import com.rabbitmq.client.BlockedListener; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ExceptionHandler; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; + +/** + * Implementation of {@link Connection} to be used for testing. Will return the + * same instance of {@link Channel} when {@link #createChannel()} is called. + * + * This class essentially emulates AMQP system and attempts to ensure the same + * behavior on publish/subscribe and other core operations used by the NIFI AMQP + * component. + * + * NOTE: Only methods that are used by the framework are implemented. More + * could/should be added later + */ +class TestConnection implements Connection { + + private final TestChannel channel; + + private boolean open; + + public TestConnection(Map<String, String> exchangeToRoutingKeyMappings, + Map<String, List<String>> routingKeyToQueueMappings) { + this.channel = new TestChannel(exchangeToRoutingKeyMappings, routingKeyToQueueMappings); + this.channel.setConnection(this); + this.open = true; + } + + @Override + public void addShutdownListener(ShutdownListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void removeShutdownListener(ShutdownListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public ShutdownSignalException getCloseReason() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current 
API in testing"); + } + + @Override + public void notifyListeners() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public boolean isOpen() { + return this.open; + } + + @Override + public InetAddress getAddress() { + try { + return InetAddress.getByName("localhost"); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + public int getPort() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public int getChannelMax() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public int getFrameMax() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public int getHeartbeat() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public Map<String, Object> getClientProperties() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public Map<String, Object> getServerProperties() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public Channel createChannel() throws IOException { + return this.channel; + } + + @Override + public Channel createChannel(int channelNumber) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void close() throws IOException { + this.open = false; + try { + this.channel.close(); + } 
catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void close(int closeCode, String closeMessage) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void close(int timeout) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void close(int closeCode, String closeMessage, int timeout) throws IOException { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void abort() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void abort(int closeCode, String closeMessage) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void abort(int timeout) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void abort(int closeCode, String closeMessage, int timeout) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void addBlockedListener(BlockedListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public boolean removeBlockedListener(BlockedListener listener) { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public void clearBlockedListeners() { + 
throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } + + @Override + public ExceptionHandler getExceptionHandler() { + throw new UnsupportedOperationException( + "This method is not currently supported as it is not used by current API in testing"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..8c502ec --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/resources/log4j.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/nifi-amqp-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml new file mode 100644 index 0000000..7faae8f --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.4.2-SNAPSHOT</version> + </parent> + <artifactId>nifi-amqp-bundle</artifactId> + <version>0.4.2-SNAPSHOT</version> + <packaging>pom</packaging> + <description>A bundle of processors that publish to and consume from AMQP brokers</description> + <modules> + <module>nifi-amqp-processors</module> + <module>nifi-amqp-nar</module> + </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-amqp-processors</artifactId> + <version>0.4.2-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 0cdc917..3947d18 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -53,6 +53,7 @@ <module>nifi-html-bundle</module> <module>nifi-scripting-bundle</module> <module>nifi-elasticsearch-bundle</module> + <module>nifi-amqp-bundle</module> </modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/dc8b62c3/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7eb3532..d535ae2 100644 --- a/pom.xml +++ b/pom.xml @@ -1004,6 +1004,12 @@ language governing permissions and limitations under the License. 
--> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-amqp-nar</artifactId> + <version>0.4.2-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId> <version>0.4.2-SNAPSHOT</version> <type>nar</type>
