This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d0045c1 NIFI-8341 Support Multi Hosts in AMQP Processors
d0045c1 is described below
commit d0045c13b1c79bf435a62a20de4b1b2dd633dedb
Author: nabmoh123 <[email protected]>
AuthorDate: Wed Mar 24 16:16:50 2021 +0000
NIFI-8341 Support Multi Hosts in AMQP Processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #4939.
---
.../amqp/processors/AbstractAMQPProcessor.java | 36 +++++++++++++++++-----
.../amqp/processors/AbstractAMQPProcessorTest.java | 1 +
.../nifi/amqp/processors/ConsumeAMQPTest.java | 2 +-
.../nifi/amqp/processors/PublishAMQPTest.java | 4 +--
4 files changed, 33 insertions(+), 10 deletions(-)
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index a5f63d5..0a0418a 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -16,10 +16,12 @@
*/
package org.apache.nifi.amqp.processors;
+import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -55,18 +57,26 @@ import org.apache.nifi.ssl.SSLContextService;
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
+ public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
+ .name("Brokers")
+ .description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
+ "set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+ .build();
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
.name("Host Name")
- .description("Network address of AMQP broker (e.g., localhost)")
- .required(true)
+ .description("Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.")
+ .required(false)
.defaultValue("localhost")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
- .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
- .required(true)
+ .description("Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.")
+ .required(false)
.defaultValue("5672")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
@@ -128,6 +138,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
@@ -280,11 +291,13 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
}
}
+ private Address[] createHostsList(final ProcessContext context) {
+ String evaluatedUrls = context.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
+ return Address.parseAddresses(evaluatedUrls);
+ }
protected Connection createConnection(ProcessContext context, ExecutorService executor) {
final ConnectionFactory cf = new ConnectionFactory();
- cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
- cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
@@ -317,7 +330,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
});
try {
- Connection connection = cf.newConnection(executor);
+ Connection connection;
+ if (context.getProperty(BROKERS).isSet()) {
+ Address[] hostsList = createHostsList(context);
+ connection = cf.newConnection(executor, hostsList);
+ } else {
+ cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+ cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
+ connection = cf.newConnection(executor);
+ }
+
return connection;
} catch (Exception e) {
throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
index 19dd1ea..2273b56 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
@@ -39,6 +39,7 @@ public class AbstractAMQPProcessorTest {
testRunner = TestRunners.newTestRunner(ConsumeAMQP.class);
testRunner.setProperty(ConsumeAMQP.QUEUE, "queue");
+ testRunner.setProperty(AbstractAMQPProcessor.BROKERS, "localhost:5672");
}
@Test
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 2debebe..125593d 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -163,7 +163,7 @@ public class ConsumeAMQPTest {
private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(ConsumeAMQP.HOST, "injvm");
+ runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
runner.setProperty(ConsumeAMQP.USER, "user");
runner.setProperty(ConsumeAMQP.PASSWORD, "password");
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 556d7b9..345372a 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -45,7 +45,7 @@ public class PublishAMQPTest {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
- runner.setProperty(PublishAMQP.HOST, "injvm");
+ runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
@@ -110,7 +110,7 @@ public class PublishAMQPTest {
public void validateFailedPublishAndTransferToFailure() throws Exception {
PublishAMQP pubProc = new LocalPublishAMQP();
TestRunner runner = TestRunners.newTestRunner(pubProc);
- runner.setProperty(PublishAMQP.HOST, "injvm");
+ runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");