Repository: nifi Updated Branches: refs/heads/master cfeebfe7c -> 812da19ca
http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java new file mode 100644 index 0000000..28d4ac6 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +public class PublishJMSTest { + + @Test + public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, "fooQueue"); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + jmst.setDefaultDestinationName("fooQueue"); + BytesMessage message = (BytesMessage) jmst.receive(); + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + } + + @Test + public void validateFailedPublishAndTransferToFailure() throws Exception { + ConnectionFactory cf = mock(ConnectionFactory.class); + + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, "fooQueue"); + + runner.enqueue("Hello Joe".getBytes()); + + runner.run(); + Thread.sleep(200); + + assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty()); + assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..35778d8 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootCategory=WARN, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n + +log4j.category.org.apache.nifi.processors.kafka=INFO +log4j.category.kafka=ERROR +#log4j.category.org.apache.nifi.startup=INFO http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/nifi-jms-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/pom.xml b/nifi-nar-bundles/nifi-jms-bundle/pom.xml new file mode 100644 index 0000000..26b682a --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/pom.xml @@ -0,0 +1,32 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.6.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-jms-bundle</artifactId> + <version>0.6.0-SNAPSHOT</version> + <packaging>pom</packaging> + <description>A bundle of processors that publish to and consume messages from JMS.</description> + <modules> + <module>nifi-jms-cf-service</module> + <module>nifi-jms-cf-service-nar</module> + <module>nifi-jms-processors</module> + <module>nifi-jms-processors-nar</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 0639d5a..b908ec1 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -54,7 +54,8 @@ <module>nifi-scripting-bundle</module> <module>nifi-elasticsearch-bundle</module> <module>nifi-amqp-bundle</module> - <module>nifi-splunk-bundle</module> + <module>nifi-splunk-bundle</module> + <module>nifi-jms-bundle</module> </modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/812da19c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 15fe0cb..4a7431e 100644 --- a/pom.xml +++ b/pom.xml @@ -1032,6 +1032,18 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-cf-service-nar</artifactId> + <version>0.6.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-jms-processors-nar</artifactId> + <version>0.6.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId> <version>0.6.0-SNAPSHOT</version> <type>nar</type>
