Repository: nifi
Updated Branches:
  refs/heads/master c58b02518 -> 74bb341ab


NIFI-2630 Allow PublishJMS to send TextMessages
- Added configurable character set encoding for JMS TextMessages
- Improved PublishJMS/ConsumeJMS documentation
- Validate character set in property validator instead of OnScheduled


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/42e6fa42
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/42e6fa42
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/42e6fa42

Branch: refs/heads/master
Commit: 42e6fa42a38b8208e5aebc22968b62b4a2856e2e
Parents: c58b025
Author: Mike Moser <mose...@apache.org>
Authored: Wed Feb 7 16:00:57 2018 +0000
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Mar 2 09:11:58 2018 -0500

----------------------------------------------------------------------
 .../nifi/processor/util/StandardValidators.java | 30 +++++++
 .../jms/processors/AbstractJMSProcessor.java    | 22 +++++
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 30 ++++++-
 .../apache/nifi/jms/processors/JMSConsumer.java |  6 +-
 .../nifi/jms/processors/JMSPublisher.java       | 94 ++++++++++++--------
 .../processors/MessageBodyToBytesConverter.java | 17 +++-
 .../apache/nifi/jms/processors/PublishJMS.java  | 37 +++++++-
 .../additionalDetails.html                      |  6 +-
 .../additionalDetails.html                      |  6 +-
 .../jms/processors/JMSPublisherConsumerIT.java  | 20 +++--
 .../nifi/jms/processors/PublishJMSTest.java     | 44 ++++++++-
 11 files changed, 251 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index a596330..adf499a 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -340,6 +340,36 @@ public class StandardValidators {
     };
 
     /**
+     * This validator will evaluate an expression using ONLY environment and 
variable registry properties,
+     * then validate that the result is a supported character set.
+     */
+    public static final Validator CHARACTER_SET_VALIDATOR_WITH_EVALUATION = 
new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
+            String evaluatedInput = input;
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
+                try {
+                    PropertyValue propertyValue = 
context.newPropertyValue(input);
+                    evaluatedInput = (propertyValue == null) ? input : 
propertyValue.evaluateAttributeExpressions().getValue();
+                } catch (final Exception e) {
+                    return new 
ValidationResult.Builder().subject(subject).input(input).explanation("Not a 
valid expression").valid(false).build();
+                }
+            }
+
+            String reason = null;
+            try {
+                if (!Charset.isSupported(evaluatedInput)) {
+                    reason = "Character Set is not supported by this JVM.";
+                }
+            } catch (final IllegalArgumentException iae) {
+                reason = "Character Set value is null or is not supported by 
this JVM.";
+            }
+
+            return new 
ValidationResult.Builder().subject(subject).input(evaluatedInput).explanation(reason).valid(reason
 == null).build();
+        }
+    };
+
+    /**
      * URL Validator that does not allow the Expression Language to be used
      */
     public static final Validator URL_VALIDATOR = createURLValidator();

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 2758bfe..1ac468c 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.jms.processors;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -51,6 +52,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 
     static final String QUEUE = "QUEUE";
     static final String TOPIC = "TOPIC";
+    static final String TEXT_MESSAGE = "text";
+    static final String BYTES_MESSAGE = "bytes";
 
     static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
             .name("User Name")
@@ -96,6 +99,23 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .defaultValue("1")
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
+    static final PropertyDescriptor MESSAGE_BODY = new 
PropertyDescriptor.Builder()
+            .name("message-body-type")
+            .displayName("Message Body Type")
+            .description("The type of JMS message body to construct.")
+            .required(true)
+            .defaultValue(BYTES_MESSAGE)
+            .allowableValues(BYTES_MESSAGE, TEXT_MESSAGE)
+            .build();
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("character-set")
+            .displayName("Character Set")
+            .description("The name of the character set to use to construct or 
interpret TextMessages")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(Charset.defaultCharset().name())
+            .expressionLanguageSupported(true)
+            .build();
 
 
     static final PropertyDescriptor CF_SERVICE = new 
PropertyDescriptor.Builder()
@@ -117,6 +137,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
         propertyDescriptors.add(PASSWORD);
         propertyDescriptors.add(CLIENT_ID);
         propertyDescriptors.add(SESSION_CACHE_SIZE);
+        propertyDescriptors.add(MESSAGE_BODY);
+        propertyDescriptors.add(CHARSET);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index a199411..e3e9a75 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -29,6 +29,8 @@ import javax.jms.Session;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -45,6 +47,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.JmsHeaders;
 
 /**
  * Consuming JMS processor which upon each invocation of
@@ -57,6 +60,19 @@ import org.springframework.jms.core.JmsTemplate;
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes JMS Message of type BytesMessage or 
TextMessage transforming its content to "
         + "a FlowFile and transitioning it to 'success' relationship. JMS 
attributes such as headers and properties will be copied as FlowFile 
attributes.")
+@WritesAttributes({
+        @WritesAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = 
"The JMSDeliveryMode from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.EXPIRATION, description = "The 
JMSExpiration from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.PRIORITY, description = "The 
JMSPriority from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.REDELIVERED, description = 
"The JMSRedelivered from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.TIMESTAMP, description = "The 
JMSTimestamp from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.CORRELATION_ID, description = 
"The JMSCorrelationID from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.MESSAGE_ID, description = "The 
JMSMessageID from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.TYPE, description = "The 
JMSType from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.REPLY_TO, description = "The 
JMSReplyTo from the message header."),
+        @WritesAttribute(attribute = JmsHeaders.DESTINATION, description = 
"The JMSDestination from the message header."),
+        @WritesAttribute(attribute = "other attributes", description = "Each 
message property is written to an attribute.")
+})
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
 
@@ -125,6 +141,15 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(propertyDescriptors);
+        _propertyDescriptors.remove(MESSAGE_BODY);
+
+        // change the validator on CHARSET property
+        _propertyDescriptors.remove(CHARSET);
+        PropertyDescriptor CHARSET_WITH_EL_VALIDATOR_PROPERTY = new 
PropertyDescriptor.Builder().fromPropertyDescriptor(CHARSET)
+                
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR_WITH_EVALUATION)
+                .build();
+        _propertyDescriptors.add(CHARSET_WITH_EL_VALIDATOR_PROPERTY);
+
         _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
         _propertyDescriptors.add(DURABLE_SUBSCRIBER);
         _propertyDescriptors.add(SHARED_SUBSCRIBER);
@@ -138,7 +163,7 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
 
     /**
      * Will construct a {@link FlowFile} containing the body of the consumed 
JMS
-     * message (if {@link GetResponse} returned by {@link JMSConsumer} is not
+     * message (if {@link JMSResponse} returned by {@link JMSConsumer} is not
      * null) and JMS properties that came with message which are added to a
      * {@link FlowFile} as attributes, transferring {@link FlowFile} to
      * 'success' {@link Relationship}.
@@ -151,8 +176,9 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         final Boolean sharedBoolean = 
context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = 
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
+        final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, 
new ConsumerCallback() {
+        consumer.consume(destinationName, durable, shared, subscriptionName, 
charset, new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
                 if (response == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 841b62d..07aee32 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.jms.processors;
 
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -78,7 +79,8 @@ final class JMSConsumer extends JMSWorker {
     }
 
 
-    public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final ConsumerCallback 
consumerCallback) {
+    public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final String charset,
+                        final ConsumerCallback consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
             public Void doInJms(final Session session) throws JMSException {
@@ -95,7 +97,7 @@ final class JMSConsumer extends JMSWorker {
                     if (message != null) {
                         byte[] messageBody = null;
                         if (message instanceof TextMessage) {
-                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message);
+                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message, 
Charset.forName(charset));
                         } else if (message instanceof BytesMessage) {
                             messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
                         } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 671f5c9..9912c81 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -25,6 +25,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import org.apache.nifi.logging.ComponentLog;
@@ -54,48 +55,69 @@ final class JMSPublisher extends JMSWorker {
             public Message createMessage(Session session) throws JMSException {
                 BytesMessage message = session.createBytesMessage();
                 message.writeBytes(messageBytes);
+                setMessageHeaderAndProperties(message, flowFileAttributes);
+                return message;
+            }
+        });
+    }
 
-                if (flowFileAttributes != null && 
!flowFileAttributes.isEmpty()) {
-                    // set message headers and properties
-                    for (Entry<String, String> entry : 
flowFileAttributes.entrySet()) {
-                        if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' 
are illegal char in JMS prop names
-                            message.setStringProperty(entry.getKey(), 
entry.getValue());
-                        } else if 
(entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
-                            
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
-                        } else if 
(entry.getKey().equals(JmsHeaders.EXPIRATION)) {
-                            
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
-                        } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) 
{
-                            
message.setJMSPriority(Integer.parseInt(entry.getValue()));
-                        } else if 
(entry.getKey().equals(JmsHeaders.REDELIVERED)) {
-                            
message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue()));
-                        } else if 
(entry.getKey().equals(JmsHeaders.TIMESTAMP)) {
-                            
message.setJMSTimestamp(Long.parseLong(entry.getValue()));
-                        } else if 
(entry.getKey().equals(JmsHeaders.CORRELATION_ID)) {
-                            message.setJMSCorrelationID(entry.getValue());
-                        } else if (entry.getKey().equals(JmsHeaders.TYPE)) {
-                            message.setJMSType(entry.getValue());
-                        } else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) 
{
-                            Destination destination = 
buildDestination(entry.getValue());
-                            if (destination != null) {
-                                message.setJMSReplyTo(destination);
-                            } else {
-                                logUnbuildableDestination(entry.getKey(), 
JmsHeaders.REPLY_TO);
-                            }
-                        } else if 
(entry.getKey().equals(JmsHeaders.DESTINATION)) {
-                            Destination destination = 
buildDestination(entry.getValue());
-                            if (destination != null) {
-                                message.setJMSDestination(destination);
-                            } else {
-                                logUnbuildableDestination(entry.getKey(), 
JmsHeaders.DESTINATION);
-                            }
-                        }
-                    }
-                }
+    void publish(String destinationName, String messageText) {
+        this.publish(destinationName, messageText, null);
+    }
+
+    void publish(String destinationName, String messageText, final Map<String, 
String> flowFileAttributes) {
+        this.jmsTemplate.send(destinationName, new MessageCreator() {
+            @Override
+            public Message createMessage(Session session) throws JMSException {
+                TextMessage message = session.createTextMessage(messageText);
+                setMessageHeaderAndProperties(message, flowFileAttributes);
                 return message;
             }
         });
     }
 
+    void setMessageHeaderAndProperties(final Message message, final 
Map<String, String> flowFileAttributes) throws JMSException {
+        if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
+            for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
+                try {
+                    if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' 
are illegal char in JMS prop names
+                        message.setStringProperty(entry.getKey(), 
entry.getValue());
+                    } else if 
(entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
+                        
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
+                    } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
+                        
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
+                    } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) {
+                        
message.setJMSPriority(Integer.parseInt(entry.getValue()));
+                    } else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) {
+                        
message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue()));
+                    } else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) {
+                        
message.setJMSTimestamp(Long.parseLong(entry.getValue()));
+                    } else if 
(entry.getKey().equals(JmsHeaders.CORRELATION_ID)) {
+                        message.setJMSCorrelationID(entry.getValue());
+                    } else if (entry.getKey().equals(JmsHeaders.TYPE)) {
+                        message.setJMSType(entry.getValue());
+                    } else if (entry.getKey().equals(JmsHeaders.REPLY_TO)) {
+                        Destination destination = 
buildDestination(entry.getValue());
+                        if (destination != null) {
+                            message.setJMSReplyTo(destination);
+                        } else {
+                            logUnbuildableDestination(entry.getKey(), 
JmsHeaders.REPLY_TO);
+                        }
+                    } else if (entry.getKey().equals(JmsHeaders.DESTINATION)) {
+                        Destination destination = 
buildDestination(entry.getValue());
+                        if (destination != null) {
+                            message.setJMSDestination(destination);
+                        } else {
+                            logUnbuildableDestination(entry.getKey(), 
JmsHeaders.DESTINATION);
+                        }
+                    }
+                } catch (NumberFormatException ne) {
+                    this.processLog.warn("Incompatible value for attribute " + 
entry.getKey()
+                            + " [" + entry.getValue() + "] is not a number. 
Ignoring this attribute.");
+                }
+            }
+        }
+    }
 
     private void logUnbuildableDestination(String destinationName, String 
headerName) {
         this.processLog.warn("Failed to determine destination type from 
destination name '{}'. The '{}' header will not be set.", new Object[] 
{destinationName, headerName});

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
index ed212db..47827cc 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java
@@ -18,6 +18,7 @@ package org.apache.nifi.jms.processors;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.Charset;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -36,8 +37,22 @@ abstract class MessageBodyToBytesConverter {
      * @return  byte array representing the {@link TextMessage}
      */
     public static byte[] toBytes(TextMessage message) {
+        return MessageBodyToBytesConverter.toBytes(message, null);
+    }
+
+    /**
+     *
+     * @param message instance of {@link TextMessage}
+     * @param charset character set used to interpret the TextMessage
+     * @return  byte array representing the {@link TextMessage}
+     */
+    public static byte[] toBytes(TextMessage message, Charset charset) {
         try {
-            return message.getText().getBytes();
+            if (charset == null) {
+                return message.getText().getBytes();
+            } else {
+                return message.getText().getBytes(charset);
+            }
         } catch (JMSException e) {
             throw new MessageConversionException("Failed to convert 
BytesMessage to byte[]", e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 80d6f3e..5fe49bc 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.jms.processors;
 
+import java.io.StringWriter;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -23,8 +25,11 @@ import java.util.Set;
 import javax.jms.Destination;
 import javax.jms.Message;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -53,7 +58,20 @@ import org.springframework.jms.support.JmsHeaders;
 @Tags({ "jms", "put", "message", "send", "publish" })
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile 
and sends it to a "
-        + "JMS Destination (queue or topic) as JMS BytesMessage. FlowFile 
attributes will be added as JMS headers and/or properties to the outgoing JMS 
message.")
+        + "JMS Destination (queue or topic) as JMS BytesMessage or 
TextMessage. "
+        + "FlowFile attributes will be added as JMS headers and/or properties 
to the outgoing JMS message.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = 
"This attribute becomes the JMSDeliveryMode message header. Must be an 
integer."),
+        @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This 
attribute becomes the JMSExpiration message header. Must be an integer."),
+        @ReadsAttribute(attribute = JmsHeaders.PRIORITY, description = "This 
attribute becomes the JMSPriority message header. Must be an integer."),
+        @ReadsAttribute(attribute = JmsHeaders.REDELIVERED, description = 
"This attribute becomes the JMSRedelivered message header."),
+        @ReadsAttribute(attribute = JmsHeaders.TIMESTAMP, description = "This 
attribute becomes the JMSTimestamp message header. Must be a long."),
+        @ReadsAttribute(attribute = JmsHeaders.CORRELATION_ID, description = 
"This attribute becomes the JMSCorrelationID message header."),
+        @ReadsAttribute(attribute = JmsHeaders.TYPE, description = "This 
attribute becomes the JMSType message header. Must be an integer."),
+        @ReadsAttribute(attribute = JmsHeaders.REPLY_TO, description = "This 
attribute becomes the JMSReplyTo message header. Must be an integer."),
+        @ReadsAttribute(attribute = JmsHeaders.DESTINATION, description = 
"This attribute becomes the JMSDestination message header. Must be an 
integer."),
+        @ReadsAttribute(attribute = "other attributes", description = "All 
other attributes that do not start with " + JmsHeaders.PREFIX + " are added as 
message properties.")
+})
 @SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
 public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
 
@@ -96,7 +114,16 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         if (flowFile != null) {
             try {
                 String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
-                publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
+                switch (context.getProperty(MESSAGE_BODY).getValue()) {
+                    case TEXT_MESSAGE:
+                        publisher.publish(destinationName, 
this.extractTextMessageBody(flowFile, processSession, charset), 
flowFile.getAttributes());
+                        break;
+                    case BYTES_MESSAGE:
+                    default:
+                        publisher.publish(destinationName, 
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                        break;
+                }
                 processSession.transfer(flowFile, REL_SUCCESS);
                 processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             } catch (Exception e) {
@@ -131,4 +158,10 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         session.read(flowFile, in -> StreamUtils.fillBuffer(in, 
messageContent, true));
         return messageContent;
     }
+
+    private String extractTextMessageBody(FlowFile flowFile, ProcessSession 
session, String charset) {
+        final StringWriter writer = new StringWriter();
+        session.read(flowFile, in -> IOUtils.copy(in, writer, 
Charset.forName(charset)));
+        return writer.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
index ac40cec..52e2fed 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.ConsumeJMS/additionalDetails.html
@@ -27,7 +27,7 @@
 </p>
 <p>
     This processor does two things. It constructs FlowFile by extracting 
information from the consumed JMS message including body, standard 
-    <a 
href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a> and Properties. 
+    <a 
href="http://docs.spring.io/spring-integration/docs/4.2.0.RELEASE/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a> and Properties.
     The message body is written to a FlowFile while standard JMS Headers and 
Properties are set as FlowFile attributes.
 </p>
 
@@ -49,10 +49,6 @@
     <li><b>Destination Type</b> - [OPTIONAL] the type of the 
<i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'
     Usually provided by the administrator. Defaults to 'TOPIC'. 
     </li>
-    <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for 
the JMS Session cache (per JMS Session type). This cache 
-    size is the maximum limit for the number of cached Sessions.
-    Usually provided by the administrator (e.g., '2453'). Defaults to '1'.
-    </li>
     <li><b>Connection Factory Service</b> - [REQUIRED] link to a 
pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
     </li>
 </ol>

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
index a85b21c..1f381e2 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.processors.PublishJMS/additionalDetails.html
@@ -28,7 +28,7 @@
 <p>
     This processor does two things. It constructs JMS Message by extracting 
FlowFile contents (both body and attributes). 
     Once message is constructed it is sent to a pre-configured JMS Destination.
-    Standard <a 
href="http://docs.spring.io/spring-integration/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a> 
+    Standard <a 
href="http://docs.spring.io/spring-integration/docs/4.2.0.RELEASE/api/org/springframework/integration/jms/JmsHeaders.html">JMS
 Headers</a>
     will be extracted from the FlowFile and set on <i>javax.jms.Message</i> as 
JMS headers while other 
     FlowFile attributes will be set as properties of <i>javax.jms.Message</i>. 
Upon success the incoming FlowFile is transferred
     to the <i>success</i> Relationship and upon failure FlowFile is
@@ -52,10 +52,6 @@
     <li><b>Destination Type</b> - [OPTIONAL] the type of the 
<i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'
     Usually provided by the administrator. Defaults to 'TOPIC'. 
     </li>
-    <li><b>Session Cache size</b> - [OPTIONAL] Specify the desired size for 
the JMS Session cache (per JMS Session type). This cache 
-    size is the maximum limit for the number of cached Sessions.
-    Usually provided by the administrator (e.g., '2453'). Defaults to '1'.
-    </li>
     <li><b>Connection Factory Service</b> - [REQUIRED] link to a 
pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
     </li>
 </ol>

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index a356a41..83cd320 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.jms.processors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -73,12 +74,17 @@ public class JMSPublisherConsumerIT {
             JMSPublisher publisher = new 
JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), 
jmsTemplate, mock(ComponentLog.class));
             Map<String, String> flowFileAttributes = new HashMap<>();
             flowFileAttributes.put("foo", "foo");
+            flowFileAttributes.put("illegal-property", "value");
+            flowFileAttributes.put("another.illegal", "value");
             flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
+            flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value 
expected to be integer, make sure non-integer doesn't cause problems
             publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
 
             Message receivedMessage = jmsTemplate.receive(destinationName);
             assertTrue(receivedMessage instanceof BytesMessage);
             assertEquals("foo", receivedMessage.getStringProperty("foo"));
+            assertFalse(receivedMessage.propertyExists("illegal-property"));
+            assertFalse(receivedMessage.propertyExists("another.illegal"));
             assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
             assertEquals("myTopic", ((Topic) 
receivedMessage.getJMSReplyTo()).getTopicName());
 
@@ -107,7 +113,7 @@ public class JMSPublisherConsumerIT {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -137,7 +143,7 @@ public class JMSPublisherConsumerIT {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -184,7 +190,7 @@ public class JMSPublisherConsumerIT {
                         JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < 1000 && msgCount.get() < 4000; 
j++) {
-                            consumer.consume(destinationName, false, false, 
null, callback);
+                            consumer.consume(destinationName, false, false, 
null, "UTF-8", callback);
                         }
                     } finally {
                         ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
@@ -223,7 +229,7 @@ public class JMSPublisherConsumerIT {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 
new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         callbackInvoked.set(true);
@@ -240,7 +246,7 @@ public class JMSPublisherConsumerIT {
 
             // should receive the same message, but will process it 
successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 
new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         if (response == null) {
@@ -259,7 +265,7 @@ public class JMSPublisherConsumerIT {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, 
"UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {
@@ -281,7 +287,7 @@ public class JMSPublisherConsumerIT {
             // should receive the same message, but will process it 
successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, 
"UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/42e6fa42/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
index f6f5ba5..a23da26 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
@@ -27,7 +27,9 @@ import org.springframework.jms.support.JmsHeaders;
 
 import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
 import javax.jms.Queue;
+import javax.jms.TextMessage;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -76,7 +78,7 @@ public class PublishJMSTest {
         runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
     }
 
-    @Test
+    @Test(timeout = 10000)
     public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws 
Exception {
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
 
@@ -138,4 +140,44 @@ public class PublishJMSTest {
         
assertTrue(runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
         
assertNotNull(runner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
     }
+
+    @Test(timeout = 10000)
+    public void validatePublishTextMessage() throws Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationName = "fooQueue";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+        runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false);
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        Message message = jmst.receive(destinationName);
+        assertTrue(message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+
+        runner.run(1, true, false); // Run once just so that we can trigger 
the shutdown of the Connection Factory
+    }
 }

Reply via email to