This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0f8789b NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
0f8789b is described below
commit 0f8789b8b07df40336386b2831721f1e0c4a497b
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Apr 9 17:27:22 2021 +0200
NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
This closes #4984
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java | 8 +++++---
.../java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java | 3 ++-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 3d34d94..a83bcc5 100644
---
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -119,6 +119,7 @@ public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProces
.description("The URI to use to connect to the MQTT broker (e.g.
tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.
In order to use 'ssl', the SSL Context " +
"Service property must be set.")
.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(BROKER_VALIDATOR)
.build();
@@ -135,6 +136,7 @@ public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProces
.name("Username")
.description("Username to use when connecting to the broker")
.required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -275,7 +277,7 @@ public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProces
}
try {
- URI brokerURI = new
URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+ URI brokerURI = new
URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
if (brokerURI.getScheme().equalsIgnoreCase("ssl") &&
!validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
results.add(new
ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or "
+ PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is
used in " +
"the broker URI, the SSL Context Service must be
set.").build());
@@ -314,7 +316,7 @@ public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProces
}
protected void onScheduled(final ProcessContext context){
- broker = context.getProperty(PROP_BROKER_URI).getValue();
+ broker =
context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
brokerUri = broker.endsWith("/") ? broker : broker + "/";
clientID =
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
@@ -345,7 +347,7 @@ public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProces
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
- connOpts.setUserName(usernameProp.getValue());
+
connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
}
diff --git
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index d8c86d6..cf66d1a 100644
---
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -82,7 +82,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
public void testSSLContextServiceTruststoreOnly() throws
InitializationException {
String brokerURI = "ssl://localhost:8883";
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
- runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
+ runner.setVariable("brokerURI", brokerURI);
+ runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");