This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23b04ae  NIFI-6856 - Make client ID a non-required field for the 
MQTTConsume and MQTTProduce processors. Generates a random ID if not set.
23b04ae is described below

commit 23b04ae96863fc3b0fd3b268bcb7d155edf17f26
Author: Justin Miller <[email protected]>
AuthorDate: Fri Nov 8 11:34:57 2019 -0600

    NIFI-6856 - Make client ID a non-required field for the MQTTConsume and 
MQTTProduce processors. Generates a
    random ID if not set.
    
    Also add group ID field to ConsumeMQTT processor. Allows consumer to join 
consumer group at $share/<group_id>/<topic_filter>
    
    add expression language support for the MQTT client ID
    
    Setting client id in publish test fails because it is not a flowfile 
attribute.
    Remove client id and autogenerate it when testing.
    
    Since the evaluation is done in onScheduled, there is no flow file 
available and we're not using the attributes to make the expression language 
evaluation. You can change the scope to use the Variable Registry.
    
    Co-Authored-By: Pierre Villard <[email protected]>
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3879.
---
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   | 22 +++++++++++++++++++++-
 .../mqtt/common/AbstractMQTTProcessor.java         | 13 ++++++++++---
 .../nifi/processors/mqtt/TestPublishMQTT.java      |  1 -
 3 files changed, 31 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index f0cba72..94d5397 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -90,6 +90,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
     public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  
"mqtt.isDuplicate";
     public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
 
+    public static final PropertyDescriptor PROP_GROUPID = new 
PropertyDescriptor.Builder()
+            .name("Group ID")
+            .description("MQTT consumer group ID to use. If group ID not set, 
client will connect as individual consumer.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor PROP_TOPIC_FILTER = new 
PropertyDescriptor.Builder()
             .name("Topic Filter")
             .description("The MQTT topic filter to designate the topics to 
subscribe to.")
@@ -121,6 +128,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
     private volatile long maxQueueSize;
 
     private volatile int qos;
+    private volatile String topicPrefix = "";
     private volatile String topicFilter;
     private final AtomicBoolean scheduled = new AtomicBoolean(false);
 
@@ -136,6 +144,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
 
     static{
         final List<PropertyDescriptor> innerDescriptorsList = 
getAbstractPropertyDescriptors();
+        innerDescriptorsList.add(PROP_GROUPID);
         innerDescriptorsList.add(PROP_TOPIC_FILTER);
         innerDescriptorsList.add(PROP_QOS);
         innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
@@ -184,6 +193,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
                     .build());
         }
 
+        final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet();
+        final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet();
+        if (clientIDSet && groupIDSet) {
+            results.add(new ValidationResult.Builder().subject("Client ID and 
Group ID").valid(false).explanation("if client ID is not unique, multiple nodes 
cannot join the consumer group").build());
+        }
+
         return results;
     }
 
@@ -208,6 +223,11 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
         qos = context.getProperty(PROP_QOS).asInteger();
         maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
         topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
+
+        if (context.getProperty(PROP_GROUPID).isSet()) {
+            topicPrefix = "$share/" + 
context.getProperty(PROP_GROUPID).getValue() + "/";
+        }
+
         scheduled.set(true);
     }
 
@@ -266,7 +286,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor  
implements MqttCallback
             if (!mqttClient.isConnected()) {
                 logger.debug("Connecting client");
                 mqttClient.connect(connOpts);
-                mqttClient.subscribe(topicFilter, qos);
+                mqttClient.subscribe(topicPrefix + topicFilter, qos);
             }
         } catch (MqttException e) {
             logger.error("Connection to {} lost (or was never connected) and 
connection failed. Yielding processor", new Object[]{broker}, e);
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index a1e65f3..34c3e1f 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -43,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
 
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
@@ -122,8 +124,9 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
 
     public static final PropertyDescriptor PROP_CLIENTID = new 
PropertyDescriptor.Builder()
             .name("Client ID")
-            .description("MQTT client ID to use")
-            .required(true)
+            .description("MQTT client ID to use. If not set, a UUID will be 
generated.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
@@ -297,7 +300,11 @@ public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProces
 
     protected void onScheduled(final ProcessContext context){
         broker = context.getProperty(PROP_BROKER_URI).getValue();
-        clientID = context.getProperty(PROP_CLIENTID).getValue();
+        clientID = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+
+        if (clientID == null) {
+            clientID = UUID.randomUUID().toString();
+        }
 
         connOpts = new MqttConnectOptions();
         
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 9916408..9c886d2 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -67,7 +67,6 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
         testRunner = TestRunners.newTestRunner(proc);
         testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
-        testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
         testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
         topic = "testTopic";
         testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);

Reply via email to