Repository: nifi Updated Branches: refs/heads/master d42f1d4ad -> 8d37af07b
NIFI-1381 Removing the hardcoded jms:// prefix and instead deferring to the URI specified by the processor properties of PutJMS Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8d37af07 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8d37af07 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8d37af07 Branch: refs/heads/master Commit: 8d37af07b9f4486a063fda69f289bada97fac640 Parents: d42f1d4 Author: Aldrin Piri <[email protected]> Authored: Sun Jan 10 22:28:09 2016 -0500 Committer: joewitt <[email protected]> Committed: Sun Jan 24 23:10:47 2016 -0500 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/PutJMS.java | 2 +- .../nifi/processors/standard/TestPutJMS.java | 46 +++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8d37af07/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java index 7405ca5..c2cd35f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java @@ -241,7 +241,7 @@ public class PutJMS extends AbstractProcessor { } successfulFlowFiles.add(flowFile); - session.getProvenanceReporter().send(flowFile, "jms://" + context.getProperty(URL).getValue()); + session.getProvenanceReporter().send(flowFile, context.getProperty(URL).getValue()); } try { http://git-wip-us.apache.org/repos/asf/nifi/blob/8d37af07/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java index df1e4a4..f27f3cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutJMS.java @@ -21,6 +21,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.util.JmsFactory; import org.apache.nifi.processors.standard.util.JmsProperties; import org.apache.nifi.processors.standard.util.WrappedMessageProducer; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -127,7 +128,7 @@ public class TestPutJMS { runnerGet.run(); final List<MockFlowFile> flowFiles = runnerGet.getFlowFilesForRelationship( - new Relationship.Builder().name("success").build()); + new Relationship.Builder().name("success").build()); assertEquals(1, flowFiles.size()); final MockFlowFile successFlowFile = flowFiles.get(0); @@ -561,4 +562,47 @@ public class TestPutJMS { final List<MockFlowFile> flowFilesFail = runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE); assertEquals(1, flowFilesFail.size()); } + + @Test + public void testPutProvenanceSendEventTransitUri() throws JMSException { + final PutJMS putJMS = spy(new PutJMS()); + final TestRunner runnerPut = TestRunners.newTestRunner(putJMS); + runnerPut.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerPut.setProperty(JmsProperties.URL, TEST_URL); + runnerPut.setProperty(JmsProperties.DESTINATION_TYPE, TEST_DEST_TYPE); + runnerPut.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerPut.setProperty(JmsProperties.ATTRIBUTES_TO_JMS_PROPS, "true"); + + runnerPut.enqueue("putGetMessage".getBytes()); + + runnerPut.run(); + + assertEquals(0, runnerPut.getFlowFilesForRelationship(PutJMS.REL_FAILURE).size()); + assertEquals(1, runnerPut.getFlowFilesForRelationship(PutJMS.REL_SUCCESS).size()); + + final List<ProvenanceEventRecord> putProvenanceEvents = runnerPut.getProvenanceEvents(); + + assertEquals(1, putProvenanceEvents.size()); + // Verify the transitUri is the same as that configured in the properties + assertEquals(TEST_URL, putProvenanceEvents.get(0).getTransitUri()); + + final GetJMSQueue getJmsQueue = new GetJMSQueue(); + final TestRunner runnerGet = TestRunners.newTestRunner(getJmsQueue); + runnerGet.setProperty(JmsProperties.JMS_PROVIDER, TEST_PROVIDER); + runnerGet.setProperty(JmsProperties.URL, TEST_URL); + runnerGet.setProperty(JmsProperties.DESTINATION_NAME, TEST_DEST_NAME + testQueueSuffix()); + runnerGet.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, TEST_ACK_MODE); + + runnerGet.run(); + + assertEquals(1, runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS).size()); + + final List<MockFlowFile> flowFilesGet = runnerGet.getFlowFilesForRelationship(GetJMSQueue.REL_SUCCESS); + + assertEquals(1, flowFilesGet.size()); + final MockFlowFile successFlowFile = flowFilesGet.get(0); + + successFlowFile.assertContentEquals("putGetMessage"); + } + }
