Repository: nifi Updated Branches: refs/heads/master ada069ce4 -> 3f0aa6e13
Changed some unit tests to Integration Tests because they are creating embedded JMS Brokers (implicitly) and commnicating with it. This becomes brittle in a full build, especially if any other unit test attempts to create a broker implicitly due to the way that the activemq broker implicit creation works. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3f0aa6e1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3f0aa6e1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3f0aa6e1 Branch: refs/heads/master Commit: 3f0aa6e13cba9b96fbe2839a9ad232dab574c720 Parents: ada069c Author: Mark Payne <[email protected]> Authored: Fri Mar 16 12:07:33 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Mar 16 12:07:33 2018 -0400 ---------------------------------------------------------------------- .../apache/nifi/jms/processors/CommonTest.java | 26 --- .../nifi/jms/processors/ConsumeJMSIT.java | 75 ++++++++ .../nifi/jms/processors/ConsumeJMSTest.java | 75 -------- .../nifi/jms/processors/PublishJMSIT.java | 183 +++++++++++++++++++ .../nifi/jms/processors/PublishJMSTest.java | 183 ------------------- 5 files changed, 258 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java index 5e963d2..bb1a8ae 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java @@ -16,41 +16,15 @@ */ package org.apache.nifi.jms.processors; -import static org.junit.Assert.assertTrue; - -import java.util.Iterator; -import java.util.ServiceLoader; - import javax.jms.ConnectionFactory; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.nifi.processor.Processor; -import org.junit.Test; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; public class CommonTest { - @Test - public void validateServiceIsLocatableViaServiceLoader() { - ServiceLoader<Processor> loader = ServiceLoader.<Processor> load(Processor.class); - Iterator<Processor> iter = loader.iterator(); - boolean pubJmsPresent = false; - boolean consumeJmsPresent = false; - while (iter.hasNext()) { - Processor p = iter.next(); - if (p.getClass().getSimpleName().equals(PublishJMS.class.getSimpleName())) { - pubJmsPresent = true; - } else if (p.getClass().getSimpleName().equals(ConsumeJMS.class.getSimpleName())) { - consumeJmsPresent = true; - } - - } - assertTrue(pubJmsPresent); - assertTrue(consumeJmsPresent); - } - static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) { ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory); http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java new file mode 100644 index 0000000..d3012e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +public class ConsumeJMSIT { + + @Test + public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { + final String destinationName = "cooQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + try { + JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + final Map<String, String> senderAttributes = new HashMap<>(); + senderAttributes.put("filename", "message.txt"); + senderAttributes.put("attribute_from_sender", "some value"); + sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes); + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, destinationName); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); + // + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeExists(JmsHeaders.DESTINATION); + successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName); + successFF.assertAttributeExists("filename"); + successFF.assertAttributeEquals("filename", "message.txt"); + successFF.assertAttributeExists("attribute_from_sender"); + successFF.assertAttributeEquals("attribute_from_sender", "some value"); + successFF.assertContentEquals("Hey dude!".getBytes()); + String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); + assertNotNull(sourceDestination); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java deleted file mode 100644 index 23cf806..0000000 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.jms.processors; - -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.support.JmsHeaders; - -public class ConsumeJMSTest { - - @Test - public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { - final String destinationName = "cooQueue"; - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - try { - JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); - final Map<String, String> senderAttributes = new HashMap<>(); - senderAttributes.put("filename", "message.txt"); - senderAttributes.put("attribute_from_sender", "some value"); - sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes); - TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); - - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(ConsumeJMS.DESTINATION, destinationName); - runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); - runner.run(1, false); - // - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); - assertNotNull(successFF); - successFF.assertAttributeExists(JmsHeaders.DESTINATION); - successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName); - successFF.assertAttributeExists("filename"); - successFF.assertAttributeEquals("filename", "message.txt"); - successFF.assertAttributeExists("attribute_from_sender"); - successFF.assertAttributeEquals("attribute_from_sender", "some value"); - successFF.assertContentEquals("Hey dude!".getBytes()); - String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); - assertNotNull(sourceDestination); - } finally { - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java new file mode 100644 index 0000000..a365ad5 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.support.JmsHeaders; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.TextMessage; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PublishJMSIT { + + @Test(timeout = 10000) + public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + final String destinationName = "validateSuccessfulPublishAndTransferToSuccess"; + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, destinationName); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + BytesMessage message = (BytesMessage) jmst.receive(destinationName); + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + + runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory + } + + @Test(timeout = 10000) + public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + final String destinationNameExpression = "${foo}Queue"; + final String destinationName = "fooQueue"; + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + BytesMessage message = (BytesMessage) jmst.receive(destinationName); + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + + runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory + } + + @Test + public void validateFailedPublishAndTransferToFailure() throws Exception { + ConnectionFactory cf = mock(ConnectionFactory.class); + + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, "validateFailedPublishAndTransferToFailure"); + + runner.enqueue("Hello Joe".getBytes()); + + runner.run(); + Thread.sleep(200); + + assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty()); + assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0)); + } + + @Test(timeout = 10000) + public void validatePublishTextMessage() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + final String destinationName = "validatePublishTextMessage"; + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, destinationName); + runner.setProperty(PublishJMS.MESSAGE_BODY, "text"); + + Map<String, String> attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + Message message = jmst.receive(destinationName); + assertTrue(message instanceof TextMessage); + TextMessage textMessage = (TextMessage) message; + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + + runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java deleted file mode 100644 index 281d39d..0000000 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.jms.processors; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.support.JmsHeaders; - -import javax.jms.BytesMessage; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.TextMessage; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class PublishJMSTest { - - @Test(timeout = 10000) - public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - - final String destinationName = "validateSuccessfulPublishAndTransferToSuccess"; - PublishJMS pubProc = new PublishJMS(); - TestRunner runner = TestRunners.newTestRunner(pubProc); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(cf); - - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); - - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(PublishJMS.DESTINATION, destinationName); - - Map<String, String> attributes = new HashMap<>(); - attributes.put("foo", "foo"); - attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); - runner.enqueue("Hey dude!".getBytes(), attributes); - runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. - - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); - assertNotNull(successFF); - - JmsTemplate jmst = new JmsTemplate(cf); - BytesMessage message = (BytesMessage) jmst.receive(destinationName); - - byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); - assertEquals("Hey dude!", new String(messageBytes)); - assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); - assertEquals("foo", message.getStringProperty("foo")); - - runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory - } - - @Test(timeout = 10000) - public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - - final String destinationNameExpression = "${foo}Queue"; - final String destinationName = "fooQueue"; - PublishJMS pubProc = new PublishJMS(); - TestRunner runner = TestRunners.newTestRunner(pubProc); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(cf); - - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); - - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression); - - Map<String, String> attributes = new HashMap<>(); - attributes.put("foo", "foo"); - attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); - runner.enqueue("Hey dude!".getBytes(), attributes); - runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. - - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); - assertNotNull(successFF); - - JmsTemplate jmst = new JmsTemplate(cf); - BytesMessage message = (BytesMessage) jmst.receive(destinationName); - - byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); - assertEquals("Hey dude!", new String(messageBytes)); - assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); - assertEquals("foo", message.getStringProperty("foo")); - - runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory - } - - @Test - public void validateFailedPublishAndTransferToFailure() throws Exception { - ConnectionFactory cf = mock(ConnectionFactory.class); - - PublishJMS pubProc = new PublishJMS(); - TestRunner runner = TestRunners.newTestRunner(pubProc); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(cf); - - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); - - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(PublishJMS.DESTINATION, "validateFailedPublishAndTransferToFailure"); - - runner.enqueue("Hello Joe".getBytes()); - - runner.run(); - Thread.sleep(200); - - assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty()); - assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0)); - } - - @Test(timeout = 10000) - public void validatePublishTextMessage() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - - final String destinationName = "validatePublishTextMessage"; - PublishJMS pubProc = new PublishJMS(); - TestRunner runner = TestRunners.newTestRunner(pubProc); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(cf); - - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); - - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(PublishJMS.DESTINATION, destinationName); - runner.setProperty(PublishJMS.MESSAGE_BODY, "text"); - - Map<String, String> attributes = new HashMap<>(); - attributes.put("foo", "foo"); - attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); - runner.enqueue("Hey dude!".getBytes(), attributes); - runner.run(1, false); - - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); - assertNotNull(successFF); - - JmsTemplate jmst = new JmsTemplate(cf); - Message message = jmst.receive(destinationName); - assertTrue(message instanceof TextMessage); - TextMessage textMessage = (TextMessage) message; - - byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage); - assertEquals("Hey dude!", new String(messageBytes)); - assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); - assertEquals("foo", message.getStringProperty("foo")); - - runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory - } -}
