This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d0045c1  NIFI-8341 Support Multi Hosts in AMQP Processors
d0045c1 is described below

commit d0045c13b1c79bf435a62a20de4b1b2dd633dedb
Author: nabmoh123 <[email protected]>
AuthorDate: Wed Mar 24 16:16:50 2021 +0000

    NIFI-8341 Support Multi Hosts in AMQP Processors
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4939.
---
 .../amqp/processors/AbstractAMQPProcessor.java     | 36 +++++++++++++++++-----
 .../amqp/processors/AbstractAMQPProcessorTest.java |  1 +
 .../nifi/amqp/processors/ConsumeAMQPTest.java      |  2 +-
 .../nifi/amqp/processors/PublishAMQPTest.java      |  4 +--
 4 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index a5f63d5..0a0418a 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.amqp.processors;
 
+import com.rabbitmq.client.Address;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DefaultSaslConfig;
 import com.rabbitmq.client.impl.DefaultExceptionHandler;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,18 +57,26 @@ import org.apache.nifi.ssl.SSLContextService;
  */
 abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends 
AbstractProcessor {
 
+    public static final PropertyDescriptor BROKERS = new 
PropertyDescriptor.Builder()
+            .name("Brokers")
+            .description("A comma-separated list of known AMQP Brokers in the 
format <host>:<port> (e.g., localhost:5672). If this is " +
+                    "set, Host Name and Port are ignored. Only include hosts 
from the same AMQP cluster.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .build();
     public static final PropertyDescriptor HOST = new 
PropertyDescriptor.Builder()
             .name("Host Name")
-            .description("Network address of AMQP broker (e.g., localhost)")
-            .required(true)
+            .description("Network address of AMQP broker (e.g., localhost). If 
Brokers is set, then this property is ignored.")
+            .required(false)
             .defaultValue("localhost")
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
     public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
             .name("Port")
-            .description("Numeric value identifying Port of AMQP broker (e.g., 
5671)")
-            .required(true)
+            .description("Numeric value identifying Port of AMQP broker (e.g., 
5671). If Brokers is set, then this property is ignored.")
+            .required(false)
             .defaultValue("5672")
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.PORT_VALIDATOR)
@@ -128,6 +138,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(BROKERS);
         properties.add(HOST);
         properties.add(PORT);
         properties.add(V_HOST);
@@ -280,11 +291,13 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
         }
     }
 
+    private Address[] createHostsList(final ProcessContext context) {
+        String evaluatedUrls = 
context.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
+        return Address.parseAddresses(evaluatedUrls);
+    }
 
     protected Connection createConnection(ProcessContext context, 
ExecutorService executor) {
         final ConnectionFactory cf = new ConnectionFactory();
-        
cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
-        
cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
         
cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
         cf.setPassword(context.getProperty(PASSWORD).getValue());
 
@@ -317,7 +330,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
         });
 
         try {
-            Connection connection = cf.newConnection(executor);
+            Connection connection;
+            if (context.getProperty(BROKERS).isSet()) {
+                Address[] hostsList = createHostsList(context);
+                connection = cf.newConnection(executor, hostsList);
+            } else {
+                
cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+                
cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
+                connection = cf.newConnection(executor);
+            }
+
             return connection;
         } catch (Exception e) {
             throw new IllegalStateException("Failed to establish connection 
with AMQP Broker: " + cf.toString(), e);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
index 19dd1ea..2273b56 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
@@ -39,6 +39,7 @@ public class AbstractAMQPProcessorTest {
         testRunner = TestRunners.newTestRunner(ConsumeAMQP.class);
 
         testRunner.setProperty(ConsumeAMQP.QUEUE, "queue");
+        testRunner.setProperty(AbstractAMQPProcessor.BROKERS, 
"localhost:5672");
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 2debebe..125593d 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -163,7 +163,7 @@ public class ConsumeAMQPTest {
 
     private TestRunner initTestRunner(ConsumeAMQP proc) {
         TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(ConsumeAMQP.HOST, "injvm");
+        runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
         runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
         runner.setProperty(ConsumeAMQP.USER, "user");
         runner.setProperty(ConsumeAMQP.PASSWORD, "password");
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 556d7b9..345372a 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -45,7 +45,7 @@ public class PublishAMQPTest {
     public void validateSuccessfulPublishAndTransferToSuccess() throws 
Exception {
         final PublishAMQP pubProc = new LocalPublishAMQP();
         final TestRunner runner = TestRunners.newTestRunner(pubProc);
-        runner.setProperty(PublishAMQP.HOST, "injvm");
+        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
         runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
         runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
         runner.setProperty(PublishAMQP.USER, "user");
@@ -110,7 +110,7 @@ public class PublishAMQPTest {
     public void validateFailedPublishAndTransferToFailure() throws Exception {
         PublishAMQP pubProc = new LocalPublishAMQP();
         TestRunner runner = TestRunners.newTestRunner(pubProc);
-        runner.setProperty(PublishAMQP.HOST, "injvm");
+        runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
         runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
         runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
         runner.setProperty(PublishAMQP.USER, "user");

Reply via email to