Repository: nifi
Updated Branches:
  refs/heads/0.x 639e6d6a7 -> c2e98f96e


NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in 
ConsumeJMS

Remove unused assertEquals import

Move the destination from the JmsTemplate default to the send/receive calls to better support Expression Language (EL)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58fdfdc7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58fdfdc7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58fdfdc7

Branch: refs/heads/0.x
Commit: 58fdfdc76e2e59bffbbcdd4e96ad2a190b412b29
Parents: 639e6d6
Author: Joey Frazee <joey.fra...@icloud.com>
Authored: Mon Sep 19 08:18:30 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Tue Sep 20 10:11:27 2016 -0400

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java    |  2 +-
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 15 ++++---
 .../apache/nifi/jms/processors/JMSConsumer.java |  4 +-
 .../nifi/jms/processors/JMSPublisher.java       | 10 ++---
 .../apache/nifi/jms/processors/PublishJMS.java  |  3 +-
 .../apache/nifi/jms/processors/CommonTest.java  |  3 +-
 .../nifi/jms/processors/ConsumeJMSTest.java     | 32 ++++++++++++---
 .../processors/JMSPublisherConsumerTest.java    | 31 ++++++++-------
 .../nifi/jms/processors/PublishJMSTest.java     | 42 ++++++++++++++++++--
 9 files changed, 101 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index f8030db..ed45b84 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -198,7 +198,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 
             JmsTemplate jmsTemplate = new JmsTemplate();
             jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
-            
jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).getValue());
+            this.destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
             
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
             // set of properties that may be good candidates for exposure via 
configuration

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index a4cad0d..e8e0eb9 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -76,7 +76,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      */
     @Override
     protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
-        final JMSResponse response = this.targetResource.consume();
+        final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
+        final JMSResponse response = 
this.targetResource.consume(destinationName);
         if (response != null){
             FlowFile flowFile = processSession.create();
             flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
@@ -86,8 +87,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
                 }
             });
             Map<String, Object> jmsHeaders = response.getMessageHeaders();
-            flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, 
flowFile, processSession);
-            processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).getValue());
+            Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
+            flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, 
flowFile, processSession);
+            flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, 
flowFile, processSession);
+            processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
             processSession.transfer(flowFile, REL_SUCCESS);
         } else {
             context.yield();
@@ -113,10 +116,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     /**
      *
      */
-    private FlowFile updateFlowFileAttributesWithJmsHeaders(Map<String, 
Object> jmsHeaders, FlowFile flowFile, ProcessSession processSession) {
+    private FlowFile updateFlowFileAttributesWithMap(Map<String, Object> map, 
FlowFile flowFile, ProcessSession processSession) {
         Map<String, String> attributes = new HashMap<String, String>();
-        for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
-            attributes.put(headersEntry.getKey(), 
String.valueOf(headersEntry.getValue()));
+        for (Entry<String, Object> entry : map.entrySet()) {
+            attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
         }
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index e69705a..4bc4370 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -61,8 +61,8 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public JMSResponse consume() {
-        Message message = this.jmsTemplate.receive();
+    public JMSResponse consume(final String destinationName) {
+        Message message = this.jmsTemplate.receive(destinationName);
         if (message != null) {
             byte[] messageBody = null;
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
index 3643c51..f469601 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java
@@ -63,8 +63,8 @@ final class JMSPublisher extends JMSWorker {
      *
      * @param messageBytes byte array representing contents of the message
      */
-    void publish(byte[] messageBytes) {
-        this.publish(messageBytes, null);
+    void publish(final String destinationName, byte[] messageBytes) {
+        this.publish(destinationName, messageBytes, null);
     }
 
     /**
@@ -74,8 +74,8 @@ final class JMSPublisher extends JMSWorker {
      * @param flowFileAttributes
      *            Map representing {@link FlowFile} attributes.
      */
-    void publish(final byte[] messageBytes, final Map<String, String> 
flowFileAttributes) {
-        this.jmsTemplate.send(new MessageCreator() {
+    void publish(final String destinationName, final byte[] messageBytes, 
final Map<String, String> flowFileAttributes) {
+        this.jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 BytesMessage message = session.createBytesMessage();
@@ -83,7 +83,7 @@ final class JMSPublisher extends JMSWorker {
                 if (flowFileAttributes != null && 
!flowFileAttributes.isEmpty()) {
                     // set message headers and properties
                     for (Entry<String, String> entry : 
flowFileAttributes.entrySet()) {
-                        if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names
+                        if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && 
!entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' 
are illegal char in JMS prop names
                             message.setStringProperty(entry.getKey(), 
entry.getValue());
                         } else if 
(entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
                             
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index 8f6c18b..a9125f3 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -98,7 +98,8 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             try {
-                this.targetResource.publish(this.extractMessageBody(flowFile, 
processSession),
+                final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
+                this.targetResource.publish(destinationName, 
this.extractMessageBody(flowFile, processSession),
                         flowFile.getAttributes());
                 processSession.transfer(flowFile, REL_SUCCESS);
                 processSession.getProvenanceReporter().send(flowFile, 
context.getProperty(DESTINATION).getValue());

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
index 6f0cf39..8a69a14 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -48,13 +48,12 @@ public class CommonTest {
         assertTrue(consumeJmsPresent);
     }
 
-    static JmsTemplate buildJmsTemplateForDestination(String destinationName, 
boolean pubSub) {
+    static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
         ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(
                 "vm://localhost?broker.persistent=false");
         CachingConnectionFactory cf = new 
CachingConnectionFactory(connectionFactory);
 
         JmsTemplate jmsTemplate = new JmsTemplate(cf);
-        jmsTemplate.setDefaultDestinationName(destinationName);
         jmsTemplate.setPubSubDomain(pubSub);
         return jmsTemplate;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
index 48195a3..a6305bc 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java
@@ -16,10 +16,8 @@
  */
 package org.apache.nifi.jms.processors;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.logging.ProcessorLog;
@@ -31,13 +29,30 @@ import 
org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
+<<<<<<< HEAD
+=======
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+>>>>>>> c238676... NIFI-2789, NIFI-2790 - Read JMS properties and add to 
FlowFile attributes in ConsumeJMS
 public class ConsumeJMSTest {
 
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws 
Exception {
+<<<<<<< HEAD
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("cooQueue", false);
         JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
         sender.publish("Hey dude!".getBytes());
+=======
+        final String  destinationName = "cooQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        JMSPublisher sender = new JMSPublisher(jmsTemplate, 
mock(ComponentLog.class));
+        final Map<String, String> senderAttributes = new HashMap<>();
+        senderAttributes.put("filename", "message.txt");
+        senderAttributes.put("attribute_from_sender", "some value");
+        sender.publish(destinationName, "Hey dude!".getBytes(), 
senderAttributes);
+>>>>>>> c238676... NIFI-2789, NIFI-2790 - Read JMS properties and add to 
FlowFile attributes in ConsumeJMS
         TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
         when(cs.getIdentifier()).thenReturn("cfProvider");
@@ -46,13 +61,18 @@ public class ConsumeJMSTest {
         runner.enableControllerService(cs);
 
         runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue");
+        runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
         runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
         runner.run(1, false);
         //
         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
         assertNotNull(successFF);
-        assertEquals("cooQueue", 
successFF.getAttributes().get(JmsHeaders.DESTINATION));
+        successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+        successFF.assertAttributeEquals(JmsHeaders.DESTINATION, 
destinationName);
+        successFF.assertAttributeExists("filename");
+        successFF.assertAttributeEquals("filename", "message.txt");
+        successFF.assertAttributeExists("attribute_from_sender");
+        successFF.assertAttributeEquals("attribute_from_sender", "some value");
         successFF.assertContentEquals("Hey dude!".getBytes());
 
         ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
index e18263e..58de99f 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -42,12 +42,12 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void validateByesConvertedToBytesMessageOnSend() throws Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
-
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
         JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
-        publisher.publish("hellomq".getBytes());
+        publisher.publish(destinationName, "hellomq".getBytes());
 
-        Message receivedMessage = jmsTemplate.receive();
+        Message receivedMessage = jmsTemplate.receive(destinationName);
         assertTrue(receivedMessage instanceof BytesMessage);
         byte[] bytes = new byte[7];
         ((BytesMessage) receivedMessage).readBytes(bytes);
@@ -58,15 +58,16 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void 
validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws 
Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
         JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
         Map<String, String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put("foo", "foo");
         flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
-        publisher.publish("hellomq".getBytes(), flowFileAttributes);
+        publisher.publish(destinationName, "hellomq".getBytes(), 
flowFileAttributes);
 
-        Message receivedMessage = jmsTemplate.receive();
+        Message receivedMessage = jmsTemplate.receive(destinationName);
         assertTrue(receivedMessage instanceof BytesMessage);
         assertEquals("foo", receivedMessage.getStringProperty("foo"));
         assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
@@ -83,9 +84,10 @@ public class JMSPublisherConsumerTest {
      */
     @Test(expected = IllegalStateException.class)
     public void validateFailOnUnsupportedMessageType() throws Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(new MessageCreator() {
+        jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 return session.createObjectMessage();
@@ -94,7 +96,7 @@ public class JMSPublisherConsumerTest {
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ProcessorLog.class));
         try {
-            consumer.consume();
+            consumer.consume(destinationName);
         } finally {
             ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
         }
@@ -102,9 +104,10 @@ public class JMSPublisherConsumerTest {
 
     @Test
     public void validateConsumeWithCustomHeadersAndProperties() throws 
Exception {
-        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination("testQueue", false);
+        final String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
 
-        jmsTemplate.send(new MessageCreator() {
+        jmsTemplate.send(destinationName, new MessageCreator() {
             @Override
             public Message createMessage(Session session) throws JMSException {
                 TextMessage message = session.createTextMessage("hello from 
the other side");
@@ -116,9 +119,7 @@ public class JMSPublisherConsumerTest {
         });
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ProcessorLog.class));
-        assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", 
consumer.toString());
-
-        JMSResponse response = consumer.consume();
+        JMSResponse response = consumer.consume(destinationName);
         assertEquals("hello from the other side", new 
String(response.getMessageBody()));
         assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
         assertEquals("foo", response.getMessageProperties().get("foo"));

http://git-wip-us.apache.org/repos/asf/nifi/blob/58fdfdc7/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
index 28d4ac6..a55a44a 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java
@@ -44,6 +44,7 @@ public class PublishJMSTest {
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
         ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
 
+        final String destinationName = "fooQueue";
         PublishJMS pubProc = new PublishJMS();
         TestRunner runner = TestRunners.newTestRunner(pubProc);
         JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
@@ -54,7 +55,43 @@ public class PublishJMSTest {
         runner.enableControllerService(cs);
 
         runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
-        runner.setProperty(PublishJMS.DESTINATION, "fooQueue");
+        runner.setProperty(PublishJMS.DESTINATION, destinationName);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("foo", "foo");
+        attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
+        runner.enqueue("Hey dude!".getBytes(), attributes);
+        runner.run(1, false);
+
+        final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
+        assertNotNull(successFF);
+
+        JmsTemplate jmst = new JmsTemplate(cf);
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
+
+        byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
+        assertEquals("Hey dude!", new String(messageBytes));
+        assertEquals("cooQueue", ((Queue) 
message.getJMSReplyTo()).getQueueName());
+        assertEquals("foo", message.getStringProperty("foo"));
+    }
+
+    @Test
+    public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws 
Exception {
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+
+        final String destinationNameExpression = "${foo}Queue";
+        final String destinationName = "fooQueue";
+        PublishJMS pubProc = new PublishJMS();
+        TestRunner runner = TestRunners.newTestRunner(pubProc);
+        JMSConnectionFactoryProviderDefinition cs = 
mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        runner.addControllerService("cfProvider", cs);
+        runner.enableControllerService(cs);
+
+        runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
+        runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression);
 
         Map<String, String> attributes = new HashMap<>();
         attributes.put("foo", "foo");
@@ -66,8 +103,7 @@ public class PublishJMSTest {
         assertNotNull(successFF);
 
         JmsTemplate jmst = new JmsTemplate(cf);
-        jmst.setDefaultDestinationName("fooQueue");
-        BytesMessage message = (BytesMessage) jmst.receive();
+        BytesMessage message = (BytesMessage) jmst.receive(destinationName);
 
         byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
         assertEquals("Hey dude!", new String(messageBytes));

Reply via email to