This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f8789b  NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
0f8789b is described below

commit 0f8789b8b07df40336386b2831721f1e0c4a497b
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Apr 9 17:27:22 2021 +0200

    NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
    
    This closes #4984
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java | 8 +++++---
 .../java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java     | 3 ++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 3d34d94..a83bcc5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -119,6 +119,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
                     "Service property must be set.")
             .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(BROKER_VALIDATOR)
             .build();
 
@@ -135,6 +136,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .name("Username")
             .description("Username to use when connecting to the broker")
             .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -275,7 +277,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         }
 
         try {
-            URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+            URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
             if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
                 results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
                         "the broker URI, the SSL Context Service must be set.").build());
@@ -314,7 +316,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
     }
 
     protected void onScheduled(final ProcessContext context){
-        broker = context.getProperty(PROP_BROKER_URI).getValue();
+        broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
         brokerUri = broker.endsWith("/") ? broker : broker + "/";
         clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
 
@@ -345,7 +347,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
 
         PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
         if(usernameProp.isSet()) {
-            connOpts.setUserName(usernameProp.getValue());
+            connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
             connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
         }
     }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index d8c86d6..cf66d1a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -82,7 +82,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
     public void testSSLContextServiceTruststoreOnly() throws InitializationException {
         String brokerURI = "ssl://localhost:8883";
         TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
-        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
+        runner.setVariable("brokerURI", brokerURI);
+        runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
         runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
         runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
         runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");

Reply via email to