NIFI-2789, NIFI-2790 - polishing

This closes #1027


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b693a4a5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b693a4a5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b693a4a5

Branch: refs/heads/master
Commit: b693a4a5611b7ef48787cfa9cdde0bbffc95939f
Parents: c238676
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Tue Sep 20 09:29:00 2016 -0400
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Sep 20 09:32:43 2016 -0400

----------------------------------------------------------------------
 .../nifi/jms/processors/AbstractJMSProcessor.java   |  2 +-
 .../org/apache/nifi/jms/processors/ConsumeJMS.java  | 16 ++++++++++------
 .../org/apache/nifi/jms/processors/JMSConsumer.java |  2 +-
 .../apache/nifi/jms/processors/JMSPublisher.java    |  9 ++++-----
 .../org/apache/nifi/jms/processors/PublishJMS.java  |  7 +++----
 5 files changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b693a4a5/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index d7c40f7..cae3dc2 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -66,7 +66,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
             .build();
     static final PropertyDescriptor DESTINATION = new 
PropertyDescriptor.Builder()
             .name("Destination Name")
-            .description("The name of the JMS Destination. Usually provided by 
the administrator (e.g., 'topic://myTopic').")
+            .description("The name of the JMS Destination. Usually provided by 
the administrator (e.g., 'topic://myTopic' or 'myTopic').")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)

http://git-wip-us.apache.org/repos/asf/nifi/blob/b693a4a5/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index cdd5fcd..aea6c9c 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -50,7 +50,7 @@ import org.springframework.jms.core.JmsTemplate;
 @Tags({ "jms", "get", "message", "receive", "consume" })
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes JMS Message of type BytesMessage or 
TextMessage transforming its content to "
-        + "a FlowFile and transitioning it to 'success' relationship.")
+        + "a FlowFile and transitioning it to 'success' relationship. JMS 
attributes such as headers and properties will be copied as FlowFile 
attributes.")
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
 
@@ -90,8 +90,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
             });
             Map<String, Object> jmsHeaders = response.getMessageHeaders();
             Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
-            flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
-            flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, 
flowFile, processSession);
+            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
+            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, 
processSession);
             processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             processSession.transfer(flowFile, REL_SUCCESS);
         } else {
@@ -116,11 +116,15 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
     }
 
     /**
-     *
+     * Copies JMS attributes (i.e., headers and properties) as FF attributes.
+     * Given that FF attributes mandate that values are of type String, the
+     * copied values of JMS attributes will be stringified via
+     * String.valueOf(attribute).
      */
-    private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, 
FlowFile flowFile, ProcessSession processSession) {
+    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
Object> jmsAttributes, FlowFile flowFile,
+            ProcessSession processSession) {
         Map<String, String> attributes = new HashMap<String, String>();
-        for (Entry<String, Object> entry : map.entrySet()) {
+        for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
             attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
         }
         attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b693a4a5/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 1525738..0ddbe48 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -61,7 +61,7 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public JMSResponse consume(final String destinationName) {
+    public JMSResponse consume(String destinationName) {
         Message message = this.jmsTemplate.receive(destinationName);
         if (message != null) {
             byte[] messageBody = null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b693a4a5/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 49355f4..526f0bd 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -63,7 +63,7 @@ final class JMSPublisher extends JMSWorker {
      *
      * @param messageBytes byte array representing contents of the message
      */
-    void publish(final String destinationName, byte[] messageBytes) {
+    void publish(String destinationName, byte[] messageBytes) {
         this.publish(destinationName, messageBytes, null);
     }
 
@@ -74,7 +74,7 @@ final class JMSPublisher extends JMSWorker {
      * @param flowFileAttributes
      *            Map representing {@link FlowFile} attributes.
      */
-    void publish(final String destinationName, final byte[] messageBytes, 
final Map<String, String> flowFileAttributes) {
+    void publish(final String destinationName, final byte[] messageBytes, 
Map<String, String> flowFileAttributes) {
         this.jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
@@ -125,9 +125,8 @@ final class JMSPublisher extends JMSWorker {
      *
      */
     private void logUnbuildableDestination(String destinationName, String 
headerName) {
-        logger.warn("Failed to determine destination type from destination 
name '" + destinationName + "'. The '"
-                + headerName + "' will not be set.");
-        processLog.warn("Failed to determine destination type from destination 
name '" + destinationName + "'. The '"
+        this.processLog.warn("Failed to determine destination type from 
destination name '" + destinationName
+                + "'. The '"
                 + headerName + "' will not be set.");
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b693a4a5/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 36f7e86..1f7a44e 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -55,7 +55,7 @@ import org.springframework.jms.support.JmsHeaders;
 @Tags({ "jms", "put", "message", "send", "publish" })
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile 
and sends it to a "
-        + "JMS Destination (queue or topic) as JMS BytesMessage.")
+        + "JMS Destination (queue or topic) as JMS BytesMessage. FlowFile 
attributes will be added as JMS headers and/or properties to the outgoing JMS 
message.")
 @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
 public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
 
@@ -98,9 +98,8 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
-                final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
-                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession),
-                        flowFile.getAttributes());
+                String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
                 processSession.transfer(flowFile, REL_SUCCESS);
                 processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             } catch (Exception e) {

Reply via email to