Repository: nifi
Updated Branches:
  refs/heads/master eb844d8c6 -> b4a4cc564


Fixed failing unit tests: Changed the queues used by the tests to have unique 
names so that one test won't interfere with another; also renamed 
JMSPublisherConsumerTest to JMSPublisherConsumerIT since it is an integration 
test between the publisher and consumer with ActiveMQ as the broker.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b4a4cc56
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b4a4cc56
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b4a4cc56

Branch: refs/heads/master
Commit: b4a4cc564e23f7be4512cdc443b2275714c1aea2
Parents: eb844d8
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Mar 2 08:44:25 2018 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Mar 2 08:44:25 2018 -0500

----------------------------------------------------------------------
 .../jms/processors/JMSPublisherConsumerIT.java  | 303 +++++++++++++++++++
 .../processors/JMSPublisherConsumerTest.java    | 303 -------------------
 .../nifi/jms/processors/PublishJMSTest.java     |   4 +-
 3 files changed, 305 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b4a4cc56/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
new file mode 100644
index 0000000..a356a41
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.processors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
+import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.support.JmsHeaders;
+
+public class JMSPublisherConsumerIT {
+
+    @Test
+    public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
+        final String destinationName = 
"validateBytesConvertedToBytesMessageOnSend";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            publisher.publish(destinationName, "hellomq".getBytes());
+
+            Message receivedMessage = jmsTemplate.receive(destinationName);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            byte[] bytes = new byte[7];
+            ((BytesMessage) receivedMessage).readBytes(bytes);
+            assertEquals("hellomq", new String(bytes));
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test
+    public void 
validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws 
Exception {
+        final String destinationName = 
"validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            Map<String, String> flowFileAttributes = new HashMap<>();
+            flowFileAttributes.put("foo", "foo");
+            flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
+            publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
+
+            Message receivedMessage = jmsTemplate.receive(destinationName);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            assertEquals("foo", receivedMessage.getStringProperty("foo"));
+            assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
+            assertEquals("myTopic", ((Topic) 
receivedMessage.getJMSReplyTo()).getTopicName());
+
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    /**
+     * At the moment the only two supported message types are TextMessage and
+     * BytesMessage, which is sufficient for the type of JMS use cases NiFi is
+     * used for. This may change to the point where all message types are
+     * supported, at which point this test will no longer be required.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void validateFailOnUnsupportedMessageType() throws Exception {
+        final String destinationName = "validateFailOnUnsupportedMessageType";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+        try {
+            jmsTemplate.send(destinationName, new MessageCreator() {
+                @Override
+                public Message createMessage(Session session) throws 
JMSException {
+                    return session.createObjectMessage();
+                }
+            });
+
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    // noop
+                }
+            });
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test
+    public void validateConsumeWithCustomHeadersAndProperties() throws 
Exception {
+        final String destinationName = 
"validateConsumeWithCustomHeadersAndProperties";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+        try {
+            jmsTemplate.send(destinationName, new MessageCreator() {
+                @Override
+                public Message createMessage(Session session) throws 
JMSException {
+                    TextMessage message = session.createTextMessage("hello 
from the other side");
+                    message.setStringProperty("foo", "foo");
+                    message.setBooleanProperty("bar", false);
+                    message.setJMSReplyTo(session.createQueue("fooQueue"));
+                    return message;
+                }
+            });
+
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final AtomicBoolean callbackInvoked = new AtomicBoolean();
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    callbackInvoked.set(true);
+                    assertEquals("hello from the other side", new 
String(response.getMessageBody()));
+                    assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+                    assertEquals("foo", 
response.getMessageProperties().get("foo"));
+                    assertEquals("false", 
response.getMessageProperties().get("bar"));
+                }
+            });
+            assertTrue(callbackInvoked.get());
+
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testMultipleThreads() throws Exception {
+        String destinationName = "testMultipleThreads";
+        JmsTemplate publishTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        final CountDownLatch consumerTemplateCloseCount = new 
CountDownLatch(4);
+
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), 
publishTemplate, mock(ComponentLog.class));
+            for (int i = 0; i < 4000; i++) {
+                publisher.publish(destinationName, 
String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            }
+
+            final AtomicInteger msgCount = new AtomicInteger(0);
+
+            final ConsumerCallback callback = new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    msgCount.incrementAndGet();
+                }
+            };
+
+            final Thread[] threads = new Thread[4];
+            for (int i = 0; i < 4; i++) {
+                final Thread t = new Thread(() -> {
+                    JmsTemplate consumeTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+
+                    try {
+                        JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
+
+                        for (int j = 0; j < 1000 && msgCount.get() < 4000; 
j++) {
+                            consumer.consume(destinationName, false, false, 
null, callback);
+                        }
+                    } finally {
+                        ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
+                        consumerTemplateCloseCount.countDown();
+                    }
+                });
+
+                threads[i] = t;
+                t.start();
+            }
+
+            int iterations = 0;
+            while (msgCount.get() < 4000) {
+                Thread.sleep(10L);
+                if (++iterations % 100 == 0) {
+                    System.out.println(msgCount.get() + " messages received so 
far");
+                }
+            }
+        } finally {
+            ((CachingConnectionFactory) 
publishTemplate.getConnectionFactory()).destroy();
+
+            consumerTemplateCloseCount.await();
+        }
+    }
+
+
+    @Test(timeout = 10000)
+    public void validateMessageRedeliveryWhenNotAcked() throws Exception {
+        String destinationName = "validateMessageRedeliveryWhenNotAcked";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        try {
+            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
+            publisher.publish(destinationName, 
"1".getBytes(StandardCharsets.UTF_8));
+            publisher.publish(destinationName, 
"2".getBytes(StandardCharsets.UTF_8));
+
+            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
+            final AtomicBoolean callbackInvoked = new AtomicBoolean();
+            try {
+                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    @Override
+                    public void accept(JMSResponse response) {
+                        callbackInvoked.set(true);
+                        assertEquals("1", new 
String(response.getMessageBody()));
+                        throw new RuntimeException("intentional to avoid 
explicit ack");
+                    }
+                });
+            } catch (Exception e) {
+                // expected
+            }
+
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // should receive the same message, but will process it 
successfully
+            while (!callbackInvoked.get()) {
+                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    @Override
+                    public void accept(JMSResponse response) {
+                        if (response == null) {
+                            return;
+                        }
+
+                        callbackInvoked.set(true);
+                        assertEquals("1", new 
String(response.getMessageBody()));
+                    }
+                });
+            }
+
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // receiving next message and fail again
+            try {
+                while (!callbackInvoked.get()) {
+                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                        @Override
+                        public void accept(JMSResponse response) {
+                            if (response == null) {
+                                return;
+                            }
+
+                            callbackInvoked.set(true);
+                            assertEquals("2", new 
String(response.getMessageBody()));
+                            throw new RuntimeException("intentional to avoid 
explicit ack");
+                        }
+                    });
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+            assertTrue(callbackInvoked.get());
+            callbackInvoked.set(false);
+
+            // should receive the same message, but will process it 
successfully
+            try {
+                while (!callbackInvoked.get()) {
+                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                        @Override
+                        public void accept(JMSResponse response) {
+                            if (response == null) {
+                                return;
+                            }
+
+                            callbackInvoked.set(true);
+                            assertEquals("2", new 
String(response.getMessageBody()));
+                        }
+                    });
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        } finally {
+            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b4a4cc56/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
deleted file mode 100644
index 9825ea3..0000000
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.jms.processors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
-import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.Test;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.support.JmsHeaders;
-
-public class JMSPublisherConsumerTest {
-
-    @Test
-    public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
-        final String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-
-        try {
-            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
-            publisher.publish(destinationName, "hellomq".getBytes());
-
-            Message receivedMessage = jmsTemplate.receive(destinationName);
-            assertTrue(receivedMessage instanceof BytesMessage);
-            byte[] bytes = new byte[7];
-            ((BytesMessage) receivedMessage).readBytes(bytes);
-            assertEquals("hellomq", new String(bytes));
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-
-    @Test
-    public void 
validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws 
Exception {
-        final String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-
-        try {
-            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
-            Map<String, String> flowFileAttributes = new HashMap<>();
-            flowFileAttributes.put("foo", "foo");
-            flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
-            publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
-
-            Message receivedMessage = jmsTemplate.receive(destinationName);
-            assertTrue(receivedMessage instanceof BytesMessage);
-            assertEquals("foo", receivedMessage.getStringProperty("foo"));
-            assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
-            assertEquals("myTopic", ((Topic) 
receivedMessage.getJMSReplyTo()).getTopicName());
-
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-
-    /**
-     * At the moment the only two supported message types are TextMessage and
-     * BytesMessage which is sufficient for the type if JMS use cases NiFi is
-     * used. The may change to the point where all message types are supported
-     * at which point this test will no be longer required.
-     */
-    @Test(expected = IllegalStateException.class)
-    public void validateFailOnUnsupportedMessageType() throws Exception {
-        final String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-
-        try {
-            jmsTemplate.send(destinationName, new MessageCreator() {
-                @Override
-                public Message createMessage(Session session) throws 
JMSException {
-                    return session.createObjectMessage();
-                }
-            });
-
-            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    // noop
-                }
-            });
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-
-    @Test
-    public void validateConsumeWithCustomHeadersAndProperties() throws 
Exception {
-        final String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-
-        try {
-            jmsTemplate.send(destinationName, new MessageCreator() {
-                @Override
-                public Message createMessage(Session session) throws 
JMSException {
-                    TextMessage message = session.createTextMessage("hello 
from the other side");
-                    message.setStringProperty("foo", "foo");
-                    message.setBooleanProperty("bar", false);
-                    message.setJMSReplyTo(session.createQueue("fooQueue"));
-                    return message;
-                }
-            });
-
-            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    callbackInvoked.set(true);
-                    assertEquals("hello from the other side", new 
String(response.getMessageBody()));
-                    assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
-                    assertEquals("foo", 
response.getMessageProperties().get("foo"));
-                    assertEquals("false", 
response.getMessageProperties().get("bar"));
-                }
-            });
-            assertTrue(callbackInvoked.get());
-
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-
-    @Test(timeout = 20000)
-    public void testMultipleThreads() throws Exception {
-        String destinationName = "testQueue";
-        JmsTemplate publishTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        final CountDownLatch consumerTemplateCloseCount = new 
CountDownLatch(4);
-
-        try {
-            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), 
publishTemplate, mock(ComponentLog.class));
-            for (int i = 0; i < 4000; i++) {
-                publisher.publish(destinationName, 
String.valueOf(i).getBytes(StandardCharsets.UTF_8));
-            }
-
-            final AtomicInteger msgCount = new AtomicInteger(0);
-
-            final ConsumerCallback callback = new ConsumerCallback() {
-                @Override
-                public void accept(JMSResponse response) {
-                    msgCount.incrementAndGet();
-                }
-            };
-
-            final Thread[] threads = new Thread[4];
-            for (int i = 0; i < 4; i++) {
-                final Thread t = new Thread(() -> {
-                    JmsTemplate consumeTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-
-                    try {
-                        JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
-
-                        for (int j = 0; j < 1000 && msgCount.get() < 4000; 
j++) {
-                            consumer.consume(destinationName, false, false, 
null, callback);
-                        }
-                    } finally {
-                        ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
-                        consumerTemplateCloseCount.countDown();
-                    }
-                });
-
-                threads[i] = t;
-                t.start();
-            }
-
-            int iterations = 0;
-            while (msgCount.get() < 4000) {
-                Thread.sleep(10L);
-                if (++iterations % 100 == 0) {
-                    System.out.println(msgCount.get() + " messages received so 
far");
-                }
-            }
-        } finally {
-            ((CachingConnectionFactory) 
publishTemplate.getConnectionFactory()).destroy();
-
-            consumerTemplateCloseCount.await();
-        }
-    }
-
-
-    @Test(timeout = 10000)
-    public void validateMessageRedeliveryWhenNotAcked() throws Exception {
-        String destinationName = "testQueue";
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        try {
-            JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
-            publisher.publish(destinationName, 
"1".getBytes(StandardCharsets.UTF_8));
-            publisher.publish(destinationName, 
"2".getBytes(StandardCharsets.UTF_8));
-
-            JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            try {
-                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                    @Override
-                    public void accept(JMSResponse response) {
-                        callbackInvoked.set(true);
-                        assertEquals("1", new 
String(response.getMessageBody()));
-                        throw new RuntimeException("intentional to avoid 
explicit ack");
-                    }
-                });
-            } catch (Exception e) {
-                // expected
-            }
-
-            assertTrue(callbackInvoked.get());
-            callbackInvoked.set(false);
-
-            // should receive the same message, but will process it 
successfully
-            while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                    @Override
-                    public void accept(JMSResponse response) {
-                        if (response == null) {
-                            return;
-                        }
-
-                        callbackInvoked.set(true);
-                        assertEquals("1", new 
String(response.getMessageBody()));
-                    }
-                });
-            }
-
-            assertTrue(callbackInvoked.get());
-            callbackInvoked.set(false);
-
-            // receiving next message and fail again
-            try {
-                while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                        @Override
-                        public void accept(JMSResponse response) {
-                            if (response == null) {
-                                return;
-                            }
-
-                            callbackInvoked.set(true);
-                            assertEquals("2", new 
String(response.getMessageBody()));
-                            throw new RuntimeException("intentional to avoid 
explicit ack");
-                        }
-                    });
-                }
-            } catch (Exception e) {
-                // ignore
-            }
-            assertTrue(callbackInvoked.get());
-            callbackInvoked.set(false);
-
-            // should receive the same message, but will process it 
successfully
-            try {
-                while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
-                        @Override
-                        public void accept(JMSResponse response) {
-                            if (response == null) {
-                                return;
-                            }
-
-                            callbackInvoked.set(true);
-                            assertEquals("2", new 
String(response.getMessageBody()));
-                        }
-                    });
-                }
-            } catch (Exception e) {
-                // ignore
-            }
-        } finally {
-            ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b4a4cc56/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
index 06bd775..f6f5ba5 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
@@ -43,7 +43,7 @@ public class PublishJMSTest {
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
 
-        final String destinationName = "fooQueue";
+        final String destinationName = 
"validateSuccessfulPublishAndTransferToSuccess";
         PublishJMS pubProc = new PublishJMS();
         TestRunner runner = TestRunners.newTestRunner(pubProc);
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
@@ -128,7 +128,7 @@ public class PublishJMSTest {
         runner.enableControllerService(cs);
 
         runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, "fooQueue");
+        runner.setProperty(PublishJMS.DESTINATION, 
"validateFailedPublishAndTransferToFailure");
 
         runner.enqueue("Hello Joe".getBytes());
 

Reply via email to