This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch controller_c_api_requirements
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit becaaa9bb58c0121afa5d4593e67b6600db79ed1 Author: Martin Zink <[email protected]> AuthorDate: Thu Feb 12 16:31:28 2026 +0100 MINIFICPP-2710 Refactor ControllerServiceNodeMap Closes #2098 Signed-off-by: Martin Zink <[email protected]> --- .../azure/tests/ListAzureBlobStorageTests.cpp | 129 ++++++++++----------- .../tests/unit/ControllerServiceTests.cpp | 6 +- libminifi/include/core/FlowConfiguration.h | 2 +- libminifi/include/core/ProcessContextImpl.h | 2 +- .../core/controller/ControllerServiceNodeMap.h | 19 ++- .../core/controller/ControllerServiceProvider.h | 10 +- .../ForwardingControllerServiceProvider.h | 4 +- .../controller/StandardControllerServiceProvider.h | 5 +- libminifi/src/core/FlowConfiguration.cpp | 4 +- libminifi/src/core/ProcessGroup.cpp | 2 +- .../core/controller/ControllerServiceNodeMap.cpp | 100 +++++++++------- .../core/controller/ControllerServiceProvider.cpp | 11 +- .../StandardControllerServiceProvider.cpp | 11 +- .../src/core/flow/StructuredConfiguration.cpp | 5 +- libminifi/test/libtest/unit/TestBase.cpp | 5 +- libminifi/test/unit/ProcessorConfigUtilsTests.cpp | 2 +- 16 files changed, 172 insertions(+), 145 deletions(-) diff --git a/extensions/azure/tests/ListAzureBlobStorageTests.cpp b/extensions/azure/tests/ListAzureBlobStorageTests.cpp index 31bdad909..8f837e57f 100644 --- a/extensions/azure/tests/ListAzureBlobStorageTests.cpp +++ b/extensions/azure/tests/ListAzureBlobStorageTests.cpp @@ -55,14 +55,14 @@ class ListAzureBlobStorageTestsFixture { plan_->addProcessor(std::move(list_azure_blob_storage_unique_ptr), "ListAzureBlobStorage", { {"success", "d"} }); auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); - plan_->setProperty(logattribute, minifi::processors::LogAttribute::FlowFilesToLog, "0"); + CHECK(plan_->setProperty(logattribute, minifi::processors::LogAttribute::FlowFilesToLog, "0")); azure_storage_cred_service_ = plan_->addController("AzureStorageCredentialsService", 
"AzureStorageCredentialsService"); } void setDefaultCredentials() { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); } ListAzureBlobStorageTestsFixture(ListAzureBlobStorageTestsFixture&&) = delete; @@ -87,82 +87,80 @@ namespace { using namespace std::literals::chrono_literals; TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Test credentials settings", "[azureStorageCredentials]") { - plan_->setProperty(list_azure_blob_storage_, "Container Name", CONTAINER_NAME); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Container Name", CONTAINER_NAME)); SECTION("No credentials are set") { REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } SECTION("No account key or SAS is set") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } SECTION("Credentials set in Azure Storage Credentials Service") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, 
"Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY); } SECTION("Overriding credentials set in Azure Storage Credentials Service with connection string") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(azure_storage_cred_service, "Connection String", CONNECTION_STRING); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Connection String", CONNECTION_STRING)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); } SECTION("Account name and key set in properties") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", 
STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY); } SECTION("Account name and SAS token set in properties") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "SAS Token", SAS_TOKEN); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "SAS Token", SAS_TOKEN)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN); } SECTION("Account name and SAS token with question mark set in properties") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "SAS Token", "?" + SAS_TOKEN); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "SAS Token", "?" 
+ SAS_TOKEN)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";SharedAccessSignature=" + SAS_TOKEN); } SECTION("Endpoint suffix overriden") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "Common Storage Account Endpoint Suffix", ENDPOINT_SUFFIX); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Common Storage Account Endpoint Suffix", ENDPOINT_SUFFIX)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY + ";EndpointSuffix=" + ENDPOINT_SUFFIX); } SECTION("Use connection string") { - plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); } SECTION("Overriding credentials with connection string") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING); + 
CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == CONNECTION_STRING); @@ -186,9 +184,9 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Test credentials settings", credential_configuration_strategy_string = "Workload Identity"; } - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Credential Configuration Strategy", credential_configuration_strategy_string); - plan_->setProperty(list_azure_blob_storage_, "Managed Identity Client ID", managed_identity_client_id); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Credential Configuration Strategy", credential_configuration_strategy_string)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Managed Identity Client ID", managed_identity_client_id)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); CHECK(passed_params.credentials.buildConnectionString().empty()); @@ -217,12 +215,11 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Test credentials settings", credential_configuration_strategy_string = "Workload Identity"; } - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Credential Configuration Strategy", 
credential_configuration_strategy_string); - plan_->setProperty(azure_storage_cred_service, "Common Storage Account Endpoint Suffix", "core.chinacloudapi.cn"); - plan_->setProperty(azure_storage_cred_service, "Managed Identity Client ID", managed_identity_client_id); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Credential Configuration Strategy", credential_configuration_strategy_string)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Common Storage Account Endpoint Suffix", "core.chinacloudapi.cn")); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Managed Identity Client ID", managed_identity_client_id)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); CHECK(passed_params.credentials.buildConnectionString().empty()); @@ -234,59 +231,55 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Test credentials settings", } SECTION("Azure Storage Credentials Service overrides properties") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); - 
plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Connection String", CONNECTION_STRING)); test_controller_.runSession(plan_, true); auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); REQUIRE(passed_params.credentials.buildConnectionString() == "AccountName=" + STORAGE_ACCOUNT_NAME + ";AccountKey=" + STORAGE_ACCOUNT_KEY); } SECTION("Azure Storage Credentials Service is set with invalid parameters") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); 
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } SECTION("Azure Storage Credentials Service name is invalid") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "invalid_name"); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "invalid_name")); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } SECTION("Both SAS Token and Storage Account Key cannot be set in credentials service") { - auto azure_storage_cred_service = plan_->addController("AzureStorageCredentialsService", "AzureStorageCredentialsService"); - plan_->setProperty(azure_storage_cred_service, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(azure_storage_cred_service, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(azure_storage_cred_service, "SAS Token", SAS_TOKEN); - plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService"); + CHECK(plan_->setProperty(azure_storage_cred_service_, 
"Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(azure_storage_cred_service_, "SAS Token", SAS_TOKEN)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Azure Storage Credentials Service", "AzureStorageCredentialsService")); REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } SECTION("Both SAS Token and Storage Account Key cannot be set in properties") { - plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME); - plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY); - plan_->setProperty(list_azure_blob_storage_, "SAS Token", SAS_TOKEN); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Name", STORAGE_ACCOUNT_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "Storage Account Key", STORAGE_ACCOUNT_KEY)); + CHECK(plan_->setProperty(list_azure_blob_storage_, "SAS Token", SAS_TOKEN)); REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); } } TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "List all files every time", "[ListAzureBlobStorage]") { setDefaultCredentials(); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName, CONTAINER_NAME); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix, PREFIX); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ListingStrategy, magic_enum::enum_name(minifi::azure::EntityTracking::none)); + CHECK(plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName, CONTAINER_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix, PREFIX)); + CHECK(plan_->setProperty(list_azure_blob_storage_, 
minifi::azure::processors::ListAzureBlobStorage::ListingStrategy, magic_enum::enum_name(minifi::azure::EntityTracking::none))); test_controller_.runSession(plan_, true); using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; auto run_assertions = [this]() { @@ -319,9 +312,9 @@ TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "List all files every time", TEST_CASE_METHOD(ListAzureBlobStorageTestsFixture, "Do not list same files the second time when timestamps are tracked", "[ListAzureBlobStorage]") { setDefaultCredentials(); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName, CONTAINER_NAME); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix, PREFIX); - plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ListingStrategy, magic_enum::enum_name(minifi::azure::EntityTracking::timestamps)); + CHECK(plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ContainerName, CONTAINER_NAME)); + CHECK(plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::Prefix, PREFIX)); + CHECK(plan_->setProperty(list_azure_blob_storage_, minifi::azure::processors::ListAzureBlobStorage::ListingStrategy, magic_enum::enum_name(minifi::azure::EntityTracking::timestamps))); test_controller_.runSession(plan_, true); using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; auto passed_params = mock_blob_storage_ptr_->getPassedListParams(); diff --git a/extensions/standard-processors/tests/unit/ControllerServiceTests.cpp b/extensions/standard-processors/tests/unit/ControllerServiceTests.cpp index 705d0fd62..0f981582c 100644 --- a/extensions/standard-processors/tests/unit/ControllerServiceTests.cpp +++ b/extensions/standard-processors/tests/unit/ControllerServiceTests.cpp @@ -41,13 +41,13 @@ TEST_CASE("Test 
ControllerServicesMap", "[cs1]") { auto service = std::make_shared<core::controller::ControllerService>("", utils::Identifier{}, std::make_unique<MockControllerService>()); auto testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, "ID", std::make_shared<minifi::ConfigureImpl>()); - map.put("ID", testNode); + map.put("ID", testNode, nullptr); REQUIRE(1 == map.getAllControllerServices().size()); REQUIRE(nullptr != map.get("ID")); - REQUIRE(false== map.put("", testNode)); - REQUIRE(false== map.put("", nullptr)); + REQUIRE(false== map.put("", testNode, nullptr)); + REQUIRE(false== map.put("", nullptr, nullptr)); // ensure the pointer is the same diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index b79c87bfe..8ae3e5330 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -90,7 +90,7 @@ class FlowConfiguration : public CoreComponentImpl { static std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version); static std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid); - std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier &uuid); + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier &uuid, ProcessGroup* parent); // Create Connection [[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const std::string &name, const utils::Identifier &uuid) const; diff --git a/libminifi/include/core/ProcessContextImpl.h b/libminifi/include/core/ProcessContextImpl.h index 19d9464fe..06511d237 100644 --- a/libminifi/include/core/ProcessContextImpl.h +++ 
b/libminifi/include/core/ProcessContextImpl.h @@ -134,7 +134,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro /* Function to help creating a state storage */ auto create_provider = [&](const std::string& type, const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> { - auto new_node = controller_service_provider->createControllerService(type, DefaultStateStorageName); + auto new_node = controller_service_provider->createControllerService(type, DefaultStateStorageName, nullptr, std::nullopt); if (new_node == nullptr) { return nullptr; } new_node->initialize(); auto storage = new_node->getControllerServiceImplementation(); diff --git a/libminifi/include/core/controller/ControllerServiceNodeMap.h b/libminifi/include/core/controller/ControllerServiceNodeMap.h index 27a47e104..1ee21a50f 100644 --- a/libminifi/include/core/controller/ControllerServiceNodeMap.h +++ b/libminifi/include/core/controller/ControllerServiceNodeMap.h @@ -43,18 +43,25 @@ class ControllerServiceNodeMap { ControllerServiceNode* get(const std::string &id) const; ControllerServiceNode* get(const std::string &id, const utils::Identifier &processor_or_controller_uuid) const; - bool put(const std::string &id, const std::shared_ptr<ControllerServiceNode> &serviceNode); - bool put(const std::string &id, ProcessGroup* process_group); + bool put(std::string id, std::shared_ptr<ControllerServiceNode> controller_service_node, ProcessGroup* parent_group); + + bool registerAlternativeKey(std::string primary_key, std::string alternative_key); void clear(); std::vector<std::shared_ptr<ControllerServiceNode>> getAllControllerServices() const; protected: mutable std::mutex mutex_; - // Map of controller service id to the controller service node - std::map<std::string, std::shared_ptr<ControllerServiceNode>> controller_service_nodes_; - // Map of controller service id to the process group that contains it - std::map<std::string, 
gsl::not_null<ProcessGroup*>> process_groups_; + + struct ServiceEntry { + std::shared_ptr<ControllerServiceNode> controller_service_node; + ProcessGroup* parent_group; + }; + + const ServiceEntry* getEntry(std::string_view primary_key, const std::scoped_lock<std::mutex>& mutex) const; + + std::map<std::string, ServiceEntry, std::less<>> services_; + std::map<std::string, std::string, std::less<>> alternative_keys; }; } // namespace controller diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index 72b461052..ccc59b77f 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -68,7 +68,10 @@ class ControllerServiceProvider : public CoreComponentImpl, public ConfigurableC return controller_map_->get(id, processor_or_controller_uuid); } - virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) = 0; + virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, + const std::string &id, + ProcessGroup* parent, + const std::optional<std::string>& alternative_key) = 0; virtual void clearControllerServices() = 0; @@ -83,7 +86,10 @@ class ControllerServiceProvider : public CoreComponentImpl, public ConfigurableC std::shared_ptr<ControllerService> getControllerService(const std::string &identifier) const override; std::shared_ptr<ControllerService> getControllerService(const std::string &identifier, const utils::Identifier &processor_uuid) const override; - virtual void putControllerServiceNode(const std::string& identifier, const std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* process_group); + virtual void putControllerServiceNode(const std::string& primary_key, + const std::shared_ptr<ControllerServiceNode>& controller_service_node, + ProcessGroup* process_group, + const 
std::optional<std::string>& alternative_key); bool supportsDynamicProperties() const final { return false; diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h index 45cce2aef..e331bd8e3 100644 --- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h +++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h @@ -31,8 +31,8 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider { public: using ControllerServiceProvider::ControllerServiceProvider; - std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) override { - return controller_service_provider_impl_->createControllerService(type, id); + std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, ProcessGroup* parent, const std::optional<std::string>& alternative_key) override { + return controller_service_provider_impl_->createControllerService(type, id, parent, alternative_key); } ControllerServiceNode* getControllerServiceNode(const std::string &id) const override { diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 2fe6c95c3..9136522ee 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -48,7 +48,10 @@ class StandardControllerServiceProvider : public ControllerServiceProvider { stopEnableRetryThread(); } - std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string& id) override; + std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, + const std::string& id, + ProcessGroup* parent_group, + const std::optional<std::string>& alternative_key) 
override; void enableAllControllerServices() override; void disableAllControllerServices() override; void clearControllerServices() override; diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 11f58bd10..aae42ca4e 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -195,8 +195,8 @@ std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const st } std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, - const utils::Identifier& uuid) { - std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name); + const utils::Identifier& uuid, ProcessGroup* parent) { + std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, parent, uuid.to_string()); if (nullptr != controllerServicesNode) controllerServicesNode->setUUID(uuid); return controllerServicesNode; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index d8e8ea066..c88709e68 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -279,7 +279,7 @@ Processor* ProcessGroup::findProcessorByName(const std::string &processorName, T } void ProcessGroup::addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node) { - controller_service_map_.put(nodeId, node); + controller_service_map_.put(nodeId, node, this); } core::controller::ControllerServiceNode* ProcessGroup::findControllerService(const std::string &nodeId, Traverse traverse) const { diff --git a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp index 4ad00dd4f..f66316e39 100644 --- 
a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp +++ b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp @@ -17,81 +17,93 @@ */ #include "core/controller/ControllerServiceNodeMap.h" + +#include <ranges> + #include "core/ProcessGroup.h" namespace org::apache::nifi::minifi::core::controller { ControllerServiceNode* ControllerServiceNodeMap::get(const std::string &id) const { - std::lock_guard<std::mutex> lock(mutex_); - auto exists = controller_service_nodes_.find(id); - if (exists != controller_service_nodes_.end()) - return exists->second.get(); - else - return nullptr; + const std::scoped_lock lock(mutex_); + if (const auto entry = getEntry(id, lock)) { + return entry->controller_service_node.get(); + } + return nullptr; } ControllerServiceNode* ControllerServiceNodeMap::get(const std::string &id, const utils::Identifier& processor_or_controller_uuid) const { - std::lock_guard<std::mutex> lock(mutex_); - ControllerServiceNode* controller = nullptr; - auto exists = controller_service_nodes_.find(id); - if (exists != controller_service_nodes_.end()) { - controller = exists->second.get(); - } else { + const std::scoped_lock lock(mutex_); + const auto entry = getEntry(id, lock); + if (!entry || !entry->parent_group) { return nullptr; } - auto process_group_of_controller_exists = process_groups_.find(id); - ProcessGroup* process_group = nullptr; - if (process_group_of_controller_exists != process_groups_.end()) { - process_group = process_group_of_controller_exists->second; - } else { - return nullptr; - } - if (process_group->findProcessorById(processor_or_controller_uuid, ProcessGroup::Traverse::IncludeChildren)) { - return controller; + if (entry->parent_group->findProcessorById(processor_or_controller_uuid, ProcessGroup::Traverse::IncludeChildren)) { + return entry->controller_service_node.get(); } - if (process_group->findControllerService(processor_or_controller_uuid.to_string(), ProcessGroup::Traverse::IncludeChildren)) { - return 
controller; + if (entry->parent_group->findControllerService(processor_or_controller_uuid.to_string(), ProcessGroup::Traverse::IncludeChildren)) { + return entry->controller_service_node.get(); } return nullptr; } -bool ControllerServiceNodeMap::put(const std::string &id, const std::shared_ptr<ControllerServiceNode> &serviceNode) { - if (id.empty() || serviceNode == nullptr) +bool ControllerServiceNodeMap::put(std::string id, std::shared_ptr<ControllerServiceNode> controller_service_node, + ProcessGroup* parent_group) { + std::scoped_lock lock(mutex_); + if (id.empty() || controller_service_node == nullptr || alternative_keys.contains(id)) { return false; - std::lock_guard<std::mutex> lock(mutex_); - controller_service_nodes_[id] = serviceNode; - return true; + } + auto [_it, success] = services_.emplace(std::move(id), ServiceEntry{.controller_service_node = std::move(controller_service_node), .parent_group = parent_group}); + return success; } -bool ControllerServiceNodeMap::put(const std::string &id, ProcessGroup* process_group) { - if (id.empty() || process_group == nullptr) - return false; - std::lock_guard<std::mutex> lock(mutex_); - process_groups_.emplace(id, gsl::make_not_null(process_group)); - return true; -} void ControllerServiceNodeMap::clear() { - std::lock_guard<std::mutex> lock(mutex_); - for (const auto& [id, node] : controller_service_nodes_) { - node->disable(); + std::scoped_lock lock(mutex_); + for (const auto& node: services_ | std::views::values) { + node.controller_service_node->disable(); } - controller_service_nodes_.clear(); - process_groups_.clear(); + services_.clear(); + alternative_keys.clear(); } std::vector<std::shared_ptr<ControllerServiceNode>> ControllerServiceNodeMap::getAllControllerServices() const { - std::lock_guard<std::mutex> lock(mutex_); + std::scoped_lock lock(mutex_); std::vector<std::shared_ptr<ControllerServiceNode>> services; - services.reserve(controller_service_nodes_.size()); - for (const auto& [id, node] : 
controller_service_nodes_) { - services.push_back(node); + services.reserve(services_.size()); + for (const auto& [controller_service_node, _parent_group]: services_ | std::views::values) { + services.push_back(controller_service_node); } return services; } +const ControllerServiceNodeMap::ServiceEntry* ControllerServiceNodeMap::getEntry(const std::string_view key, const std::scoped_lock<std::mutex>&) const { + const auto it = services_.find(key); + if (it != services_.end()) { + return &it->second; + } + const auto primary_key_it = alternative_keys.find(key); + if (primary_key_it == alternative_keys.end()) { + return nullptr; + } + const auto it_from_primary = services_.find(primary_key_it->second); + gsl_Expects(it_from_primary != services_.end()); + return &it_from_primary->second; +} + + +bool ControllerServiceNodeMap::registerAlternativeKey(std::string primary_key, std::string alternative_key) { + std::scoped_lock lock(mutex_); + if (!services_.contains(primary_key) || services_.contains(alternative_key) || alternative_keys.contains(alternative_key)) { + return false; + } + + auto [_it, success] = alternative_keys.emplace(std::move(alternative_key), std::move(primary_key)); + return success; +} + } // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/src/core/controller/ControllerServiceProvider.cpp b/libminifi/src/core/controller/ControllerServiceProvider.cpp index 288929ccb..28e60c22b 100644 --- a/libminifi/src/core/controller/ControllerServiceProvider.cpp +++ b/libminifi/src/core/controller/ControllerServiceProvider.cpp @@ -43,10 +43,15 @@ std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerServi } } -void ControllerServiceProvider::putControllerServiceNode(const std::string& identifier, const std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* process_group) { +void ControllerServiceProvider::putControllerServiceNode(const std::string& primary_key, + const 
std::shared_ptr<ControllerServiceNode>& controller_service_node, + ProcessGroup* process_group, + const std::optional<std::string>& alternative_key) { gsl_Expects(controller_map_); - controller_map_->put(identifier, controller_service_node); - controller_map_->put(identifier, process_group); + controller_map_->put(primary_key, controller_service_node, process_group); + if (alternative_key) { + controller_map_->registerAlternativeKey(primary_key, *alternative_key); + } } } // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/src/core/controller/StandardControllerServiceProvider.cpp b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp index 28cb21cde..d93c4cd6b 100644 --- a/libminifi/src/core/controller/StandardControllerServiceProvider.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp @@ -24,7 +24,10 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core::controller { -std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type, const std::string& id) { +std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type, + const std::string& id, + ProcessGroup* parent_group, + const std::optional<std::string>& alternative_key) { std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id); if (!new_controller_service) { @@ -35,7 +38,11 @@ std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::create sharedFromThis<ControllerServiceProvider>(), id, configuration_); - controller_map_->put(id, new_service_node); + controller_map_->put(id, new_service_node, parent_group); + if (alternative_key) { + controller_map_->registerAlternativeKey(id, *alternative_key); + } + return new_service_node; } diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp 
b/libminifi/src/core/flow/StructuredConfiguration.cpp index db738846a..2a364ce86 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -656,7 +656,7 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser utils::Identifier uuid; uuid = id; - std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, name, uuid); + std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, name, uuid, parent_group); if (nullptr != controller_service_node) { logger_->log_debug("Created Controller Service with UUID {} and name {}", id, name); controller_service_node->initialize(); @@ -668,9 +668,6 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser } } - service_provider_->putControllerServiceNode(id, controller_service_node, parent_group); - service_provider_->putControllerServiceNode(name, controller_service_node, parent_group); - parent_group->addControllerService(controller_service_node->getName(), controller_service_node); parent_group->addControllerService(controller_service_node->getUUIDStr(), controller_service_node); } else { diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index 0cf593b34..e205ba4a6 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -367,7 +367,7 @@ std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo minifi::utils::Identifier uuid = minifi::utils::IdGenerator::getIdGenerator()->generate(); std::shared_ptr<minifi::core::controller::ControllerServiceNode> controller_service_node = - controller_services_provider_->createControllerService(controller_name, name); + controller_services_provider_->createControllerService(controller_name, name, root_process_group_.get(), uuid.to_string()); if 
(controller_service_node == nullptr) { return nullptr; } @@ -378,9 +378,6 @@ std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo controller_service_node->setUUID(uuid); controller_service_node->setName(name); - controller_services_provider_->putControllerServiceNode(uuid.to_string(), controller_service_node, root_process_group_.get()); - controller_services_provider_->putControllerServiceNode(name, controller_service_node, root_process_group_.get()); - root_process_group_->addControllerService(uuid.to_string(), controller_service_node); return controller_service_node; diff --git a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp index 298faa228..71004d240 100644 --- a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp +++ b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp @@ -111,7 +111,7 @@ class WrongTestControllerService : public TestControllerService {}; class TestControllerServiceProvider : public controller::ControllerServiceProvider { public: using ControllerServiceProvider::ControllerServiceProvider; - std::shared_ptr<controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&) override { return nullptr; } + std::shared_ptr<controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&, ProcessGroup*, const std::optional<std::string>&) override { return nullptr; } void clearControllerServices() override {} void enableAllControllerServices() override {} void disableAllControllerServices() override {}
