NIFI-2789, NIFI-2790 - polishing

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2e98f96
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2e98f96
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2e98f96

Branch: refs/heads/0.x
Commit: c2e98f96ebe30a6a8c919ed8a747a5829f5dea0f
Parents: 58fdfdc
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Tue Sep 20 10:18:33 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Sep 20 10:20:31 2016 -0400

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java     |  4 ++--
 .../apache/nifi/jms/processors/ConsumeJMS.java   | 16 ++++++++++------
 .../apache/nifi/jms/processors/JMSConsumer.java  |  2 +-
 .../apache/nifi/jms/processors/JMSPublisher.java |  7 +++----
 .../apache/nifi/jms/processors/PublishJMS.java   |  9 ++++-----
 .../nifi/jms/processors/ConsumeJMSTest.java      | 19 +++++--------------
 6 files changed, 25 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index ed45b84..54e2d89 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -66,9 +66,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .build();
     static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
             .name("Destination Name")
-            .description("The name of the JMS Destination. Usually provided by 
the administrator (e.g., 'topic://myTopic').")
+            .description("The name of the JMS Destination. Usually provided by 
the administrator (e.g., 'topic://myTopic' or 'myTopic').")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
     static final PropertyDescriptor DESTINATION_TYPE = new 
PropertyDescriptor.Builder()
             .name("Destination Type")
@@ -198,7 +199,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 
             JmsTemplate jmsTemplate = new JmsTemplate();
             jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
-            this.destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
             
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
             // set of properties that may be good candidates for exposure via 
configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index e8e0eb9..131d113 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -50,7 +50,7 @@ import org.springframework.jms.core.JmsTemplate;
 @Tags({ "jms", "get", "message", "receive", "consume" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes JMS Message of type BytesMessage or 
TextMessage transforming its content to "
-        + "a FlowFile and transitioning it to 'success' relationship.")
+        + "a FlowFile and transitioning it to 'success' relationship. JMS 
attributes such as headers and properties will be copied as FlowFile 
attributes.")
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
 
@@ -88,8 +88,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
             });
             Map<String, Object> jmsHeaders = response.getMessageHeaders();
             Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
-            flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
-            flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, 
flowFile, processSession);
+            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
+            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, 
processSession);
             processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             processSession.transfer(flowFile, REL_SUCCESS);
         } else {
@@ -114,11 +114,15 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     }
 
     /**
-     *
+     * Copies JMS attributes (i.e., headers and properties) as FF attributes.
+     * Given that FF attributes mandate that values are of type String, the
+     * copied values of JMS attributes will be stringified via
+     * String.valueOf(attribute).
      */
-    private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, 
FlowFile flowFile, ProcessSession processSession) {
+    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
Object> jmsAttributes, FlowFile flowFile,
+            ProcessSession processSession) {
         Map<String, String> attributes = new HashMap<String, String>();
-        for (Entry<String, Object> entry : map.entrySet()) {
+        for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
             attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
         }
         flowFile = processSession.putAllAttributes(flowFile, attributes);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 4bc4370..5c3b599 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -61,7 +61,7 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public JMSResponse consume(final String destinationName) {
+    public JMSResponse consume(String destinationName) {
         Message message = this.jmsTemplate.receive(destinationName);
         if (message != null) {
             byte[] messageBody = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index f469601..1c83540 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -63,7 +63,7 @@ final class JMSPublisher extends JMSWorker {
      *
      * @param messageBytes byte array representing contents of the message
      */
-    void publish(final String destinationName, byte[] messageBytes) {
+    void publish(String destinationName, byte[] messageBytes) {
         this.publish(destinationName, messageBytes, null);
     }
 
@@ -125,9 +125,8 @@ final class JMSPublisher extends JMSWorker {
      *
      */
     private void logUnbuildableDestination(String destinationName, String 
headerName) {
-        logger.warn("Failed to determine destination type from destination 
name '" + destinationName + "'. The '"
-                + headerName + "' will not be set.");
-        processLog.warn("Failed to determine destination type from destination 
name '" + destinationName + "'. The '"
+        this.processLog.warn("Failed to determine destination type from 
destination name '" + destinationName
+                + "'. The '"
                 + headerName + "' will not be set.");
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index a9125f3..fb8b533 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -55,7 +55,7 @@ import org.springframework.jms.support.JmsHeaders;
 @Tags({ "jms", "put", "message", "send", "publish" })
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile 
and sends it to a "
-        + "JMS Destination (queue or topic) as JMS BytesMessage.")
+        + "JMS Destination (queue or topic) as JMS BytesMessage. FlowFile 
attributes will be added as JMS headers and/or properties to the outgoing JMS 
message.")
 @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
 public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
 
@@ -98,11 +98,10 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
-                final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
-                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession),
-                        flowFile.getAttributes());
+                String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
                 processSession.transfer(flowFile, REL_SUCCESS);
-                processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).getValue());
+                processSession.getProvenanceReporter().send(flowFile, 
destinationName);
             } catch (Exception e) {
                 processSession.transfer(flowFile, REL_FAILURE);
                 this.getLogger().error("Failed while sending message to JMS 
via " + this.targetResource, e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2e98f96/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
index a6305bc..7b66672 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.jms.processors;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,30 +33,17 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
-<<<<<<< HEAD
-=======
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
->>>>>>> c238676... NIFI-2789, NIFI-2790 - Read JMS properties and add to 
FlowFile attributes in ConsumeJMS
 public class ConsumeJMSTest {
 
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
-<<<<<<< HEAD
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("cooQueue", false);
-        JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
-        sender.publish("Hey dude!".getBytes());
-=======
         final String  destinationName = "cooQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
-        JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
+        JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
         final Map<String, String> senderAttributes = new HashMap<>();
         senderAttributes.put("filename", "message.txt");
         senderAttributes.put("attribute_from_sender", "some value");
         sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
->>>>>>> c238676... NIFI-2789, NIFI-2790 - Read JMS properties and add to 
FlowFile attributes in ConsumeJMS
         TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
         when(cs.getIdentifier()).thenReturn("cfProvider");

Reply via email to