Repository: nifi
Updated Branches:
  refs/heads/master ada069ce4 -> 3f0aa6e13


Changed some unit tests to Integration Tests because they are creating embedded 
JMS Brokers (implicitly) and communicating with them. This becomes brittle in a 
full build, especially if any other unit test attempts to create a broker 
implicitly, due to the way that ActiveMQ's implicit broker creation works.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3f0aa6e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3f0aa6e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3f0aa6e1

Branch: refs/heads/master
Commit: 3f0aa6e13cba9b96fbe2839a9ad232dab574c720
Parents: ada069c
Author: Mark Payne <[email protected]>
Authored: Fri Mar 16 12:07:33 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Mar 16 12:07:33 2018 -0400

----------------------------------------------------------------------
 .../apache/nifi/jms/processors/CommonTest.java  |  26 ---
 .../nifi/jms/processors/ConsumeJMSIT.java       |  75 ++++++++
 .../nifi/jms/processors/ConsumeJMSTest.java     |  75 --------
 .../nifi/jms/processors/PublishJMSIT.java       | 183 +++++++++++++++++++
 .../nifi/jms/processors/PublishJMSTest.java     | 183 -------------------
 5 files changed, 258 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
index 5e963d2..bb1a8ae 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -16,41 +16,15 @@
  */
 package org.apache.nifi.jms.processors;
 
-import static org.junit.Assert.assertTrue;
-
-import java.util.Iterator;
-import java.util.ServiceLoader;
-
 import javax.jms.ConnectionFactory;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.nifi.processor.Processor;
-import org.junit.Test;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 
 public class CommonTest {
 
-    @Test
-    public void validateServiceIsLocatableViaServiceLoader() {
-        ServiceLoader<Processor> loader = ServiceLoader.<Processor> 
load(Processor.class);
-        Iterator<Processor> iter = loader.iterator();
-        boolean pubJmsPresent = false;
-        boolean consumeJmsPresent = false;
-        while (iter.hasNext()) {
-            Processor p = iter.next();
-            if 
(p.getClass().getSimpleName().equals(PublishJMS.class.getSimpleName())) {
-                pubJmsPresent = true;
-            } else if 
(p.getClass().getSimpleName().equals(ConsumeJMS.class.getSimpleName())) {
-                consumeJmsPresent = true;
-            }
-
-        }
-        assertTrue(pubJmsPresent);
-        assertTrue(consumeJmsPresent);
-    }
-
     static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
         ConnectionFactory activeMqConnectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         final ConnectionFactory connectionFactory = new 
CachingConnectionFactory(activeMqConnectionFactory);

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
new file mode 100644
index 0000000..d3012e9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
+
+public class ConsumeJMSIT {
+
+    @Test
+    public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
+        final String  destinationName = "cooQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final Map<String, String> senderAttributes = new HashMap<>();
+            senderAttributes.put("filename", "message.txt");
+            senderAttributes.put("attribute_from_sender", "some value");
+            sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
+            TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
+            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+            when(cs.getIdentifier()).thenReturn("cfProvider");
+            
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
+            runner.addControllerService("cfProvider", cs);
+            runner.enableControllerService(cs);
+
+            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+            runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
+            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
+            runner.run(1, false);
+            //
+            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+            successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
+            successFF.assertAttributeExists("filename");
+            successFF.assertAttributeEquals("filename", "message.txt");
+            successFF.assertAttributeExists("attribute_from_sender");
+            successFF.assertAttributeEquals("attribute_from_sender", "some 
value");
+            successFF.assertContentEquals("Hey dude!".getBytes());
+            String sourceDestination = 
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
+            assertNotNull(sourceDestination);
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
deleted file mode 100644
index 23cf806..0000000
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.jms.processors;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.support.JmsHeaders;
-
-public class ConsumeJMSTest {
-
-    @Test
-    public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
-        final String  destinationName = "cooQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        try {
-            JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            final Map<String, String> senderAttributes = new HashMap<>();
-            senderAttributes.put("filename", "message.txt");
-            senderAttributes.put("attribute_from_sender", "some value");
-            sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
-            TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
-            JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-            when(cs.getIdentifier()).thenReturn("cfProvider");
-            
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
-            runner.addControllerService("cfProvider", cs);
-            runner.enableControllerService(cs);
-
-            runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-            runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
-            runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
-            runner.run(1, false);
-            //
-            final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
-            assertNotNull(successFF);
-            successFF.assertAttributeExists(JmsHeaders.DESTINATION);
-            successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
-            successFF.assertAttributeExists("filename");
-            successFF.assertAttributeEquals("filename", "message.txt");
-            successFF.assertAttributeExists("attribute_from_sender");
-            successFF.assertAttributeEquals("attribute_from_sender", "some 
value");
-            successFF.assertContentEquals("Hey dude!".getBytes());
-            String sourceDestination = 
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
-            assertNotNull(sourceDestination);
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
new file mode 100644
index 0000000..a365ad5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PublishJMSIT {
+
+    @Test(timeout = 10000)
+    public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationName = 
"validateSuccessfulPublishAndTransferToSuccess";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
+    }
+
+    @Test(timeout = 10000)
+    public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws 
Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationNameExpression = "${foo}Queue";
+        final String destinationName = "fooQueue";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
+    }
+
+    @Test
+    public void validateFailedPublishAndTransferToFailure() throws Exception {
+        ConnectionFactory cf = mock(ConnectionFactory.class);
+
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, 
"validateFailedPublishAndTransferToFailure");
+
+        runner.enqueue("Hello Joe".getBytes());
+
+        runner.run();
+        Thread.sleep(200);
+
+        
assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
+        
assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
+    }
+
+    @Test(timeout = 10000)
+    public void validatePublishTextMessage() throws Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationName = "validatePublishTextMessage";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+        runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false);
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        Message message = jmst.receive(destinationName);
+        assertTrue(message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f0aa6e1/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
deleted file mode 100644
index 281d39d..0000000
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.jms.processors;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.support.JmsHeaders;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.TextMessage;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class PublishJMSTest {
-
-    @Test(timeout = 10000)
-    public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
-        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-
-        final String destinationName = 
"validateSuccessfulPublishAndTransferToSuccess";
-        PublishJMS pubProc = new PublishJMS();
-        TestRunner runner = TestRunners.newTestRunner(pubProc);
-        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-        when(cs.getIdentifier()).thenReturn("cfProvider");
-        when(cs.getConnectionFactory()).thenReturn(cf);
-
-        runner.addControllerService("cfProvider", cs);
-        runner.enableControllerService(cs);
-
-        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, destinationName);
-
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put("foo", "foo");
-        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
-        runner.enqueue("Hey dude!".getBytes(), attributes);
-        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
-
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
-        assertNotNull(successFF);
-
-        JmsTemplate jmst = new JmsTemplate(cf);
-        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
-
-        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
-        assertEquals("Hey dude!", new String(messageBytes));
-        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
-        assertEquals("foo", message.getStringProperty("foo"));
-
-        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
-    }
-
-    @Test(timeout = 10000)
-    public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws 
Exception {
-        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-
-        final String destinationNameExpression = "${foo}Queue";
-        final String destinationName = "fooQueue";
-        PublishJMS pubProc = new PublishJMS();
-        TestRunner runner = TestRunners.newTestRunner(pubProc);
-        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-        when(cs.getIdentifier()).thenReturn("cfProvider");
-        when(cs.getConnectionFactory()).thenReturn(cf);
-
-        runner.addControllerService("cfProvider", cs);
-        runner.enableControllerService(cs);
-
-        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);
-
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put("foo", "foo");
-        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
-        runner.enqueue("Hey dude!".getBytes(), attributes);
-        runner.run(1, false); // Run once but don't shut down because we want 
the Connection Factory left in tact so that we can use it.
-
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
-        assertNotNull(successFF);
-
-        JmsTemplate jmst = new JmsTemplate(cf);
-        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
-
-        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
-        assertEquals("Hey dude!", new String(messageBytes));
-        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
-        assertEquals("foo", message.getStringProperty("foo"));
-
-        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
-    }
-
-    @Test
-    public void validateFailedPublishAndTransferToFailure() throws Exception {
-        ConnectionFactory cf = mock(ConnectionFactory.class);
-
-        PublishJMS pubProc = new PublishJMS();
-        TestRunner runner = TestRunners.newTestRunner(pubProc);
-        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-        when(cs.getIdentifier()).thenReturn("cfProvider");
-        when(cs.getConnectionFactory()).thenReturn(cf);
-
-        runner.addControllerService("cfProvider", cs);
-        runner.enableControllerService(cs);
-
-        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, 
"validateFailedPublishAndTransferToFailure");
-
-        runner.enqueue("Hello Joe".getBytes());
-
-        runner.run();
-        Thread.sleep(200);
-
-        
assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
-        
assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
-    }
-
-    @Test(timeout = 10000)
-    public void validatePublishTextMessage() throws Exception {
-        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
-
-        final String destinationName = "validatePublishTextMessage";
-        PublishJMS pubProc = new PublishJMS();
-        TestRunner runner = TestRunners.newTestRunner(pubProc);
-        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
-        when(cs.getIdentifier()).thenReturn("cfProvider");
-        when(cs.getConnectionFactory()).thenReturn(cf);
-
-        runner.addControllerService("cfProvider", cs);
-        runner.enableControllerService(cs);
-
-        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, destinationName);
-        runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
-
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put("foo", "foo");
-        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
-        runner.enqueue("Hey dude!".getBytes(), attributes);
-        runner.run(1, false);
-
-        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
-        assertNotNull(successFF);
-
-        JmsTemplate jmst = new JmsTemplate(cf);
-        Message message = jmst.receive(destinationName);
-        assertTrue(message instanceof TextMessage);
-        TextMessage textMessage = (TextMessage) message;
-
-        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
-        assertEquals("Hey dude!", new String(messageBytes));
-        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
-        assertEquals("foo", message.getStringProperty("foo"));
-
-        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
-    }
-}

Reply via email to