Repository: nifi Updated Branches: refs/heads/master 938e32ed9 -> d36b76cc6
NIFI-2745 added _source destination name_ attribute to ConsumeJMS This closes #992 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d36b76cc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d36b76cc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d36b76cc Branch: refs/heads/master Commit: d36b76cc600b76748ffba9b94d6b702af552756f Parents: 938e32e Author: Oleg Zhurakousky <[email protected]> Authored: Wed Sep 7 11:04:54 2016 -0400 Committer: Oleg Zhurakousky <[email protected]> Committed: Wed Sep 7 15:28:34 2016 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/jms/processors/AbstractJMSProcessor.java | 5 ++++- .../main/java/org/apache/nifi/jms/processors/ConsumeJMS.java | 3 +++ .../java/org/apache/nifi/jms/processors/ConsumeJMSTest.java | 2 ++ 3 files changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 398c5c1..20937b5 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -114,6 +114,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess private volatile CachingConnectionFactory cachingConnectionFactory; + protected volatile String destinationName; + /** * */ @@ -199,7 +201,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); + this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); + jmsTemplate.setDefaultDestinationName(this.destinationName); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index ac05f2c..83e594a 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -54,6 +54,8 @@ import org.springframework.jms.core.JmsTemplate; @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { + public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination"; + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") @@ -118,6 +120,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) { attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue())); } + attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); flowFile = processSession.putAllAttributes(flowFile, attributes); return flowFile; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java index 9366e8d..57d7dda 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -54,6 +54,8 @@ public class ConsumeJMSTest { assertNotNull(successFF); assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION)); successFF.assertContentEquals("Hey dude!".getBytes()); + String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); + assertNotNull(sourceDestination); ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); }
