This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1fd592fd0c1ed509432431c8b7812e52434aa8bf
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Sep 30 16:26:41 2025 +0200

    MINIFICPP-2605 Generate MQTT client id in MQTT processors if not set in property
    
    Closes #2006
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  4 ++--
 .../mqtt/processors/AbstractMQTTProcessor.cpp      |  8 +++----
 extensions/mqtt/processors/AbstractMQTTProcessor.h |  5 ++--
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 10 --------
 extensions/mqtt/tests/ConsumeMQTTTests.cpp         | 28 ----------------------
 extensions/mqtt/tests/PublishMQTTTests.cpp         |  8 -------
 6 files changed, 9 insertions(+), 54 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 6749be87b..5204c2f67 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -402,7 +402,7 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 | Name                        | Default Value | Allowable Values               
      | Description                                                             
                                                                                
  |
 
|-----------------------------|---------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
 | **Broker URI**              |               |                                
      | The URI to use to connect to the MQTT broker                            
                                                                                
  |
-| Client ID                   |               |                                
      | MQTT client ID to use. WARNING: Must not be empty when using MQTT 
3.1.0!                                                                          
        |
+| Client ID                   |               |                                
      | MQTT client ID to use. If not set, a UUID will be generated.            
                                                                                
  |
 | **MQTT Version**            | 3.x AUTO      | 3.x 
AUTO<br/>3.1.0<br/>3.1.1<br/>5.0 | The MQTT specification version when 
connecting to the broker.                                                       
                                      |
 | **Topic**                   |               |                                
      | The topic to subscribe to.                                              
                                                                                
  |
 | Clean Session               | true          | true<br/>false                 
      | Whether to start afresh rather than remembering previous subscriptions. 
If true, then make broker forget subscriptions after disconnected. MQTT 3.x 
only. |
@@ -2218,7 +2218,7 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | Name                    | Default Value | Allowable Values                   
  | Description                                                                 
                                                |
 
|-------------------------|---------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
 | **Broker URI**          |               |                                    
  | The URI to use to connect to the MQTT broker                                
                                                |
-| Client ID               |               |                                    
  | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!    
                                                |
+| Client ID               |               |                                    
  | MQTT client ID to use. If not set, a UUID will be generated.                
                                                |
 | **MQTT Version**        | 3.x AUTO      | 3.x 
AUTO<br/>3.1.0<br/>3.1.1<br/>5.0 | The MQTT specification version when 
connecting to the broker.                                                       
        |
 | **Topic**               |               |                                    
  | The topic to publish to.<br/>**Supports Expression Language: true**         
                                                |
 | Retain                  | false         | true<br/>false                     
  | Retain published message in broker                                          
                                                |
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 4d132baf1..c1616bf5d 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -30,9 +30,9 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext& 
context, core::Proc
   mqtt_version_ = utils::parseEnumProperty<mqtt::MqttVersions>(context, 
MqttVersion);
 
   if (auto value = context.getProperty(ClientID)) {
-    clientID_ = std::move(*value);
-  } else if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0) {
-    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
3.1.0 specification does not support empty client IDs");
+    client_id_ = std::move(*value);
+  } else {
+    client_id_ = utils::IdGenerator::getIdGenerator()->generate().to_string();
   }
 
   if (auto value = context.getProperty(Username)) {
@@ -120,7 +120,7 @@ void AbstractMQTTProcessor::initializeClient() {
     if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
       options.MQTTVersion = MQTTVERSION_5;
     }
-    if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != MQTTASYNC_SUCCESS) {
+    if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), 
client_id_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != 
MQTTASYNC_SUCCESS) {
       throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Creating MQTT client failed");
     }
   }
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h 
b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 8fdc4f045..222777001 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -97,7 +97,8 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
       .isRequired(true)
       .build();
   EXTENSIONAPI static constexpr auto ClientID = 
core::PropertyDefinitionBuilder<>::createProperty("Client ID")
-      .withDescription("MQTT client ID to use. WARNING: Must not be empty when 
using MQTT 3.1.0!")
+      .withDescription("MQTT client ID to use. If not set, a UUID will be 
generated.")
+      .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
       .build();
   EXTENSIONAPI static constexpr auto QoS = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<mqtt::MqttQoS>()>::createProperty("Quality
 of Service")
       .withDescription("The Quality of Service (QoS) of messages.")
@@ -236,7 +237,7 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
   std::chrono::seconds keep_alive_interval_{60};
   std::chrono::seconds connection_timeout_{10};
   mqtt::MqttQoS qos_{mqtt::MqttQoS::LEVEL_0};
-  std::string clientID_;
+  std::string client_id_;
   std::string username_;
   std::string password_;
   mqtt::MqttVersions mqtt_version_{mqtt::MqttVersions::V_3X_AUTO};
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp 
b/extensions/mqtt/processors/ConsumeMQTT.cpp
index b354eef22..a2f8be2a6 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -231,16 +231,6 @@ void ConsumeMQTT::checkProperties(core::ProcessContext& 
context) {
     logger_->log_warn("MQTT 5.0 specification does not support Clean Session. 
Property is not used.");
   }
 
-  if (clientID_.empty()) {
-    if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
-      if (session_expiry_interval_ > std::chrono::seconds(0)) {
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a 
Client ID for durable (Session Expiry Interval > 0) sessions");
-      }
-    } else if (!clean_session_) {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a 
Client ID for durable (non-clean) sessions");
-    }
-  }
-
   if (qos_ == mqtt::MqttQoS::LEVEL_0) {
     if (mqtt_version_ == mqtt::MqttVersions::V_5_0) {
       if (session_expiry_interval_ > std::chrono::seconds(0)) {
diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp 
b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
index 488cbfa21..6e22381c6 100644
--- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp
+++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
@@ -58,27 +58,6 @@ TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", 
"[consumeMQTTTest]")
       Catch::Matchers::EndsWith("Expected valid value from 
\"consumeMqttProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "1"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name,
 "false"));
-
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
-    Catch::Matchers::EndsWith("Processor must have a Client ID for durable 
(non-clean) sessions"));
-}
-
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID_V_5", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name,
 "1"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name,
 "1 h"));
-
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
-                      Catch::Matchers::EndsWith("Processor must have a Client 
ID for durable (Session Expiry Interval > 0) sessions"));
-}
-
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", 
"[consumeMQTTTest]") {
   
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
   
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name,
 "subscriber"));
@@ -131,13 +110,6 @@ TEST_CASE_METHOD(Fixture, 
"ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consum
                                                     "by the broker when QoS is 
less than 1 for durable (Session Expiry Interval > 0) sessions. Only 
subscriptions are preserved.", 1s));
 }
 
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyClientID_V_3_1_0", 
"[consumeMQTTTest]") {
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_3_1_0)}));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), 
Catch::Matchers::EndsWith("MQTT 3.1.0 specification does not support empty 
client IDs"));
-}
-
 TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanStart_V_3", 
"[consumeMQTTTest]") {
   
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
   
REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name,
 "mytopic"));
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp 
b/extensions/mqtt/tests/PublishMQTTTests.cpp
index dd1d569f1..bcc84f65f 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -60,14 +60,6 @@ TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", 
"[publishMQTTTest]")
       Catch::Matchers::EndsWith("Expected valid value from 
\"publishMqttProcessor::Broker URI\", but got PropertyNotSet (Property 
Error:2)"));
 }
 
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3_1_0", 
"[publishMQTTTest]") {
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));
-  
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
 
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_3_1_0)}));
-  REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_),
-      Catch::Matchers::EndsWith("MQTT 3.1.0 specification does not support 
empty client IDs"));
-}
-
 TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3", 
"[publishMQTTTest]") {
   
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name,
 "mytopic"));
   
REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name,
 "127.0.0.1:1883"));

Reply via email to