NIFI-3223 added support for expression language in PublishAMQP
- EXCHANGE
- ROUTING_KEY

Signed-off-by: Mike Moser <[email protected]>

This closes #1723.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d38a324b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d38a324b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d38a324b

Branch: refs/heads/support/nifi-0.7.x
Commit: d38a324b3d514d5f1637357ae14f947c42f1dba4
Parents: ceec0ec
Author: Oleg Zhurakousky <[email protected]>
Authored: Fri Jan 27 13:41:54 2017 -0500
Committer: Mike Moser <[email protected]>
Committed: Tue May 2 14:17:47 2017 +0000

----------------------------------------------------------------------
 .../nifi/amqp/processors/AMQPPublisher.java     | 65 ++++++++------------
 .../nifi/amqp/processors/PublishAMQP.java       | 18 +++---
 .../nifi/amqp/processors/AMQPPublisherTest.java | 37 +++--------
 .../nifi/amqp/processors/ConsumeAMQPTest.java   |  6 +-
 4 files changed, 49 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
index 41a08d9..b4d6951 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
@@ -18,9 +18,7 @@ package org.apache.nifi.amqp.processors;
 
 import java.io.IOException;
 
-import org.apache.nifi.logging.ProcessorLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nifi.logging.ComponentLog;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Connection;
@@ -32,50 +30,21 @@ import com.rabbitmq.client.ReturnListener;
  */
 final class AMQPPublisher extends AMQPWorker {
 
-    private final static Logger logger = 
LoggerFactory.getLogger(AMQPPublisher.class);
+    private final ComponentLog processLog;
 
-    private final String exchangeName;
-
-    private final String routingKey;
-
-    private final ProcessorLog processLog;
+    private final String connectionString;
 
     /**
      * Creates an instance of this publisher
      *
      * @param connection
      *            instance of AMQP {@link Connection}
-     * @param exchangeName
-     *            the name of AMQP exchange to which messages will be 
published.
-     *            If not provided 'default' exchange will be used.
-     * @param routingKey
-     *            (required) the name of the routingKey to be used by 
AMQP-based
-     *            system to route messages to its final destination (queue).
      */
-    AMQPPublisher(Connection connection, String exchangeName, String 
routingKey, ProcessorLog processLog) {
+    AMQPPublisher(Connection connection, ComponentLog processLog) {
         super(connection);
         this.processLog = processLog;
-        this.validateStringProperty("routingKey", routingKey);
-        this.exchangeName = exchangeName == null ? "" : exchangeName.trim();
-        if (this.exchangeName.length() == 0) {
-            logger.info("The 'exchangeName' is not specified. Messages will be 
sent to default exchange");
-        }
-
-        this.routingKey = routingKey;
         this.channel.addReturnListener(new UndeliverableMessageLogger());
-        logger.info("Successfully connected AMQPPublisher to " + 
connection.toString() + " and '" + this.exchangeName
-                + "' exchange with '" + routingKey + "' as a routing key.");
-    }
-
-    /**
-     * Publishes message without any AMQP properties (see
-     * {@link BasicProperties}) to a pre-defined AMQP Exchange.
-     *
-     * @param bytes
-     *            bytes representing a message.
-     */
-    void publish(byte[] bytes) {
-        this.publish(bytes, null);
+        this.connectionString = connection.toString();
     }
 
     /**
@@ -86,14 +55,28 @@ final class AMQPPublisher extends AMQPWorker {
      *            bytes representing a message.
      * @param properties
      *            instance of {@link BasicProperties}
+     * @param exchange
+     *            the name of AMQP exchange to which messages will be 
published.
+     *            If not provided 'default' exchange will be used.
+     * @param routingKey
+     *            (required) the name of the routingKey to be used by 
AMQP-based
+     *            system to route messages to its final destination (queue).
      */
-    void publish(byte[] bytes, BasicProperties properties) {
+    void publish(byte[] bytes, BasicProperties properties, String routingKey, 
String exchange) {
+        this.validateStringProperty("routingKey", routingKey);
+        exchange = exchange == null ? "" : exchange.trim();
+        if (exchange.length() == 0) {
+            processLog.info("The 'exchangeName' is not specified. Messages 
will be sent to default exchange");
+        }
+        processLog.info("Successfully connected AMQPPublisher to " + 
this.connectionString + " and '" + exchange
+                + "' exchange with '" + routingKey + "' as a routing key.");
+
         if (this.channel.isOpen()) {
             try {
-                this.channel.basicPublish(this.exchangeName, this.routingKey, 
true, properties, bytes);
+                this.channel.basicPublish(exchange, routingKey, true, 
properties, bytes);
             } catch (Exception e) {
                 throw new IllegalStateException("Failed to publish to '" +
-                        this.exchangeName + "' with '" + this.routingKey + 
"'.", e);
+                        exchange + "' with '" + routingKey + "'.", e);
             }
         } else {
             throw new IllegalStateException("This instance of AMQPPublisher is 
invalid since "
@@ -106,7 +89,7 @@ final class AMQPPublisher extends AMQPWorker {
      */
     @Override
     public String toString() {
-        return super.toString() + ", EXCHANGE:" + this.exchangeName + ", 
ROUTING_KEY:" + this.routingKey;
+        return this.connectionString;
     }
 
     /**
@@ -127,7 +110,7 @@ final class AMQPPublisher extends AMQPWorker {
                 throws IOException {
             String logMessage = "Message destined for '" + exchangeName + "' 
exchange with '" + routingKey
                     + "' as routing key came back with replyCode=" + replyCode 
+ " and replyText=" + replyText + ".";
-            logger.warn(logMessage);
+            processLog.warn(logMessage);
             AMQPPublisher.this.processLog.warn(logMessage);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 85116c2..330346f 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -70,6 +70,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
                     + "It is an optional property. If kept empty the messages 
will be sent to a default AMQP exchange.")
             .required(true)
             .defaultValue("")
+            .expressionLanguageSupported(true)
             .addValidator(Validator.VALID)
             .build();
     public static final PropertyDescriptor ROUTING_KEY = new 
PropertyDescriptor.Builder()
@@ -79,6 +80,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
                     + "corresponds to a destination queue name, otherwise a 
binding from the Exchange to a Queue via Routing Key must be set "
                     + "(usually by the AMQP administrator)")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -130,15 +132,19 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
         FlowFile flowFile = processSession.get();
         if (flowFile != null) {
             BasicProperties amqpProperties = 
this.extractAmqpPropertiesFromFlowFile(flowFile);
+            String routingKey = 
context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
+            if (routingKey == null){
+                throw new IllegalArgumentException("Failed to determine 
'routing key' with provided value '"
+                        + context.getProperty(ROUTING_KEY) + "' after 
evaluating it as expression against incoming FlowFile.");
+            }
+            String exchange = 
context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
 
             byte[] messageContent = this.extractMessage(flowFile, 
processSession);
 
             try {
-                this.targetResource.publish(messageContent, amqpProperties);
+                this.targetResource.publish(messageContent, amqpProperties, 
routingKey, exchange);
                 processSession.transfer(flowFile, REL_SUCCESS);
-                processSession.getProvenanceReporter().send(flowFile,
-                        this.amqpConnection.toString() + "/E:" + 
context.getProperty(EXCHANGE).getValue() + "/RK:"
-                                + context.getProperty(ROUTING_KEY).getValue());
+                processSession.getProvenanceReporter().send(flowFile, 
this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
             } catch (Exception e) {
                 processSession.transfer(processSession.penalize(flowFile), 
REL_FAILURE);
                 this.getLogger().error("Failed while sending message to AMQP 
via " + this.targetResource, e);
@@ -168,9 +174,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
      */
     @Override
     protected AMQPPublisher finishBuildingTargetResource(ProcessContext 
context) {
-        String exchangeName = context.getProperty(EXCHANGE).getValue();
-        String routingKey = context.getProperty(ROUTING_KEY).getValue();
-        return new AMQPPublisher(this.amqpConnection, exchangeName, 
routingKey, this.getLogger());
+        return new AMQPPublisher(this.amqpConnection, this.getLogger());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
index 9f01c04..90bd919 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.amqp.processors;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -27,7 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.util.MockProcessorLog;
+import org.apache.nifi.logging.ComponentLog;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -40,31 +39,24 @@ public class AMQPPublisherTest {
     @SuppressWarnings("resource")
     @Test(expected = IllegalArgumentException.class)
     public void failOnNullConnection() {
-        new AMQPPublisher(null, null, null, null);
-    }
-
-    @SuppressWarnings("resource")
-    @Test(expected = IllegalArgumentException.class)
-    public void failOnMissingRoutingKey() throws Exception {
-        Connection conn = new TestConnection(null, null);
-        new AMQPPublisher(conn, null, "", null);
+        new AMQPPublisher(null, null);
     }
 
     @Test(expected = IllegalStateException.class)
     public void failPublishIfChannelClosed() throws Exception {
         Connection conn = new TestConnection(null, null);
-        try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", 
null)) {
+        try (AMQPPublisher sender = new AMQPPublisher(conn, 
mock(ComponentLog.class))) {
             conn.close();
-            sender.publish("oleg".getBytes());
+            sender.publish("oleg".getBytes(), null, "foo", "");
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void failPublishIfChannelFails() throws Exception {
         TestConnection conn = new TestConnection(null, null);
-        try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", 
null)) {
+        try (AMQPPublisher sender = new AMQPPublisher(conn, 
mock(ComponentLog.class))) {
             ((TestChannel) conn.createChannel()).corruptChannel();
-            sender.publish("oleg".getBytes());
+            sender.publish("oleg".getBytes(), null, "foo", "");
         }
     }
 
@@ -77,8 +69,8 @@ public class AMQPPublisherTest {
 
         Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
 
-        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key1", null)) {
-            sender.publish("hello".getBytes());
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), null, "key1", "myExchange");
             Thread.sleep(200);
         }
 
@@ -100,9 +92,8 @@ public class AMQPPublisherTest {
         ReturnListener retListener = mock(ReturnListener.class);
         connection.createChannel().addReturnListener(retListener);
 
-        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key2",
-                new MockProcessorLog("foo", ""))) {
-            sender.publish("hello".getBytes());
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), null, "key1", "myExchange");
             Thread.sleep(1000);
         }
         Thread.sleep(200);
@@ -111,12 +102,4 @@ public class AMQPPublisherTest {
         connection.close();
     }
 
-    @Test
-    public void validateToString() throws Exception {
-        TestConnection conn = new TestConnection(null, null);
-        try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", 
"key1", null)) {
-            String toString = sender.toString();
-            assertTrue(toString.contains("EXCHANGE:myExchange, 
ROUTING_KEY:key1"));
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d38a324b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 3a2754d..52b48d8 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -17,12 +17,14 @@
 package org.apache.nifi.amqp.processors;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -46,8 +48,8 @@ public class ConsumeAMQPTest {
 
         Connection connection = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
 
-        try (AMQPPublisher sender = new AMQPPublisher(connection, 
"myExchange", "key1", null)) {
-            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN);
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
 
             ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
             TestRunner runner = TestRunners.newTestRunner(pubProc);

Reply via email to