Repository: nifi
Updated Branches:
  refs/heads/master 938e32ed9 -> d36b76cc6


NIFI-2745 added _source destination name_ attribute to ConsumeJMS

This closes #992


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d36b76cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d36b76cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d36b76cc

Branch: refs/heads/master
Commit: d36b76cc600b76748ffba9b94d6b702af552756f
Parents: 938e32e
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Sep 7 11:04:54 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Wed Sep 7 15:28:34 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/jms/processors/AbstractJMSProcessor.java    | 5 ++++-
 .../main/java/org/apache/nifi/jms/processors/ConsumeJMS.java    | 3 +++
 .../java/org/apache/nifi/jms/processors/ConsumeJMSTest.java     | 2 ++
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 398c5c1..20937b5 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -114,6 +114,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
 
     private volatile CachingConnectionFactory cachingConnectionFactory;
 
+    protected volatile String destinationName;
+
     /**
      *
      */
@@ -199,7 +201,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
 
             JmsTemplate jmsTemplate = new JmsTemplate();
             jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
-            jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
+            this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
+            jmsTemplate.setDefaultDestinationName(this.destinationName);
             jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
             // set of properties that may be good candidates for exposure via configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index ac05f2c..83e594a 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -54,6 +54,8 @@ import org.springframework.jms.core.JmsTemplate;
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
 
+    public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
@@ -118,6 +120,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
             attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue()));
         }
+        attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d36b76cc/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
index 9366e8d..57d7dda 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -54,6 +54,8 @@ public class ConsumeJMSTest {
         assertNotNull(successFF);
         assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION));
         successFF.assertContentEquals("Hey dude!".getBytes());
+        String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
+        assertNotNull(sourceDestination);
 
         ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
     }

Reply via email to