This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ccccbef805e7fb11e926ede35af181c6ca61fd2a Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Feb 12 17:11:45 2025 +0100 MINIFICPP-2525 Handle errors when enabling controllers Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1933 --- controller/tests/ControllerTests.cpp | 2 +- .../controllerservices/CouchbaseClusterService.h | 4 +- .../tests/SmbConnectionControllerServiceTests.cpp | 2 +- libminifi/include/core/FlowConfiguration.h | 3 +- .../ForwardingControllerServiceProvider.h | 4 +- .../controller/StandardControllerServiceNode.h | 26 +-- .../controller/StandardControllerServiceProvider.h | 74 +++----- libminifi/src/core/FlowConfiguration.cpp | 18 +- .../core/controller/ControllerServiceNodeMap.cpp | 1 + .../controller/StandardControllerServiceNode.cpp | 51 ++++-- .../StandardControllerServiceProvider.cpp | 128 +++++++++++++ .../src/core/flow/StructuredConfiguration.cpp | 3 +- .../integration/C2ControllerEnableFailureTest.cpp | 197 +++++++++++++++++++++ libminifi/test/libtest/unit/ProvenanceTestHelper.h | 54 ------ libminifi/test/libtest/unit/TestBase.cpp | 2 +- .../test/resources/TestC2InvalidController.yml | 41 +++++ libminifi/test/resources/TestC2ValidController.yml | 41 +++++ libminifi/test/unit/SchedulingAgentTests.cpp | 1 - .../core/controller/ControllerServiceProvider.h | 2 +- utils/include/core/ProcessContext.h | 16 +- 20 files changed, 502 insertions(+), 168 deletions(-) diff --git a/controller/tests/ControllerTests.cpp b/controller/tests/ControllerTests.cpp index adbe80ca6..ab9debb68 100644 --- a/controller/tests/ControllerTests.cpp +++ b/controller/tests/ControllerTests.cpp @@ -212,7 +212,7 @@ class TestControllerServiceProvider : public core::controller::ControllerService return is_ssl_ ? ssl_context_service_ : nullptr; } - std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&, const std::string&, bool) override { + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&) override { return nullptr; } void clearControllerServices() override { diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index a6bfc30b1..0da173c91 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -138,11 +138,11 @@ class CouchbaseClusterService : public core::controller::ControllerServiceImpl { void initialize() override; void yield() override { - }; + } bool isWorkAvailable() override { return false; - }; + } bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; diff --git a/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp index c9693dc46..217122fa3 100644 --- a/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp +++ b/extensions/smb/tests/SmbConnectionControllerServiceTests.cpp @@ -35,7 +35,7 @@ struct SmbConnectionControllerServiceFixture { TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, "SmbConnectionControllerService onEnable throws when empty") { - REQUIRE_THROWS(plan_->finalize()); + REQUIRE_THROWS(smb_connection_node_->getControllerServiceImplementation()->onEnable()); } TEST_CASE_METHOD(SmbConnectionControllerServiceFixture, "SmbConnectionControllerService anonymous connection") { diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index a4066f969..a85af2c61 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -88,8 +88,7 @@ class FlowConfiguration : public CoreComponentImpl { static std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version); static std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid); - std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name, - const utils::Identifier &uuid); + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier &uuid); // Create Connection [[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const std::string &name, const utils::Identifier &uuid) const; diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h index f31629f1c..09d91b00f 100644 --- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h +++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h @@ -31,8 +31,8 @@ class ForwardingControllerServiceProvider : public ControllerServiceProviderImpl public: using ControllerServiceProviderImpl::ControllerServiceProviderImpl; - std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &longType, const std::string &id, bool firstTimeAdded) override { - return controller_service_provider_impl_->createControllerService(type, longType, id, firstTimeAdded); + std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) override { + return controller_service_provider_impl_->createControllerService(type, id); } ControllerServiceNode* getControllerServiceNode(const std::string &id) const override { diff --git a/libminifi/include/core/controller/StandardControllerServiceNode.h b/libminifi/include/core/controller/StandardControllerServiceNode.h index 1e9b85d16..fc4dc7bf0 100644 --- a/libminifi/include/core/controller/StandardControllerServiceNode.h +++ b/libminifi/include/core/controller/StandardControllerServiceNode.h @@ -46,39 +46,17 @@ class StandardControllerServiceNode : public ControllerServiceNodeImpl { StandardControllerServiceNode(const StandardControllerServiceNode &other) = delete; StandardControllerServiceNode &operator=(const StandardControllerServiceNode &parent) = delete; - /** - * Initializes the controller service node. - */ void initialize() override { ControllerServiceNodeImpl::initialize(); active = false; } - bool canEnable() override { - if (!active.load()) { - for (auto linked_service : linked_controller_services_) { - if (!linked_service->canEnable()) { - return false; - } - } - return true; - } else { - return false; - } - } - + bool canEnable() override; bool enable() override; - - bool disable() override { - controller_service_->setState(DISABLED); - active = false; - return true; - } + bool disable() override; protected: - // controller service provider. std::shared_ptr<ControllerServiceProvider> provider; - std::mutex mutex_; private: diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 9f1dfbe47..54eae7115 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -15,19 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once #include <string> #include <utility> - #include <memory> -#include <vector> -#include "core/ProcessGroup.h" -#include "core/ClassLoader.h" +#include <memory> +#include <unordered_set> +#include <thread> #include "core/controller/ControllerService.h" #include "ControllerServiceNodeMap.h" -#include "ControllerServiceNode.h" -#include "StandardControllerServiceNode.h" #include "ControllerServiceProvider.h" #include "core/logging/LoggerFactory.h" @@ -39,6 +35,7 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl : ControllerServiceProviderImpl(std::move(services)), extension_loader_(loader), configuration_(std::move(configuration)), + admin_yield_duration_(readAdministrativeYieldDuration()), logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { } @@ -47,64 +44,35 @@ class StandardControllerServiceProvider : public ControllerServiceProviderImpl StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete; StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete; - - std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string&, const std::string& id, bool) override { - std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id); - - if (!new_controller_service) { - return nullptr; - } - - std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service, - sharedFromThis<ControllerServiceProvider>(), id, - configuration_); - - controller_map_->put(id, new_service_node); - return new_service_node; - } - - void enableAllControllerServices() override { - logger_->log_info("Enabling {} controller services", controller_map_->getAllControllerServices().size()); - for (const auto& service : controller_map_->getAllControllerServices()) { - logger_->log_info("Enabling {}", service->getName()); - if (!service->canEnable()) { - logger_->log_warn("Service {} cannot be enabled", service->getName()); - continue; - } - if (!service->enable()) { - logger_->log_warn("Could not enable {}", service->getName()); - } - } - } - - void disableAllControllerServices() override { - logger_->log_info("Disabling {} controller services", controller_map_->getAllControllerServices().size()); - for (const auto& service : controller_map_->getAllControllerServices()) { - logger_->log_info("Disabling {}", service->getName()); - if (!service->enabled()) { - logger_->log_warn("Service {} is not enabled", service->getName()); - continue; - } - if (!service->disable()) { - logger_->log_warn("Could not disable {}", service->getName()); - } - } + ~StandardControllerServiceProvider() override { + stopEnableRetryThread(); } - void clearControllerServices() override { - controller_map_->clear(); - } + std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string& id) override; + void enableAllControllerServices() override; + void disableAllControllerServices() override; + void clearControllerServices() override; protected: + void stopEnableRetryThread(); + void startEnableRetryThread(); + bool canEdit() override { return false; } ClassLoader &extension_loader_; - std::shared_ptr<Configure> configuration_; private: + std::chrono::milliseconds readAdministrativeYieldDuration() const; + + std::thread controller_service_enable_retry_thread_; + std::atomic_bool enable_retry_thread_running_{false}; + std::mutex enable_retry_mutex_; + std::condition_variable enable_retry_condition_; + std::unordered_set<std::shared_ptr<ControllerServiceNode>> controller_services_to_enable_; + std::chrono::milliseconds admin_yield_duration_; std::shared_ptr<logging::Logger> logger_; }; diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 302595af3..807102c96 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -99,7 +99,14 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s auto old_parameter_providers = std::move(parameter_providers_); service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_); auto payload = getRootFromPayload(yamlConfigPayload); - if (!url.empty() && payload != nullptr) { + if (!payload) { + service_provider_ = old_provider; + parameter_contexts_ = std::move(old_parameter_contexts); + parameter_providers_ = std::move(old_parameter_providers); + return nullptr; + } + + if (!url.empty()) { std::string payload_flow_id; std::string bucket_id; auto path_split = utils::string::split(url, "/"); @@ -111,11 +118,8 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s } } flow_version_->setFlowVersion(url, bucket_id, flow_id ? *flow_id : payload_flow_id); - } else { - service_provider_ = old_provider; - parameter_contexts_ = std::move(old_parameter_contexts); - parameter_providers_ = std::move(old_parameter_providers); } + return payload; } @@ -174,9 +178,9 @@ std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const st return std::make_unique<minifi::ConnectionImpl>(flow_file_repo_, content_repo_, name, uuid); } -std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &full_class_name, const std::string &name, +std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier& uuid) { - std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, full_class_name, name, true); + std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name); if (nullptr != controllerServicesNode) controllerServicesNode->setUUID(uuid); return controllerServicesNode; diff --git a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp index dded7150c..4ad00dd4f 100644 --- a/libminifi/src/core/controller/ControllerServiceNodeMap.cpp +++ b/libminifi/src/core/controller/ControllerServiceNodeMap.cpp @@ -81,6 +81,7 @@ void ControllerServiceNodeMap::clear() { node->disable(); } controller_service_nodes_.clear(); + process_groups_.clear(); } std::vector<std::shared_ptr<ControllerServiceNode>> ControllerServiceNodeMap::getAllControllerServices() const { diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 69eb33709..e4529fefb 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -19,9 +19,20 @@ #include "core/controller/StandardControllerServiceNode.h" #include <memory> #include <mutex> +#include <algorithm> namespace org::apache::nifi::minifi::core::controller { +bool StandardControllerServiceNode::canEnable() { + if (active) { + return false; + } + + return std::all_of(linked_controller_services_.begin(), linked_controller_services_.end(), [](auto linked_service) { + return linked_service->canEnable(); + }); +} + bool StandardControllerServiceNode::enable() { logger_->log_trace("Enabling CSN {}", getName()); if (active) { @@ -38,24 +49,42 @@ bool StandardControllerServiceNode::enable() { } } std::shared_ptr<ControllerService> impl = getControllerServiceImplementation(); - if (nullptr != impl) { - std::lock_guard<std::mutex> lock(mutex_); - std::vector<std::shared_ptr<ControllerService>> services; - std::vector<ControllerServiceNode*> service_nodes; - services.reserve(linked_controller_services_.size()); - for (const auto& service : linked_controller_services_) { - services.push_back(service->getControllerServiceImplementation()); - if (!service->enable()) { - logger_->log_debug("Linked Service '{}' could not be enabled", service->getName()); - return false; - } + if (nullptr == impl) { + logger_->log_warn("Service '{}' service implementation could not be found", controller_service_->getName()); + controller_service_->setState(ENABLING); + return false; + } + + std::lock_guard<std::mutex> lock(mutex_); + std::vector<std::shared_ptr<ControllerService>> services; + std::vector<ControllerServiceNode*> service_nodes; + services.reserve(linked_controller_services_.size()); + for (const auto& service : linked_controller_services_) { + services.push_back(service->getControllerServiceImplementation()); + if (!service->enable()) { + logger_->log_warn("Linked Service '{}' could not be enabled", service->getName()); + return false; } + } + + try { impl->setLinkedControllerServices(services); impl->onEnable(); + } catch(const std::exception& e) { + logger_->log_warn("Service '{}' failed to enable: {}", getName(), e.what()); + controller_service_->setState(ENABLING); + return false; } + active = true; controller_service_->setState(ENABLED); return true; } +bool StandardControllerServiceNode::disable() { + controller_service_->setState(DISABLED); + active = false; + return true; +} + } // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/src/core/controller/StandardControllerServiceProvider.cpp b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp new file mode 100644 index 000000000..28cb21cde --- /dev/null +++ b/libminifi/src/core/controller/StandardControllerServiceProvider.cpp @@ -0,0 +1,128 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/controller/StandardControllerServiceProvider.h" + +#include "core/controller/StandardControllerServiceNode.h" +#include "core/TypedValues.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::core::controller { + +std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type, const std::string& id) { + std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id); + + if (!new_controller_service) { + return nullptr; + } + + std::shared_ptr<ControllerServiceNode> new_service_node = std::make_shared<StandardControllerServiceNode>(new_controller_service, + sharedFromThis<ControllerServiceProvider>(), id, + configuration_); + + controller_map_->put(id, new_service_node); + return new_service_node; +} + +void StandardControllerServiceProvider::enableAllControllerServices() { + gsl_Expects(!enable_retry_thread_running_); + { + std::lock_guard<std::mutex> lock(enable_retry_mutex_); + logger_->log_info("Enabling {} controller services", controller_map_->getAllControllerServices().size()); + for (const auto& service : controller_map_->getAllControllerServices()) { + logger_->log_info("Enabling {}", service->getName()); + if (!service->canEnable()) { + logger_->log_warn("Service {} cannot be enabled", service->getName()); + continue; + } + if (!service->enable()) { + logger_->log_warn("Could not enable {}", service->getName()); + controller_services_to_enable_.insert(service); + } + } + } + startEnableRetryThread(); +} + +void StandardControllerServiceProvider::disableAllControllerServices() { + stopEnableRetryThread(); + logger_->log_info("Disabling {} controller services", controller_map_->getAllControllerServices().size()); + for (const auto& service : controller_map_->getAllControllerServices()) { + logger_->log_info("Disabling {}", service->getName()); + if (!service->disable()) { + logger_->log_warn("Could not disable {}", service->getName()); + } + } +} + +void StandardControllerServiceProvider::clearControllerServices() { + stopEnableRetryThread(); + controller_map_->clear(); +} + +void StandardControllerServiceProvider::stopEnableRetryThread() { + enable_retry_thread_running_ = false; + enable_retry_condition_.notify_all(); + if (controller_service_enable_retry_thread_.joinable()) { + controller_service_enable_retry_thread_.join(); + } +} + +void StandardControllerServiceProvider::startEnableRetryThread() { + enable_retry_thread_running_ = true; + controller_service_enable_retry_thread_ = std::thread([this]() { + if (controller_services_to_enable_.empty()) { + return; + } + std::unique_lock<std::mutex> lock(enable_retry_mutex_); + enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() { + return !enable_retry_thread_running_; + }); + while (enable_retry_thread_running_) { + for (auto it = controller_services_to_enable_.begin(); it != controller_services_to_enable_.end();) { + if ((*it)->enable()) { + it = controller_services_to_enable_.erase(it); + } else { + ++it; + } + } + if (controller_services_to_enable_.empty()) { + break; + } + enable_retry_condition_.wait_for(lock, admin_yield_duration_, [this]() { + return !enable_retry_thread_running_; + }); + } + controller_services_to_enable_.clear(); + }); +} + +std::chrono::milliseconds StandardControllerServiceProvider::readAdministrativeYieldDuration() const { + std::chrono::milliseconds admin_yield_duration = 30s; + std::string yield_value_str; + + if (configuration_->get(Configure::nifi_administrative_yield_duration, yield_value_str)) { + std::optional<core::TimePeriodValue> value = core::TimePeriodValue::fromString(yield_value_str); + if (value) { + admin_yield_duration = value->getMilliseconds(); + } + } + return admin_yield_duration; +} + +} // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 7b706bb6a..9e873032b 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -631,7 +631,6 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser auto type = getRequiredField(service_node, schema_.type); logger_->log_debug("Using type {} for controller service node", type); - std::string fullType = type; type = utils::string::partAfterLastOccurrenceOf(type, '.'); auto name = service_node[schema_.name].getString().value(); @@ -639,7 +638,7 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser utils::Identifier uuid; uuid = id; - std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, fullType, name, uuid); + std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, name, uuid); if (nullptr != controller_service_node) { logger_->log_debug("Created Controller Service with UUID {} and name {}", id, name); controller_service_node->initialize(); diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp new file mode 100644 index 000000000..79267b20b --- /dev/null +++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp @@ -0,0 +1,197 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <string> + +#include "unit/TestBase.h" +#include "integration/HTTPIntegrationBase.h" +#include "integration/HTTPHandlers.h" +#include "unit/Catch.h" +#include "core/Processor.h" +#include "core/controller/ControllerService.h" +#include "core/Resource.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class DummyController : public core::controller::ControllerServiceImpl { + public: + explicit DummyController(std::string_view name, const minifi::utils::Identifier &uuid = {}) : ControllerServiceImpl(name, uuid) {} + + static constexpr const char* Description = "Dummy Controller"; + + static constexpr auto DummyControllerProperty = core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Property") + .withDescription("Dummy Controller Property") + .build(); + + static constexpr auto Properties = std::to_array<core::PropertyReference>({DummyControllerProperty}); + static constexpr bool SupportsDynamicProperties = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES + + void initialize() override { + setSupportedProperties(Properties); + } + + void yield() override { + } + + bool isWorkAvailable() override { + return false; + } + + bool isRunning() const override { + return getState() == core::controller::ControllerServiceState::ENABLED; + } + + void onEnable() override { + auto dummy_controller_property = getProperty(DummyControllerProperty.name); + if (!dummy_controller_property || dummy_controller_property->empty()) { + throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing dummy property"); + } + } + + private: + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DummyController>::getLogger(uuid_); +}; + +REGISTER_RESOURCE(DummyController, ControllerService); + +class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl { + using minifi::core::ProcessorImpl::ProcessorImpl; + + public: + DummmyControllerUserProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {} + explicit DummmyControllerUserProcessor(std::string_view name) : ProcessorImpl(name) {} + static constexpr auto DummyControllerService = core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Service") + .withDescription("Dummy Controller Service") + .withAllowedTypes<DummyController>() + .build(); + + void initialize() override { + setSupportedProperties(Properties); + } + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) override { + if (auto controller_service = context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) { + if (!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service, uuid_))) { + throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service"); + } + } else { + throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing controller service"); + } + logger_->log_debug("DummyControllerUserProcessor::onSchedule successful"); + } + + static constexpr const char* Description = "A processor that uses controller."; + static constexpr auto Properties = std::array<core::PropertyReference, 1>{DummyControllerService}; + static constexpr auto Relationships = std::array<core::RelationshipDefinition, 0>{}; + static constexpr bool SupportsDynamicProperties = false; + static constexpr bool SupportsDynamicRelationships = false; + static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; + static constexpr bool IsSingleThreaded = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + private: + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DummmyControllerUserProcessor>::getLogger(uuid_); +}; + +REGISTER_RESOURCE(DummmyControllerUserProcessor, Processor); + +class VerifyC2ControllerUpdate : public VerifyC2Base { + public: + explicit VerifyC2ControllerUpdate(const std::atomic_bool& flow_updated_successfully) : flow_updated_successfully_(flow_updated_successfully) { + } + + void testSetup() override { + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<DummmyControllerUserProcessor>(); + LogTestController::getInstance().setDebug<DummyController>(); + LogTestController::getInstance().setDebug<core::controller::StandardControllerServiceProvider>(); + VerifyC2Base::testSetup(); + } + + void runAssertions() override { + using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime; + REQUIRE(verifyEventHappenedInPollTime(40s, [&] { return flow_updated_successfully_.load(); }, 1s)); + } + + private: + const std::atomic_bool& flow_updated_successfully_; +}; + +class ControllerUpdateHandler: public HeartbeatHandler { + public: + explicit ControllerUpdateHandler(std::atomic_bool& flow_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& replacement_config_path) + : HeartbeatHandler(std::move(configuration)), + flow_updated_successfully_(flow_updated_successfully), + replacement_config_(minifi::utils::file::get_content(replacement_config_path.string())) { + } + + void handleHeartbeat(const rapidjson::Document& /*root*/, struct mg_connection* conn) override { + switch (test_state_) { + case TestState::VERIFY_INITIAL_METRICS: { + sendEmptyHeartbeatResponse(conn); + REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "Could not enable DummyController")); + REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "(DummmyControllerUserProcessor): Process Schedule Operation: Invalid controller service")); + test_state_ = TestState::SEND_NEW_CONFIG; + break; + } + case TestState::SEND_NEW_CONFIG: { + sendHeartbeatResponse("UPDATE", "configuration", "889349", conn, {{"configuration_data", minifi::c2::C2Value{replacement_config_}}}); + test_state_ = TestState::VERIFY_UPDATED_METRICS; + break; + } + case TestState::VERIFY_UPDATED_METRICS: { + sendEmptyHeartbeatResponse(conn); + if (minifi::test::utils::verifyLogLinePresenceInPollTime(0s, "DummyControllerUserProcessor::onSchedule successful")) { + flow_updated_successfully_ = true; + } + break; + } + } + } + + private: + enum class TestState { + VERIFY_INITIAL_METRICS, + SEND_NEW_CONFIG, + VERIFY_UPDATED_METRICS + }; + + static void sendEmptyHeartbeatResponse(struct mg_connection* conn) { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); + } + + std::atomic_bool& flow_updated_successfully_; + TestState test_state_ = TestState::VERIFY_INITIAL_METRICS; + std::string replacement_config_; +}; + +TEST_CASE("C2ControllerEnableFailureTest", "[c2test]") { + std::atomic_bool flow_updated_successfully{false}; + VerifyC2ControllerUpdate harness(flow_updated_successfully); + const auto test_file_path = std::filesystem::path(TEST_RESOURCES) / "TestC2InvalidController.yml"; + auto replacement_path = test_file_path.string(); + minifi::utils::string::replaceAll(replacement_path, "TestC2InvalidController", "TestC2ValidController"); + ControllerUpdateHandler handler(flow_updated_successfully, harness.getConfiguration(), replacement_path); + harness.setUrl("https://localhost:0/api/heartbeat", &handler); + harness.run(test_file_path); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index 36704bc71..db61c837b 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -240,57 +240,3 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepos std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository> content_repo_; std::thread thread_; }; - -class TestFlowController : public org::apache::nifi::minifi::FlowController { - public: - TestFlowController(std::shared_ptr<org::apache::nifi::minifi::core::Repository> repo, std::shared_ptr<org::apache::nifi::minifi::core::Repository> flow_file_repo, - const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) - :org::apache::nifi::minifi::FlowController(repo, flow_file_repo, org::apache::nifi::minifi::Configure::create(), nullptr, - std::make_shared<org::apache::nifi::minifi::core::repository::VolatileContentRepository>()) { - } - - ~TestFlowController() override = default; - - void load(bool /*reload*/ = false) override { - } - - int16_t start() override { - return 0; - } - - int16_t stop() override { - return 0; - } - - void waitUnload(const std::chrono::milliseconds /*time_to_wait*/) override { - stop(); - } - - int16_t pause() override { - return -1; - } - - int16_t resume() override { - return -1; - } - - bool isRunning() const override { - return true; - } - - std::shared_ptr<org::apache::nifi::minifi::core::Processor> createProcessor(const std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& /*uuid*/) { - return nullptr; - } - - org::apache::nifi::minifi::core::ProcessGroup *createRootProcessGroup(const std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& /*uuid*/) { - return nullptr; - } - - org::apache::nifi::minifi::core::ProcessGroup *createRemoteProcessGroup(const std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& /*uuid*/) { - return nullptr; - } - - std::shared_ptr<org::apache::nifi::minifi::Connection> createConnection(const std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier& /*uuid*/) { - return nullptr; - } -}; diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index f933465f3..1f00083f8 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -366,7 +366,7 @@ std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo minifi::utils::Identifier uuid = minifi::utils::IdGenerator::getIdGenerator()->generate(); std::shared_ptr<minifi::core::controller::ControllerServiceNode> controller_service_node = - controller_services_provider_->createControllerService(controller_name, controller_name, name, true /*firstTimeAdded*/); + controller_services_provider_->createControllerService(controller_name, name); if (controller_service_node == nullptr) { return nullptr; } diff --git a/libminifi/test/resources/TestC2InvalidController.yml b/libminifi/test/resources/TestC2InvalidController.yml new file mode 100644 index 000000000..f29f0146e --- /dev/null +++ b/libminifi/test/resources/TestC2InvalidController.yml @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1998 +Processors: + - name: DummmyControllerUserProcessor + id: 2438e3c8-015a-1000-79ca-83af40ec1899 + class: org.apache.nifi.processors.DummmyControllerUserProcessor + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 100 msec + penalization period: 30 sec + yield period: 10 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Dummy Controller Service: DummyController + +Controller Services: + - name: DummyController + id: 2438e3c8-015a-1000-79ca-83af40ec1888 + class: DummyController + Properties: + Dummy Controller Property: diff --git a/libminifi/test/resources/TestC2ValidController.yml b/libminifi/test/resources/TestC2ValidController.yml new file mode 100644 index 000000000..11eb4ec41 --- /dev/null +++ b/libminifi/test/resources/TestC2ValidController.yml @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +Flow Controller: + name: MiNiFi Flow + id: 2438e3c8-015a-1000-79ca-83af40ec1998 +Processors: + - name: DummmyControllerUserProcessor + id: 2438e3c8-015a-1000-79ca-83af40ec1899 + class: org.apache.nifi.processors.DummmyControllerUserProcessor + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 100 msec + penalization period: 30 sec + yield period: 10 sec + run duration nanos: 0 + auto-terminated relationships list: + Properties: + Dummy Controller Service: DummyController + +Controller Services: + - name: DummyController + id: 2438e3c8-015a-1000-79ca-83af40ec1888 + class: DummyController + Properties: + Dummy Controller Property: dummy diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp index 298d1c60e..2a6a7abc8 100644 --- a/libminifi/test/unit/SchedulingAgentTests.cpp +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -76,7 +76,6 @@ class SchedulingAgentTestFixture { protected: std::shared_ptr<core::Repository> test_repo_ = std::make_shared<TestThreadedRepository>(); std::shared_ptr<core::ContentRepository> content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); - std::shared_ptr<minifi::FlowController> controller_ = std::make_shared<TestFlowController>(test_repo_, test_repo_, content_repo_); TestController test_controller_; std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan(); diff --git a/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h b/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h index 4d7a1dd35..2c122fe03 100644 --- a/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h +++ b/minifi-api/include/minifi-cpp/core/controller/ControllerServiceProvider.h @@ -40,7 +40,7 @@ class ControllerServiceProvider : public virtual CoreComponent, public virtual C public: ~ControllerServiceProvider() override = default; - virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &longType, const std::string &id, bool firstTimeAdded) = 0; + virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) = 0; virtual ControllerServiceNode* getControllerServiceNode(const std::string &id) const = 0; virtual ControllerServiceNode* getControllerServiceNode(const std::string &id, const utils::Identifier &processor_or_controller_uuid) const = 0; virtual void putControllerServiceNode(const std::string& identifier, const std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* process_group) = 0; diff --git a/utils/include/core/ProcessContext.h b/utils/include/core/ProcessContext.h index c4c8c7eb8..2606058b3 100644 --- a/utils/include/core/ProcessContext.h +++ b/utils/include/core/ProcessContext.h @@ -110,7 +110,11 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro // controller services std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier, const utils::Identifier &processor_uuid) const override { - return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier, processor_uuid); + auto controller_service = controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier, processor_uuid); + if (!controller_service || controller_service->getState() != core::controller::ControllerServiceState::ENABLED) { + return nullptr; + } + return controller_service; } void initializeContentRepository(const std::string& home) override { @@ -149,8 +153,8 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro const auto path = configuration->getWithFallback(Configure::nifi_state_storage_local_path, Configure::nifi_state_storage_local_path_old); /* Function to help creating a state storage */ - auto create_provider = [&](const std::string& type, const std::string& longType, const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> { - auto new_node = controller_service_provider->createControllerService(type, longType, DefaultStateStorageName, true /*firstTimeAdded*/); + auto create_provider = [&](const std::string& type, const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> { + auto new_node = controller_service_provider->createControllerService(type, DefaultStateStorageName); if (new_node == nullptr) { return nullptr; } new_node->initialize(); auto storage = new_node->getControllerServiceImplementation(); @@ -169,19 +173,19 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro /* Try to create a RocksDB-backed provider */ if (preferredType.empty() || preferredType == "RocksDbPersistableKeyValueStoreService" || preferredType == "RocksDbStateStorage") { - auto provider = create_provider("RocksDbStateStorage", "org.apache.nifi.minifi.controllers.RocksDbStateStorage", {{"Directory", path.value_or("corecomponentstate")}}); + auto provider = create_provider("RocksDbStateStorage", {{"Directory", path.value_or("corecomponentstate")}}); if (provider != nullptr) { return provider; } } /* Fall back to a locked unordered map-backed provider */ if (preferredType.empty() || preferredType == "UnorderedMapPersistableKeyValueStoreService" || preferredType == "PersistentMapStateStorage") { - auto provider = create_provider("PersistentMapStateStorage", "org.apache.nifi.minifi.controllers.PersistentMapStateStorage", {{"File", path.value_or("corecomponentstate.txt")}}); + auto provider = create_provider("PersistentMapStateStorage", {{"File", path.value_or("corecomponentstate.txt")}}); if (provider != nullptr) { return provider; } } /* Fall back to volatile memory-backed provider */ if (preferredType.empty() || preferredType == "UnorderedMapKeyValueStoreService" || preferredType == "VolatileMapStateStorage") { - auto provider = create_provider("VolatileMapStateStorage", "org.apache.nifi.minifi.controllers.VolatileMapStateStorage", {}); + auto provider = create_provider("VolatileMapStateStorage", {}); if (provider != nullptr) { return provider; } }
