Repository: nifi
Updated Branches:
  refs/heads/master feaa4c9db -> b693a4a56


NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in 
ConsumeJMS

Remove unused assertEquals import

Move destination from default to send/receive to support EL better


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2386760
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2386760
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2386760

Branch: refs/heads/master
Commit: c23867605857f8380140ba4d69437b58585e9cba
Parents: feaa4c9
Author: Joey Frazee <joey.fra...@icloud.com>
Authored: Mon Sep 19 08:18:30 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Sep 20 09:30:14 2016 -0400

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java    |  1 -
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 13 +++---
 .../apache/nifi/jms/processors/JMSConsumer.java |  4 +-
 .../nifi/jms/processors/JMSPublisher.java       | 10 ++---
 .../apache/nifi/jms/processors/PublishJMS.java  |  3 +-
 .../apache/nifi/jms/processors/CommonTest.java  |  3 +-
 .../nifi/jms/processors/ConsumeJMSTest.java     | 21 +++++++---
 .../processors/JMSPublisherConsumerTest.java    | 30 +++++++-------
 .../nifi/jms/processors/PublishJMSTest.java     | 42 ++++++++++++++++++--
 9 files changed, 89 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 20937b5..d7c40f7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -202,7 +202,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             JmsTemplate jmsTemplate = new JmsTemplate();
             jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
             this.destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
-            jmsTemplate.setDefaultDestinationName(this.destinationName);
             
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
             // set of properties that may be good candidates for exposure via 
configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 83e594a..cdd5fcd 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -78,7 +78,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      */
     @Override
     protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-        final JMSResponse response = this.targetResource.consume();
+        final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
+        final JMSResponse response = 
this.targetResource.consume(destinationName);
         if (response != null){
             FlowFile flowFile = processSession.create();
             flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
@@ -88,7 +89,9 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
                 }
             });
             Map<String, Object> jmsHeaders = response.getMessageHeaders();
-            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, 
flowFile, processSession);
+            Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
+            flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
+            flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, 
flowFile, processSession);
             processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             processSession.transfer(flowFile, REL_SUCCESS);
         } else {
@@ -115,10 +118,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     /**
      *
      */
-    private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, 
Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) {
+    private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, 
FlowFile flowFile, ProcessSession processSession) {
         Map<String, String> attributes = new HashMap<String, String>();
-        for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
-            attributes.put(headersEntry.getKey(), 
String.valueOf(headersEntry.getValue()));
+        for (Entry<String, Object> entry : map.entrySet()) {
+            attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
         }
         attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
         flowFile = processSession.putAllAttributes(flowFile, attributes);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index d88b348..1525738 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -61,8 +61,8 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public JMSResponse consume() {
-        Message message = this.jmsTemplate.receive();
+    public JMSResponse consume(final String destinationName) {
+        Message message = this.jmsTemplate.receive(destinationName);
         if (message != null) {
             byte[] messageBody = null;
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index cc9ee7f..49355f4 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -63,8 +63,8 @@ final class JMSPublisher extends JMSWorker {
      *
      * @param messageBytes byte array representing contents of the message
      */
-    void publish(byte[] messageBytes) {
-        this.publish(messageBytes, null);
+    void publish(final String destinationName, byte[] messageBytes) {
+        this.publish(destinationName, messageBytes, null);
     }
 
     /**
@@ -74,8 +74,8 @@ final class JMSPublisher extends JMSWorker {
      * @param flowFileAttributes
      *            Map representing {@link FlowFile} attributes.
      */
-    void publish(final byte[] messageBytes, final Map<String, String> 
flowFileAttributes) {
-        this.jmsTemplate.send(new MessageCreator() {
+    void publish(final String destinationName, final byte[] messageBytes, 
final Map<String, String> flowFileAttributes) {
+        this.jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 BytesMessage message = session.createBytesMessage();
@@ -83,7 +83,7 @@ final class JMSPublisher extends JMSWorker {
                 if (flowFileAttributes != null && 
!flowFileAttributes.isEmpty()) {
                     // set message headers and properties
                     for (Entry<String, String> entry : 
flowFileAttributes.entrySet()) {
-                        if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names
+                        if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' 
are illegal char in JMS prop names
                             message.setStringProperty(entry.getKey(), 
entry.getValue());
                         } else if 
(entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
                             
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 0802b43..36f7e86 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -98,7 +98,8 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
-                this.targetResource.publish(this.extractMessageBody(flowFile, 
processSession),
+                final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession),
                         flowFile.getAttributes());
                 processSession.transfer(flowFile, REL_SUCCESS);
                 processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
index 6f0cf39..8a69a14 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -48,13 +48,12 @@ public class CommonTest {
         assertTrue(consumeJmsPresent);
     }
 
-    static JmsTemplate buildJmsTemplateForDestination(String destinationName, 
boolean pubSub) {
+    static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
         ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(
                 "vm://localhost?broker.persistent=false");
         CachingConnectionFactory cf = new 
CachingConnectionFactory(connectionFactory);
 
         JmsTemplate jmsTemplate = new JmsTemplate(cf);
-        jmsTemplate.setDefaultDestinationName(destinationName);
         jmsTemplate.setPubSubDomain(pubSub);
         return jmsTemplate;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
index 57d7dda..e9364d2 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.jms.processors;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.util.MockFlowFile;
@@ -26,7 +29,6 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -35,9 +37,13 @@ public class ConsumeJMSTest {
 
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("cooQueue", false);
+        final String  destinationName = "cooQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
         JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        sender.publish("Hey dude!".getBytes());
+        final Map<String, String> senderAttributes = new HashMap<>();
+        senderAttributes.put("filename", "message.txt");
+        senderAttributes.put("attribute_from_sender", "some value");
+        sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
         TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
         when(cs.getIdentifier()).thenReturn("cfProvider");
@@ -46,13 +52,18 @@ public class ConsumeJMSTest {
         runner.enableControllerService(cs);
 
         runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue");
+        runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
         runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
         runner.run(1, false);
         //
         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
         assertNotNull(successFF);
-        assertEquals("cooQueue", 
successFF.getAttributes().get(JmsHeaders.DESTINATION));
+        successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+        successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
+        successFF.assertAttributeExists("filename");
+        successFF.assertAttributeEquals("filename", "message.txt");
+        successFF.assertAttributeExists("attribute_from_sender");
+        successFF.assertAttributeEquals("attribute_from_sender", "some value");
         successFF.assertContentEquals("Hey dude!".getBytes());
         String sourceDestination = 
successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
         assertNotNull(sourceDestination);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
index 1aa5288..be097fe 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -42,12 +42,13 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void validateByesConvertedToBytesMessageOnSend() throws Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
         JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
-        publisher.publish("hellomq".getBytes());
+        publisher.publish(destinationName, "hellomq".getBytes());
 
-        Message receivedMessage = jmsTemplate.receive();
+        Message receivedMessage = jmsTemplate.receive(destinationName);
         assertTrue(receivedMessage instanceof BytesMessage);
         byte[] bytes = new byte[7];
         ((BytesMessage) receivedMessage).readBytes(bytes);
@@ -58,15 +59,16 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void 
validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws 
Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
         JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
         Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put("foo", "foo");
         flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
-        publisher.publish("hellomq".getBytes(), flowFileAttributes);
+        publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
 
-        Message receivedMessage = jmsTemplate.receive();
+        Message receivedMessage = jmsTemplate.receive(destinationName);
         assertTrue(receivedMessage instanceof BytesMessage);
         assertEquals("foo", receivedMessage.getStringProperty("foo"));
         assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
@@ -83,9 +85,10 @@ public class JMSPublisherConsumerTest {
      */
     @Test(expected = IllegalStateException.class)
     public void validateFailOnUnsupportedMessageType() throws Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(new MessageCreator() {
+        jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 return session.createObjectMessage();
@@ -94,7 +97,7 @@ public class JMSPublisherConsumerTest {
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
         try {
-            consumer.consume();
+            consumer.consume(destinationName);
         } finally {
             ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
         }
@@ -102,9 +105,10 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void validateConsumeWithCustomHeadersAndProperties() throws 
Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(new MessageCreator() {
+        jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 TextMessage message = session.createTextMessage("hello from 
the other side");
@@ -116,9 +120,7 @@ public class JMSPublisherConsumerTest {
         });
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
-        assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", 
consumer.toString());
-
-        JMSResponse response = consumer.consume();
+        JMSResponse response = consumer.consume(destinationName);
         assertEquals("hello from the other side", new 
String(response.getMessageBody()));
         assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
         assertEquals("foo", response.getMessageProperties().get("foo"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2386760/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
index 36edf79..f7ccf17 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
@@ -43,6 +43,7 @@ public class PublishJMSTest {
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
 
+        final String destinationName = "fooQueue";
         PublishJMS pubProc = new PublishJMS();
         TestRunner runner = TestRunners.newTestRunner(pubProc);
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
@@ -53,7 +54,43 @@ public class PublishJMSTest {
         runner.enableControllerService(cs);
 
         runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, "fooQueue");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false);
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+    }
+
+    @Test
+    public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws 
Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationNameExpression = "${foo}Queue";
+        final String destinationName = "fooQueue";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);
 
         Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "foo");
@@ -65,8 +102,7 @@ public class PublishJMSTest {
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
-        jmst.setDefaultDestinationName("fooQueue");
-        BytesMessage message = (BytesMessage) jmst.receive();
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
 
         byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
         assertEquals("Hey dude!", new String(messageBytes));

Reply via email to