This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1fd592fd0c1ed509432431c8b7812e52434aa8bf Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Sep 30 16:26:41 2025 +0200 MINIFICPP-2605 Generate MQTT client id in MQTT processors if not set in property Closes #2006 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 4 ++-- .../mqtt/processors/AbstractMQTTProcessor.cpp | 8 +++---- extensions/mqtt/processors/AbstractMQTTProcessor.h | 5 ++-- extensions/mqtt/processors/ConsumeMQTT.cpp | 10 -------- extensions/mqtt/tests/ConsumeMQTTTests.cpp | 28 ---------------------- extensions/mqtt/tests/PublishMQTTTests.cpp | 8 ------- 6 files changed, 9 insertions(+), 54 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 6749be87b..5204c2f67 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -402,7 +402,7 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Default Value | Allowable Values | Description | |-----------------------------|---------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| | **Broker URI** | | | The URI to use to connect to the MQTT broker | -| Client ID | | | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0! | +| Client ID | | | MQTT client ID to use. If not set, a UUID will be generated. | | **MQTT Version** | 3.x AUTO | 3.x AUTO<br/>3.1.0<br/>3.1.1<br/>5.0 | The MQTT specification version when connecting to the broker. | | **Topic** | | | The topic to subscribe to. | | Clean Session | true | true<br/>false | Whether to start afresh rather than remembering previous subscriptions. If true, then make broker forget subscriptions after disconnected. MQTT 3.x only. | @@ -2218,7 +2218,7 @@ In the list below, the names of required properties appear in bold. Any other pr | Name | Default Value | Allowable Values | Description | |-------------------------|---------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------| | **Broker URI** | | | The URI to use to connect to the MQTT broker | -| Client ID | | | MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0! | +| Client ID | | | MQTT client ID to use. If not set, a UUID will be generated. | | **MQTT Version** | 3.x AUTO | 3.x AUTO<br/>3.1.0<br/>3.1.1<br/>5.0 | The MQTT specification version when connecting to the broker. | | **Topic** | | | The topic to publish to.<br/>**Supports Expression Language: true** | | Retain | false | true<br/>false | Retain published message in broker | diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp index 4d132baf1..c1616bf5d 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp @@ -30,9 +30,9 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext& context, core::Proc mqtt_version_ = utils::parseEnumProperty<mqtt::MqttVersions>(context, MqttVersion); if (auto value = context.getProperty(ClientID)) { - clientID_ = std::move(*value); - } else if (mqtt_version_ == mqtt::MqttVersions::V_3_1_0) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 3.1.0 specification does not support empty client IDs"); + client_id_ = std::move(*value); + } else { + client_id_ = utils::IdGenerator::getIdGenerator()->generate().to_string(); } if (auto value = context.getProperty(Username)) { @@ -120,7 +120,7 @@ void AbstractMQTTProcessor::initializeClient() { if (mqtt_version_ == mqtt::MqttVersions::V_5_0) { options.MQTTVersion = MQTTVERSION_5; } - if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != MQTTASYNC_SUCCESS) { + if (MQTTAsync_createWithOptions(&client_, uri_.c_str(), client_id_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &options) != MQTTASYNC_SUCCESS) { throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Creating MQTT client failed"); } } diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h index 8fdc4f045..222777001 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.h +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h @@ -97,7 +97,8 @@ class AbstractMQTTProcessor : public core::ProcessorImpl { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ClientID = core::PropertyDefinitionBuilder<>::createProperty("Client ID") - .withDescription("MQTT client ID to use. WARNING: Must not be empty when using MQTT 3.1.0!") + .withDescription("MQTT client ID to use. If not set, a UUID will be generated.") + .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .build(); EXTENSIONAPI static constexpr auto QoS = core::PropertyDefinitionBuilder<magic_enum::enum_count<mqtt::MqttQoS>()>::createProperty("Quality of Service") .withDescription("The Quality of Service (QoS) of messages.") @@ -236,7 +237,7 @@ class AbstractMQTTProcessor : public core::ProcessorImpl { std::chrono::seconds keep_alive_interval_{60}; std::chrono::seconds connection_timeout_{10}; mqtt::MqttQoS qos_{mqtt::MqttQoS::LEVEL_0}; - std::string clientID_; + std::string client_id_; std::string username_; std::string password_; mqtt::MqttVersions mqtt_version_{mqtt::MqttVersions::V_3X_AUTO}; diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp index b354eef22..a2f8be2a6 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.cpp +++ b/extensions/mqtt/processors/ConsumeMQTT.cpp @@ -231,16 +231,6 @@ void ConsumeMQTT::checkProperties(core::ProcessContext& context) { logger_->log_warn("MQTT 5.0 specification does not support Clean Session. Property is not used."); } - if (clientID_.empty()) { - if (mqtt_version_ == mqtt::MqttVersions::V_5_0) { - if (session_expiry_interval_ > std::chrono::seconds(0)) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions"); - } - } else if (!clean_session_) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client ID for durable (non-clean) sessions"); - } - } - if (qos_ == mqtt::MqttQoS::LEVEL_0) { if (mqtt_version_ == mqtt::MqttVersions::V_5_0) { if (session_expiry_interval_ > std::chrono::seconds(0)) { diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp b/extensions/mqtt/tests/ConsumeMQTTTests.cpp index 488cbfa21..6e22381c6 100644 --- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp +++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp @@ -58,27 +58,6 @@ TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") Catch::Matchers::EndsWith("Expected valid value from \"consumeMqttProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)")); } -TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID", "[consumeMQTTTest]") { - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "1")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name, "false")); - - REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), - Catch::Matchers::EndsWith("Processor must have a Client ID for durable (non-clean) sessions")); -} - -TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithoutID_V_5", "[consumeMQTTTest]") { - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "1")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)})); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h")); - - REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), - Catch::Matchers::EndsWith("Processor must have a Client ID for durable (Session Expiry Interval > 0) sessions")); -} - TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") { REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber")); @@ -131,13 +110,6 @@ TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consum "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s)); } -TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyClientID_V_3_1_0", "[consumeMQTTTest]") { - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); - REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_3_1_0)})); - REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_), Catch::Matchers::EndsWith("MQTT 3.1.0 specification does not support empty client IDs")); -} - TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanStart_V_3", "[consumeMQTTTest]") { REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic")); diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp index dd1d569f1..bcc84f65f 100644 --- a/extensions/mqtt/tests/PublishMQTTTests.cpp +++ b/extensions/mqtt/tests/PublishMQTTTests.cpp @@ -60,14 +60,6 @@ TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") Catch::Matchers::EndsWith("Expected valid value from \"publishMqttProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)")); } -TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3_1_0", "[publishMQTTTest]") { - REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic")); - REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); - REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_3_1_0)})); - REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_), - Catch::Matchers::EndsWith("MQTT 3.1.0 specification does not support empty client IDs")); -} - TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3", "[publishMQTTTest]") { REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic")); REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
