Repository: nifi
Updated Branches:
  refs/heads/master 34bd2061f -> de2dd93f2


NIFI-1300 Corrects inconsistent error behavior in PutJMS and added PutJMS unit 
testing

Reviewed by Tony Kurc ([email protected]) This closes #145


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/de2dd93f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/de2dd93f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/de2dd93f

Branch: refs/heads/master
Commit: de2dd93f26dd34271632451621d5521fae11c979
Parents: 34bd206
Author: Joe Skora <[email protected]>
Authored: Thu Dec 17 12:27:56 2015 -0500
Committer: Tony Kurc <[email protected]>
Committed: Mon Jan 4 00:26:21 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/PutJMS.java |   5 +-
 .../nifi/processors/standard/TestPutJMS.java    | 565 +++++++++++++++++++
 2 files changed, 568 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/de2dd93f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index b8902a9..7405ca5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -251,8 +251,9 @@ public class PutJMS extends AbstractProcessor {
                 final String flowFileDescription = successfulFlowFiles.size() 
> 10 ? successfulFlowFiles.size() + " FlowFiles" : 
successfulFlowFiles.toString();
                 logger.info("Sent {} to JMS Server and transferred to 
'success'", new Object[]{flowFileDescription});
             } catch (JMSException e) {
-                logger.error("Failed to commit JMS Session due to {}; rolling 
back session", new Object[]{e});
-                session.rollback();
+                logger.error("Failed to commit JMS Session due to {} and 
transferred to 'failure'", new Object[]{e});
+                session.transfer(flowFiles, REL_FAILURE);
+                context.yield();
                 wrappedProducer.close(logger);
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/de2dd93f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java
new file mode 100644
index 0000000..acba96c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.util.JmsFactory;
+import org.apache.nifi.processors.standard.util.JmsProperties;
+import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+public class TestPutJMS {
+
+    private final String TEST_PROVIDER = JmsProperties.ACTIVEMQ_PROVIDER;
+    private final String TEST_URL = "vm://localhost?broker.persistent=false";
+    private final String TEST_DEST_TYPE = JmsProperties.DESTINATION_TYPE_QUEUE;
+    private final String TEST_DEST_NAME = "queue.testing";
+    private final String TEST_ACK_MODE = JmsProperties.ACK_MODE_AUTO;
+
+    private String testQueueSuffix() {
+        final StackTraceElement[] trace = 
Thread.currentThread().getStackTrace();
+        return "." + trace[2].getMethodName();
+    }
+
+    private void injectFieldValue(Class klass, Object instance, String 
fieldName, Object fieldValue) throws NoSuchFieldException, 
IllegalAccessException {
+        Field field = klass.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(instance, fieldValue);
+    }
+
+    @Test
+    public void testGetRelationships() {
+        final PutJMS putJMS = new PutJMS();
+        final Set<Relationship> relationships = putJMS.getRelationships();
+        assertEquals(2, relationships.size());
+        assertTrue(relationships.contains(PutJMS.REL_FAILURE));
+        assertTrue(relationships.contains(PutJMS.REL_SUCCESS));
+    }
+
+    @Test
+    public void testCleanupResources() throws JMSException, 
NoSuchFieldException, IllegalAccessException {
+        final PutJMS putJMS = new PutJMS();
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+
+        final Queue<WrappedMessageProducer> wrappedMessageProducerQueue = 
(Queue) spy(new LinkedBlockingQueue<>());
+        injectFieldValue(PutJMS.class, putJMS, "producerQueue", 
wrappedMessageProducerQueue);
+
+        final WrappedMessageProducer wrappedProducer = 
JmsFactory.createMessageProducer(runnerPut.getProcessContext(), true);
+        wrappedMessageProducerQueue.offer(wrappedProducer);
+
+        assertNotNull(wrappedMessageProducerQueue.peek());
+        putJMS.cleanupResources();
+        assertNull(wrappedMessageProducerQueue.peek());
+    }
+
+
+    @Test
+    public void testCreateMessageDirectly() throws JMSException {
+        final PutJMS putJMS = new PutJMS();
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
TEST_ACK_MODE);
+
+        final WrappedMessageProducer wrappedProducer = 
JmsFactory.createMessageProducer(runnerPut.getProcessContext(), true);
+        final Session jmsSession = wrappedProducer.getSession();
+        final MessageProducer producer = wrappedProducer.getProducer();
+        final Message message = 
jmsSession.createTextMessage("createMessageDirectly");
+
+        producer.send(message);
+        jmsSession.commit();
+
+        final GetJMSQueue getJmsQueue = new GetJMSQueue();
+        final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue);
+
+        runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerGet.setProperty(JmsProperties.URL, TEST_URL);
+        runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
TEST_ACK_MODE);
+
+        runnerGet.run();
+
+        final List<MockFlowFile> flowFiles = 
runnerGet.getFlowFilesForRelationship(
+                new Relationship.Builder().name("success").build());
+
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile successFlowFile = flowFiles.get(0);
+        successFlowFile.assertContentEquals("createMessageDirectly");
+        successFlowFile.assertAttributeEquals("jms.JMSDestination", 
TEST_DEST_NAME + testQueueSuffix());
+        producer.close();
+        jmsSession.close();
+    }
+
+    @Test
+    public void testPutGetAttributesAndProps() throws JMSException {
+        final PutJMS putJMS = spy(new PutJMS());
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.REPLY_TO_QUEUE, TEST_DEST_NAME + 
testQueueSuffix() + ".reply");
+        runnerPut.setProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS, "true");
+
+        runnerPut.run();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        attributes.put("jms.string", "banana");
+        attributes.put("jms.integer", "50");
+        attributes.put("jms.integer.type", "integer");
+        attributes.put("jms.float", "3.14159");
+        attributes.put("jms.float.type", "float");
+        attributes.put("jms.boolean", "true");
+        attributes.put("jms.boolean.type", "boolean");
+        attributes.put("jms.long", "123456789");
+        attributes.put("jms.long.type", "long");
+        attributes.put("jms.short", "16384");
+        attributes.put("jms.short.type", "short");
+        attributes.put("jms.byte", "127");
+        attributes.put("jms.byte.type", "byte");
+        attributes.put("jms.double", "3.1415626547");
+        attributes.put("jms.double.type", "double");
+        attributes.put("jms.object", "{\"id\":215, \"name\": \"john doe\"}");
+        attributes.put("jms.object.type", "object");
+        attributes.put("jms.eyes", "blue");
+        attributes.put("jms.eyes.type", "color");
+        attributes.put("jms.badinteger", "3.14");
+        attributes.put("jms.badinteger.type", "integer");
+        runnerPut.enqueue("putGetMessage".getBytes(), attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        final GetJMSQueue getJmsQueue = new GetJMSQueue();
+        final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue);
+        runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerGet.setProperty(JmsProperties.URL, TEST_URL);
+        runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
TEST_ACK_MODE);
+        runnerGet.setProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES, "true");
+
+        runnerGet.run();
+
+        assertEquals(1, 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS).size());
+
+        final List<MockFlowFile> flowFilesGet = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+
+        assertEquals(1, flowFilesGet.size());
+        final MockFlowFile successFlowFile = flowFilesGet.get(0);
+
+        successFlowFile.assertContentEquals("putGetMessage");
+        successFlowFile.assertAttributeEquals("jms.JMSDestination", 
TEST_DEST_NAME + testQueueSuffix());
+        successFlowFile.assertAttributeEquals("jms.JMSReplyTo", "queue://" + 
TEST_DEST_NAME + testQueueSuffix() + ".reply");
+        successFlowFile.assertAttributeEquals("jms.string", "banana");
+        successFlowFile.assertAttributeEquals("jms.integer", "50");
+        successFlowFile.assertAttributeEquals("jms.float", "3.14159");
+        successFlowFile.assertAttributeEquals("jms.boolean", "true");
+        successFlowFile.assertAttributeEquals("jms.long", "123456789");
+        successFlowFile.assertAttributeEquals("jms.short", "16384");
+        successFlowFile.assertAttributeEquals("jms.byte", "127");
+        successFlowFile.assertAttributeEquals("jms.double", "3.1415626547");
+        successFlowFile.assertAttributeEquals("jms.object", "{\"id\":215, 
\"name\": \"john doe\"}");
+        successFlowFile.assertAttributeEquals("jms.eyes", null);
+        successFlowFile.assertAttributeEquals("jms.badinteger", null);
+    }
+
+    @Test
+    public void testPutGetMessageTypes() throws JMSException {
+        final GetJMSQueue getJmsQueue = new GetJMSQueue();
+        final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue);
+        runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerGet.setProperty(JmsProperties.URL, TEST_URL);
+        runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
TEST_ACK_MODE);
+        runnerGet.setProperty(JmsProperties.JMS_PROPS_TO_ATTRIBUTES, "true");
+
+        //------------------------------------------------------------
+        final PutJMS putJMStext = spy(new PutJMS());
+        final TestRunner runnerPutText = TestRunners.newTestRunner(putJMStext);
+        runnerPutText.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPutText.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPutText.setProperty(JmsProperties.DESTINATION_TYPE, 
TEST_DEST_TYPE);
+        runnerPutText.setProperty(JmsProperties.DESTINATION_NAME, 
TEST_DEST_NAME + testQueueSuffix());
+        runnerPutText.setProperty(JmsProperties.MESSAGE_TYPE, 
JmsProperties.MSG_TYPE_TEXT);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPutText.enqueue("putGetTextMessage", attributes);
+
+        runnerPutText.run();
+
+        assertEquals(0, 
runnerPutText.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPutText.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        runnerGet.run();
+
+        final List<MockFlowFile> ffText = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, ffText.size());
+        final MockFlowFile successText = ffText.get(0);
+        successText.assertContentEquals("putGetTextMessage");
+
+        //------------------------------------------------------------
+        final PutJMS putJMSempty = spy(new PutJMS());
+        final TestRunner runnerPutEmpty = 
TestRunners.newTestRunner(putJMSempty);
+        runnerPutEmpty.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPutEmpty.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPutEmpty.setProperty(JmsProperties.DESTINATION_TYPE, 
TEST_DEST_TYPE);
+        runnerPutEmpty.setProperty(JmsProperties.DESTINATION_NAME, 
TEST_DEST_NAME + testQueueSuffix());
+        runnerPutEmpty.setProperty(JmsProperties.MESSAGE_TYPE, 
JmsProperties.MSG_TYPE_EMPTY);
+
+        final Map<String, String> attributesEmpty = new HashMap<>();
+        attributesEmpty.put("filename", "file1.txt");
+        runnerPutEmpty.enqueue("putGetEmptyMessage", attributesEmpty);
+
+        runnerPutEmpty.run();
+
+        assertEquals(0, 
runnerPutEmpty.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPutEmpty.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        runnerGet.clearTransferState();
+        runnerGet.run();
+
+        final List<MockFlowFile> ffEmpty = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, ffEmpty.size());
+        final MockFlowFile successEmpty = ffEmpty.get(0);
+        successEmpty.assertContentEquals("");
+
+        //------------------------------------------------------------
+        final PutJMS putJMSstream = spy(new PutJMS());
+        final TestRunner runnerPutStream = 
TestRunners.newTestRunner(putJMSstream);
+        runnerPutStream.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPutStream.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPutStream.setProperty(JmsProperties.DESTINATION_TYPE, 
TEST_DEST_TYPE);
+        runnerPutStream.setProperty(JmsProperties.DESTINATION_NAME, 
TEST_DEST_NAME + testQueueSuffix());
+        runnerPutStream.setProperty(JmsProperties.MESSAGE_TYPE, 
JmsProperties.MSG_TYPE_STREAM);
+
+        final Map<String, String> attributesStream = new HashMap<>();
+        attributesStream.put("filename", "file1.txt");
+        runnerPutStream.enqueue("putGetStreamMessage", attributesStream);
+
+        runnerGet.clearTransferState();
+        runnerPutStream.run();
+
+        assertEquals(0, 
runnerPutStream.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPutStream.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        runnerGet.run();
+
+        final List<MockFlowFile> ffStream = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, ffStream.size());
+        final MockFlowFile successStream = ffStream.get(0);
+        successStream.assertContentEquals("putGetStreamMessage");
+
+        //------------------------------------------------------------
+        final PutJMS putJMSmap = spy(new PutJMS());
+        final TestRunner runnerPutMap = TestRunners.newTestRunner(putJMSmap);
+        runnerPutMap.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPutMap.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPutMap.setProperty(JmsProperties.DESTINATION_TYPE, 
TEST_DEST_TYPE);
+        runnerPutMap.setProperty(JmsProperties.DESTINATION_NAME, 
TEST_DEST_NAME + testQueueSuffix());
+        runnerPutMap.setProperty(JmsProperties.MESSAGE_TYPE, 
JmsProperties.MSG_TYPE_MAP);
+
+        final Map<String, String> attributesMap = new HashMap<>();
+        attributesMap.put("filename", "file1.txt");
+        runnerPutMap.enqueue("putGetMapMessage", attributesMap);
+
+        runnerGet.clearTransferState();
+        runnerPutMap.run();
+
+        assertEquals(0, 
runnerPutMap.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPutMap.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        runnerGet.run();
+
+        final List<MockFlowFile> ffMap = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, ffMap.size());
+        final MockFlowFile successMap = ffMap.get(0);
+        successMap.assertContentEquals("");
+
+        //------------------------------------------------------------
+        final PutJMS putJMSByte = spy(new PutJMS());
+        final TestRunner runnerPutByte = TestRunners.newTestRunner(putJMSByte);
+        runnerPutByte.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPutByte.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPutByte.setProperty(JmsProperties.DESTINATION_TYPE, 
TEST_DEST_TYPE);
+        runnerPutByte.setProperty(JmsProperties.DESTINATION_NAME, 
TEST_DEST_NAME + testQueueSuffix());
+        runnerPutByte.setProperty(JmsProperties.MESSAGE_TYPE, 
JmsProperties.MSG_TYPE_BYTE);
+
+        final Map<String, String> attributesByte = new HashMap<>();
+        attributesByte.put("filename", "file1.txt");
+        runnerPutByte.enqueue("putGetTextMessage", attributesByte);
+
+        runnerPutByte.run();
+
+        assertEquals(0, 
runnerPutByte.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPutByte.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        runnerGet.clearTransferState();
+        runnerGet.run();
+
+        final List<MockFlowFile> ffByte = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, ffByte.size());
+        final MockFlowFile successByte = ffByte.get(0);
+        successByte.assertContentEquals("putGetTextMessage");
+    }
+
+    @Test
+    public void testTTL() throws JMSException, InterruptedException {
+        final PutJMS putJMS = spy(new PutJMS());
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.MESSAGE_TTL, "3 s");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPut.enqueue("ttl10secNotExpired".getBytes(), attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        final GetJMSQueue getJmsQueue = new GetJMSQueue();
+        final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue);
+        runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerGet.setProperty(JmsProperties.URL, TEST_URL);
+        runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, 
TEST_ACK_MODE);
+        runnerGet.setProperty(JmsProperties.TIMEOUT, "1 s");
+
+        runnerGet.run();
+
+        final List<MockFlowFile> flowFiles1 = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(1, flowFiles1.size());
+        final MockFlowFile successFlowFile1 = flowFiles1.get(0);
+        successFlowFile1.assertContentEquals("ttl10secNotExpired");
+
+        runnerPut.clearTransferState();
+        runnerGet.clearTransferState();
+
+        runnerPut.setProperty(JmsProperties.MESSAGE_TTL, "1 s");
+        runnerPut.enqueue("ttl1secExpired".getBytes(), attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+
+        Thread.sleep(2000L);
+        runnerGet.run();
+
+        final List<MockFlowFile> flowFiles2 = 
runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS);
+        assertEquals(0, flowFiles2.size());
+    }
+
+    @Test
+    public void testFailureOnFileExceedsBufferSize() throws JMSException {
+        final PutJMS putJMS = spy(new PutJMS());
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.MAX_BUFFER_SIZE, "10 B");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPut.enqueue("failureOnFileExceedsBufferSize".getBytes(), 
attributes);
+
+        runnerPut.run();
+
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+    }
+
+    @Test(expected = NumberFormatException.class)
+    public void testBadMessagePriorityValueFails() throws JMSException {
+        final PutJMS putJMS = spy(new PutJMS());
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.MESSAGE_PRIORITY, "negative one");
+        assertEquals(PutJMS.DEFAULT_MESSAGE_PRIORITY, 
runnerPut.getProcessContext().getProperty(JmsProperties.MESSAGE_PRIORITY).asInteger().intValue());
+    }
+
+    @Test
+    public void testBadMessagePriorityRunSucceeds() throws JMSException {
+        final PutJMS putJMS = spy(new PutJMS());
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+        runnerPut.setProperty(JmsProperties.MESSAGE_PRIORITY, "negative one");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPut.enqueue("badMessagePriorityRunSucceeds".getBytes(), 
attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+    }
+
+    @Test
+    public void testPutSendRoutesToFailure() throws JMSException, 
NoSuchFieldException, IllegalAccessException {
+
+        final PutJMS putJMS = spy(new PutJMS());
+
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+
+        final ProcessContext context = runnerPut.getProcessContext();
+
+        final Queue<WrappedMessageProducer> wrappedMessageProducerQueue = 
(Queue) spy(new LinkedBlockingQueue<>());
+        injectFieldValue(PutJMS.class, putJMS, "producerQueue", 
wrappedMessageProducerQueue);
+
+        final WrappedMessageProducer wrappedMessageProducer = 
spy(JmsFactory.createMessageProducer(context, true));
+        final MessageProducer messageProducer = 
spy(wrappedMessageProducer.getProducer());
+
+        doAnswer(new Answer<WrappedMessageProducer>() {
+            @Override
+            public WrappedMessageProducer answer(InvocationOnMock 
invocationOnMock) {
+                return wrappedMessageProducer;
+            }
+        }).when(wrappedMessageProducerQueue).poll();
+        assertEquals(wrappedMessageProducer, 
wrappedMessageProducerQueue.poll());
+
+        doAnswer(new Answer<MessageProducer>() {
+            @Override
+            public MessageProducer answer(InvocationOnMock invocationOnMock) {
+                return messageProducer;
+            }
+        }).when(wrappedMessageProducer).getProducer();
+
+        doThrow(new JMSException("force send to 
fail")).when(messageProducer).send(any(Message.class));
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPut.enqueue("putSendRoutesToFailure".getBytes(), attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+
+        final List<MockFlowFile> flowFilesFail = 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE);
+        assertEquals(1, flowFilesFail.size());
+    }
+
+    @Test
+    public void testPutCommitRoutesToFailure() throws JMSException, 
NoSuchFieldException, IllegalAccessException {
+
+        final PutJMS putJMS = spy(new PutJMS());
+
+        final TestRunner runnerPut = TestRunners.newTestRunner(putJMS);
+        runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER);
+        runnerPut.setProperty(JmsProperties.URL, TEST_URL);
+        runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE);
+        runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + 
testQueueSuffix());
+
+        final ProcessContext context = runnerPut.getProcessContext();
+        final Queue<WrappedMessageProducer> wrappedMessageProducerQueue = 
(Queue) spy(new LinkedBlockingQueue<>());
+        injectFieldValue(PutJMS.class, putJMS, "producerQueue", 
wrappedMessageProducerQueue);
+
+        final WrappedMessageProducer wrappedMessageProducer = 
spy(JmsFactory.createMessageProducer(context, true));
+        final MessageProducer messageProducer = 
spy(wrappedMessageProducer.getProducer());
+        final Connection connection = JmsFactory.createConnection(context);
+        final Session jmsSession = spy(JmsFactory.createSession(context, 
connection, true));
+
+        doAnswer(new Answer<WrappedMessageProducer>() {
+            @Override
+            public WrappedMessageProducer answer(InvocationOnMock 
invocationOnMock) {
+                return wrappedMessageProducer;
+            }
+        }).when(wrappedMessageProducerQueue).poll();
+
+        doAnswer(new Answer<MessageProducer>() {
+            @Override
+            public MessageProducer answer(InvocationOnMock invocationOnMock) {
+                return messageProducer;
+            }
+        }).when(wrappedMessageProducer).getProducer();
+
+        doAnswer(new Answer<Session>() {
+            @Override
+            public Session answer(InvocationOnMock invocationOnMock) {
+                return jmsSession;
+            }
+        }).when(wrappedMessageProducer).getSession();
+
+        doThrow(new JMSException("force commit to 
fail")).when(jmsSession).commit();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "file1.txt");
+        runnerPut.enqueue("putCommitRoutesToFailure".getBytes(), attributes);
+
+        runnerPut.run();
+
+        assertEquals(0, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size());
+        assertEquals(1, 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size());
+
+        final List<MockFlowFile> flowFilesFail = 
runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE);
+        assertEquals(1, flowFilesFail.size());
+    }
+}
\ No newline at end of file

Reply via email to