This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6c57decc7ce380af863dd621eb457f958b6698b0 Author: Adam Debreceni <[email protected]> AuthorDate: Tue Nov 22 14:04:02 2022 +0100 MINIFICPP-1991 - Remove unused ControllerServiceProvider methods Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1458 --- .../tests/ControllerServiceIntegrationTests.cpp | 43 +++---- libminifi/include/FlowController.h | 37 ++---- libminifi/include/SchedulingAgent.h | 17 +-- libminifi/include/core/ProcessContext.h | 2 +- libminifi/include/core/ProcessGroup.h | 14 +-- .../core/controller/ControllerServiceProvider.h | 88 +------------- .../ForwardingControllerServiceProvider.h | 62 +--------- .../controller/StandardControllerServiceProvider.h | 131 +++------------------ libminifi/include/core/state/ProcessorController.h | 4 +- libminifi/include/utils/ThreadPool.h | 23 ++-- libminifi/src/FlowController.cpp | 29 ++--- libminifi/src/SchedulingAgent.cpp | 41 ------- libminifi/src/core/FlowConfiguration.cpp | 4 +- libminifi/src/core/ProcessGroup.cpp | 28 ++--- libminifi/src/core/state/ProcessorController.cpp | 4 +- libminifi/src/utils/ThreadPool.cpp | 41 +++++-- libminifi/test/TestBase.cpp | 2 +- libminifi/test/unit/SchedulingAgentTests.cpp | 2 +- 18 files changed, 142 insertions(+), 430 deletions(-) diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp index ccd910b1b..9e92c8270 100644 --- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp +++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp @@ -83,7 +83,7 @@ int main(int argc, char **argv) { auto pg = yaml_config.getRoot(); - auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg.get(), std::make_shared<minifi::Configure>()); + auto provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, std::make_shared<minifi::Configure>()); std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995"); assert(mockNode != nullptr); mockNode->enable(); @@ -108,27 +108,28 @@ int main(int argc, char **argv) { assert(!ssl_client->getCACertificate().empty()); // now let's disable one of the controller services. std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID"); - const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); }; assert(cs_id != nullptr); - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->enableControllerService(cs_id); - disabled = false; - } - std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); - assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag)); - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->disableReferencingServices(mock_cont); - disabled = true; - } - assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag)); - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->enableReferencingServices(mock_cont); - disabled = false; - } - assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag)); + // TODO(adebreceni): MINIFICPP-1992 +// const auto checkCsIdEnabledMatchesDisabledFlag = [&cs_id] { return !disabled == cs_id->enabled(); }; +// { +// std::lock_guard<std::mutex> lock(control_mutex); +// controller->enableControllerService(cs_id); +// disabled = false; +// } +// std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); +// assert(verifyEventHappenedInPollTime(std::chrono::seconds(4), checkCsIdEnabledMatchesDisabledFlag)); +// { +// std::lock_guard<std::mutex> lock(control_mutex); +// controller->disableReferencingServices(mock_cont); +// disabled = true; +// } +// assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag)); +// { +// std::lock_guard<std::mutex> lock(control_mutex); +// controller->enableReferencingServices(mock_cont); +// disabled = false; +// } +// assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag)); controller->waitUnload(60000); return 0; diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 97710c0d0..b398191d1 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -65,7 +65,6 @@ namespace state { class ProcessorController; } // namespace state -// Default NiFi Root Group Name #define DEFAULT_ROOT_GROUP_NAME "" /** @@ -87,28 +86,24 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi ~FlowController() override; - // Get the provenance repository virtual std::shared_ptr<core::Repository> getProvenanceRepository() { return this->provenance_repo_; } - // Load flow xml from disk, after that, create the root process group and its children, initialize the flows virtual void load(std::unique_ptr<core::ProcessGroup> root = nullptr, bool reload = false); - // Whether the Flow Controller is start running bool isRunning() override { return running_.load() || updating_.load(); } - // Whether the Flow Controller has already been initialized (loaded flow XML) virtual bool isInitialized() { return initialized_.load(); } - // Start to run the Flow Controller which internally start the root process group and all its children + // Start the Flow Controller which internally starts the root process group and all its children int16_t start() override; int16_t pause() override; int16_t resume() override; - // Unload the current flow YAML, clean the root process group and all its children + // Unload the current flow, clean the root process group and all its children int16_t stop() override; int16_t applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) override; int16_t drainRepositories() override { @@ -123,31 +118,27 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; } // Asynchronous function trigger unloading and wait for a period of time virtual void waitUnload(uint64_t timeToWaitMs); - // Unload the current flow xml, clean the root process group and all its children + // Unload the current flow, clean the root process group and all its children virtual void unload(); - // update property value void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { if (root_ != nullptr) root_->updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue)); } - // set SerialNumber void setSerialNumber(std::string number) { serial_number_ = std::move(number); } - // get serial number as string std::string getSerialNumber() { return serial_number_; } - // validate and apply passing yaml configuration payload + // validate and apply passing configuration payload // first it will validate the payload with the current root node config for flowController // like FlowController id/name is the same and new version is greater than the current version // after that, it will apply the configuration bool applyConfiguration(const std::string &source, const std::string &configurePayload, const std::optional<std::string>& flow_id = std::nullopt); - // get name std::string getName() const override { if (root_ != nullptr) return root_->getName(); @@ -166,7 +157,6 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi return root_->getUUID(); } - // get version virtual std::string getVersion() { if (root_ != nullptr) return std::to_string(root_->getVersion()); @@ -201,38 +191,29 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi void loadMetricsPublisher(); protected: - // function to load the flow file repo. void loadFlowRepo(); std::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration(); private: template <typename T, typename = typename std::enable_if<std::is_base_of<SchedulingAgent, T>::value>::type> - void conditionalReloadScheduler(std::shared_ptr<T>& scheduler, const bool condition) { + void conditionalReloadScheduler(std::unique_ptr<T>& scheduler, const bool condition) { if (condition) { - scheduler = std::make_shared<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); + scheduler = std::make_unique<T>(gsl::not_null<core::controller::ControllerServiceProvider*>(this), provenance_repo_, flow_file_repo_, content_repo_, configuration_, thread_pool_); } } protected: - // flow controller mutex std::recursive_mutex mutex_; - // Whether it is running std::atomic<bool> running_; std::atomic<bool> updating_; - // Whether it has already been initialized (load the flow XML already) std::atomic<bool> initialized_; - // Flow Timer Scheduler - std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; - // Flow Event Scheduler - std::shared_ptr<EventDrivenSchedulingAgent> event_scheduler_; - // Cron Schedule - std::shared_ptr<CronDrivenSchedulingAgent> cron_scheduler_; - // FlowControl Protocol + std::unique_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; + std::unique_ptr<EventDrivenSchedulingAgent> event_scheduler_; + std::unique_ptr<CronDrivenSchedulingAgent> cron_scheduler_; std::unique_ptr<FlowControlProtocol> protocol_; - // metrics information std::chrono::steady_clock::time_point start_time_; private: diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 3ec3ad892..24992c928 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -17,8 +17,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_ -#define LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_ + +#pragma once #include <memory> #include <string> @@ -47,10 +47,7 @@ #define SCHEDULING_WATCHDOG_CHECK_PERIOD_MS 1000 // msec #define SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS 5000 // msec -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { // SchedulingAgent Class class SchedulingAgent { @@ -104,8 +101,6 @@ class SchedulingAgent { void watchDogFunc(); - virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); - virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode); // schedule, overwritten by different DrivenSchedulingAgent virtual void schedule(core::Processor* processor) = 0; // unschedule, overwritten by different DrivenSchedulingAgent @@ -161,8 +156,4 @@ class SchedulingAgent { std::chrono::milliseconds alert_time_; }; -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org -#endif // LIBMINIFI_INCLUDE_SCHEDULINGAGENT_H_ +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index f008e6669..0e0a9b5a1 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -194,7 +194,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: * identifier */ std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) const override { - return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUID()); + return controller_service_provider_ == nullptr ? nullptr : controller_service_provider_->getControllerService(identifier); } /** diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 96dd68438..af43f8e02 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -146,13 +146,13 @@ class ProcessGroup : public CoreComponent { return config_version_; } - void startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, - const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, - const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); + void startProcessing(TimerDrivenSchedulingAgent& timeScheduler, + EventDrivenSchedulingAgent& eventScheduler, + CronDrivenSchedulingAgent& cronScheduler); - void stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, - const std::shared_ptr<EventDrivenSchedulingAgent>& eventScheduler, - const std::shared_ptr<CronDrivenSchedulingAgent>& cronScheduler, + void stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, + EventDrivenSchedulingAgent& eventScheduler, + CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter = nullptr); bool isRemoteProcessGroup(); @@ -231,7 +231,7 @@ class ProcessGroup : public CoreComponent { void verify() const; protected: - void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT + void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler); // version int config_version_; diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h index bb1f826a8..55f4d0ca2 100644 --- a/libminifi/include/core/controller/ControllerServiceProvider.h +++ b/libminifi/include/core/controller/ControllerServiceProvider.h @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ -#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ + +#pragma once #include <memory> #include <string> @@ -78,32 +78,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo return controller_map_->getControllerServiceNode(id); } - /** - * Removes a controller service. - * @param serviceNode controller service node. - */ - virtual void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) { - controller_map_->removeControllerService(serviceNode); - } - - /** - * Enables the provided controller service - * @param serviceNode controller service node. - */ - virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) = 0; - - /** - * Enables the provided controller service nodes - * @param serviceNode controller service node. - */ - virtual void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) = 0; - - /** - * Disables the provided controller service node - * @param serviceNode controller service node. - */ - virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - /** * Removes all controller services. */ @@ -116,62 +90,6 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo return controller_map_->getAllControllerServices(); } - /** - * Verifies that referencing components can be stopped for the controller service - */ - virtual void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - - /** - * Unschedules referencing components. - */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - - /** - * Verifies referencing components for <code>serviceNode</code> can be disabled. - * @param serviceNode shared pointer to a controller service node. - */ - virtual void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - - /** - * Disables referencing components for <code>serviceNode</code> can be disabled. - * @param serviceNode shared pointer to a controller service node. - */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) { - return std::vector<std::shared_ptr<core::controller::ControllerServiceNode>>(); - } - - /** - * Verifies referencing components for <code>serviceNode</code> can be enabled. - * @param serviceNode shared pointer to a controller service node. - */ - virtual void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - ref->canEnable(); - } - } - - /** - * Enables referencing components for <code>serviceNode</code> can be Enabled. - * @param serviceNode shared pointer to a controller service node. - */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - - /** - * Schedules the service node and referencing components. - * @param serviceNode shared pointer to a controller service node. - */ - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0; - - /** - * Returns a controller service for the service identifier and componentID - * @param service Identifier service identifier. - */ - virtual std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string& serviceIdentifier, const utils::Identifier& /*componentId*/) const { - std::shared_ptr<ControllerService> node = getControllerService(serviceIdentifier); - return node; - } - /** * Gets the controller service for the provided identifier * @param identifier service identifier. @@ -272,5 +190,3 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo }; } // namespace org::apache::nifi::minifi::core::controller - -#endif // LIBMINIFI_INCLUDE_CORE_CONTROLLER_CONTROLLERSERVICEPROVIDER_H_ diff --git a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h index 960307be5..d70d6d324 100644 --- a/libminifi/include/core/controller/ForwardingControllerServiceProvider.h +++ b/libminifi/include/core/controller/ForwardingControllerServiceProvider.h @@ -25,12 +25,7 @@ #include "ControllerServiceProvider.h" #include "ControllerServiceNode.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { -namespace controller { +namespace org::apache::nifi::minifi::core::controller { class ForwardingControllerServiceProvider : public ControllerServiceProvider { public: @@ -44,22 +39,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider { return controller_service_provider_impl_->getControllerServiceNode(id); } - void removeControllerService(const std::shared_ptr<ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->removeControllerService(serviceNode); - } - - std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->enableControllerService(serviceNode); - } - - void enableControllerServices(std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> serviceNodes) override { - return controller_service_provider_impl_->enableControllerServices(serviceNodes); - } - - std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->disableControllerService(serviceNode); - } - void clearControllerServices() override { return controller_service_provider_impl_->clearControllerServices(); } @@ -72,38 +51,6 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider { return controller_service_provider_impl_->getAllControllerServices(); } - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->unscheduleReferencingComponents(serviceNode); - } - - void verifyCanEnableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->verifyCanEnableReferencingServices(serviceNode); - } - - void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->verifyCanDisableReferencingServices(serviceNode); - } - - void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->verifyCanStopReferencingComponents(serviceNode); - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->disableReferencingServices(serviceNode); - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->enableReferencingServices(serviceNode); - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) override { - return controller_service_provider_impl_->scheduleReferencingComponents(serviceNode); - } - - std::shared_ptr<ControllerService> getControllerServiceForComponent(const std::string &serviceIdentifier, const utils::Identifier &componentId) const override { - return controller_service_provider_impl_->getControllerServiceForComponent(serviceIdentifier, componentId); - } - bool isControllerServiceEnabled(const std::string &identifier) override { return controller_service_provider_impl_->isControllerServiceEnabled(identifier); } @@ -128,9 +75,4 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider { std::shared_ptr<ControllerServiceProvider> controller_service_provider_impl_; }; -} // namespace controller -} // namespace core -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h index 92b12890c..8730133fd 100644 --- a/libminifi/include/core/controller/StandardControllerServiceProvider.h +++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_ -#define LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_ + +#pragma once #include <string> #include <utility> @@ -32,32 +32,16 @@ #include "StandardControllerServiceNode.h" #include "ControllerServiceProvider.h" #include "core/logging/LoggerFactory.h" +#include "SchedulingAgent.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { -namespace controller { +namespace org::apache::nifi::minifi::core::controller { class StandardControllerServiceProvider : public ControllerServiceProvider, public std::enable_shared_from_this<StandardControllerServiceProvider> { public: - explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration, - std::shared_ptr<minifi::SchedulingAgent> agent, ClassLoader &loader = ClassLoader::getDefaultClassLoader()) - : ControllerServiceProvider(services), - agent_(agent), - extension_loader_(loader), - root_group_(root_group), - configuration_(configuration), - logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { - } - - explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, ProcessGroup* root_group, std::shared_ptr<Configure> configuration, ClassLoader &loader = + explicit StandardControllerServiceProvider(std::shared_ptr<ControllerServiceMap> services, std::shared_ptr<Configure> configuration, ClassLoader &loader = ClassLoader::getDefaultClassLoader()) : ControllerServiceProvider(services), - agent_(nullptr), extension_loader_(loader), - root_group_(root_group), configuration_(configuration), logger_(logging::LoggerFactory<StandardControllerServiceProvider>::getLogger()) { } @@ -68,14 +52,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ StandardControllerServiceProvider& operator=(const StandardControllerServiceProvider &other) = delete; StandardControllerServiceProvider& operator=(StandardControllerServiceProvider &&other) = delete; - void setRootGroup(ProcessGroup* rg) { - root_group_ = rg; - } - - void setSchedulingAgent(std::shared_ptr<minifi::SchedulingAgent> agent) { - agent_ = agent; - } - std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &fullType, const std::string &id, bool /*firstTimeAdded*/) { std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id); @@ -97,22 +73,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ return new_service_node; } - std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { - if (serviceNode->canEnable()) { - return agent_->enableControllerService(serviceNode); - } else { - std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done); - return no_run; - } - } - virtual void enableAllControllerServices() { logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size()); for (auto service : controller_map_->getAllControllerServices()) { - if (service->canEnable()) { - logger_->log_info("Enabling %s", service->getName()); - agent_->enableControllerService(service); - } else { + logger_->log_info("Enabling %s", service->getName()); + if (!service->canEnable()) { + logger_->log_warn("Service %s cannot be enabled", service->getName()); + continue; + } + if (!service->enable()) { logger_->log_warn("Could not enable %s", service->getName()); } } @@ -121,98 +90,32 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ virtual void disableAllControllerServices() { logger_->log_info("Disabling %u controller services", controller_map_->getAllControllerServices().size()); for (auto service : controller_map_->getAllControllerServices()) { + logger_->log_info("Disabling %s", service->getName()); + if (!service->enabled()) { + logger_->log_warn("Service %s is not enabled", service->getName()); + continue; + } if (!service->disable()) { logger_->log_warn("Could not disable %s", service->getName()); } } } - void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) { - for (auto node : serviceNodes) { - enableControllerService(node); - } - } - - std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<ControllerServiceNode> &serviceNode) { - if (!IsNullOrEmpty(serviceNode.get()) && serviceNode->enabled()) { - return agent_->disableControllerService(serviceNode); - } else { - std::future<utils::TaskRescheduleInfo> no_run = std::async(std::launch::deferred, utils::TaskRescheduleInfo::Done); - return no_run; - } - } - void clearControllerServices() { controller_map_->clear(); } - void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode>& /*serviceNode*/) { - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> unscheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - agent_->disableControllerService(ref); - } - return references; - } - - void verifyCanDisableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - if (!ref->canEnable()) { - logger_->log_info("Cannot disable %s", ref->getName()); - } - } - } - - virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> disableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - agent_->disableControllerService(ref); - } - - return references; - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> enableReferencingServices(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - agent_->enableControllerService(ref); - } - return references; - } - - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> scheduleReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> references = findLinkedComponents(serviceNode); - for (auto ref : references) { - agent_->enableControllerService(ref); - } - return references; - } - protected: bool canEdit() { return false; } - std::shared_ptr<minifi::SchedulingAgent> agent_; - ClassLoader &extension_loader_; - ProcessGroup* root_group_ = nullptr; - std::shared_ptr<Configure> configuration_; private: std::shared_ptr<logging::Logger> logger_; }; -} // namespace controller -} // namespace core -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_CORE_CONTROLLER_STANDARDCONTROLLERSERVICEPROVIDER_H_ +} // namespace org::apache::nifi::minifi::core::controller diff --git a/libminifi/include/core/state/ProcessorController.h b/libminifi/include/core/state/ProcessorController.h index 109d5c490..e3546da2a 100644 --- a/libminifi/include/core/state/ProcessorController.h +++ b/libminifi/include/core/state/ProcessorController.h @@ -32,7 +32,7 @@ namespace org::apache::nifi::minifi::state { */ class ProcessorController : public StateController { public: - ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler); + ProcessorController(core::Processor& processor, SchedulingAgent& scheduler); ~ProcessorController() override; @@ -64,7 +64,7 @@ class ProcessorController : public StateController { protected: gsl::not_null<core::Processor*> processor_; - std::shared_ptr<SchedulingAgent> scheduler_; + gsl::not_null<SchedulingAgent*> scheduler_; }; } // namespace org::apache::nifi::minifi::state diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 37aa27fdd..68c2663ec 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -40,6 +40,7 @@ #include "controllers/ThreadManagementService.h" #include "core/controller/ControllerService.h" #include "core/controller/ControllerServiceProvider.h" + namespace org::apache::nifi::minifi::utils { using TaskId = std::string; @@ -162,18 +163,8 @@ class WorkerThread { template<typename T> class ThreadPool { public: - ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, core::controller::ControllerServiceProvider* controller_service_provider = nullptr, - std::string name = "NamelessPool") - : daemon_threads_(daemon_threads), - thread_reduction_count_(0), - max_worker_threads_(max_worker_threads), - adjust_threads_(false), - running_(false), - controller_service_provider_(controller_service_provider), - name_(std::move(name)) { - current_workers_ = 0; - thread_manager_ = nullptr; - } + ThreadPool(int max_worker_threads = 2, bool daemon_threads = false, + core::controller::ControllerServiceProvider* controller_service_provider = nullptr, std::string name = "NamelessPool"); ThreadPool(const ThreadPool<T> &other) = delete; ThreadPool<T>& operator=(const ThreadPool<T> &other) = delete; @@ -277,6 +268,9 @@ class ThreadPool { start(); } + private: + std::shared_ptr<controllers::ThreadManagementService> createThreadManager() const; + protected: std::thread createThread(std::function<void()> &&functor) { return std::thread([ functor ]() mutable { @@ -296,6 +290,7 @@ class ThreadPool { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } + // determines if threads are detached bool daemon_threads_; std::atomic<int> thread_reduction_count_; @@ -337,6 +332,8 @@ class ThreadPool { // variable to signal task running completion std::condition_variable task_run_complete_; + std::shared_ptr<core::logging::Logger> logger_; + /** * Call for the manager to start worker threads */ @@ -345,7 +342,7 @@ class ThreadPool { /** * Runs worker tasks */ - void run_tasks(std::shared_ptr<WorkerThread> thread); + void run_tasks(const std::shared_ptr<WorkerThread>& thread); void manage_delayed_queue(); }; diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 87ea88853..9d0d62e15 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -182,7 +182,7 @@ int16_t FlowController::stop() { logger_->log_info("Stop Flow Controller"); if (this->root_) { // stop source processors first - this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_, [] (const core::Processor* proc) -> bool { + this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_, [] (const core::Processor* proc) -> bool { return !proc->hasIncomingConnections(); }); // we enable C2 to progressively increase the timeout @@ -194,7 +194,7 @@ int16_t FlowController::stop() { std::this_thread::sleep_for(shutdown_check_interval_); } // shutdown all other processors as well - this->root_->stopProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_); + this->root_->stopProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_); } // stop after we've attempted to stop the processors. timer_scheduler_->stop(); @@ -211,7 +211,7 @@ int16_t FlowController::stop() { this->flow_file_repo_->stop(); this->provenance_repo_->stop(); // stop the ControllerServices - this->controller_service_provider_impl_->disableAllControllerServices(); + disableAllControllerServices(); running_ = false; } return 0; @@ -324,10 +324,6 @@ void FlowController::load(std::unique_ptr<core::ProcessGroup> root, bool reload) conditionalReloadScheduler<EventDrivenSchedulingAgent>(event_scheduler_, !event_scheduler_ || reload); conditionalReloadScheduler<CronDrivenSchedulingAgent>(cron_scheduler_, !cron_scheduler_ || reload); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setRootGroup(root_.get()); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_impl_)->setSchedulingAgent( - std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); - logger_->log_info("Loaded controller service provider"); /* @@ -374,16 +370,16 @@ int16_t FlowController::start() { } else { if (!running_) { logger_->log_info("Starting Flow Controller"); - controller_service_provider_impl_->enableAllControllerServices(); - this->timer_scheduler_->start(); - this->event_scheduler_->start(); - this->cron_scheduler_->start(); + enableAllControllerServices(); + timer_scheduler_->start(); + event_scheduler_->start(); + cron_scheduler_->start(); if (this->root_ != nullptr) { start_time_ = std::chrono::steady_clock::now(); // watch out, this might immediately start the processors // as the thread_pool_ is started in load() - this->root_->startProcessing(timer_scheduler_, event_scheduler_, cron_scheduler_); + this->root_->startProcessing(*timer_scheduler_, *event_scheduler_, *cron_scheduler_); } C2Client::initialize(this, this, this); core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_); @@ -391,7 +387,6 @@ int16_t FlowController::start() { this->protocol_->start(); this->provenance_repo_->start(); this->flow_file_repo_->start(); - thread_pool_.start(); logger_->log_info("Started Flow Controller"); } return 0; @@ -504,11 +499,11 @@ state::StateController* FlowController::getComponent(const std::string& id_or_na } gsl::not_null<std::unique_ptr<state::ProcessorController>> FlowController::createController(core::Processor& processor) { - const auto scheduler = [this, &processor]() -> std::shared_ptr<SchedulingAgent> { + const auto scheduler = [this, &processor]() -> SchedulingAgent& { switch (processor.getSchedulingStrategy()) { - case core::SchedulingStrategy::TIMER_DRIVEN: return timer_scheduler_; - case core::SchedulingStrategy::EVENT_DRIVEN: return event_scheduler_; - case core::SchedulingStrategy::CRON_DRIVEN: return cron_scheduler_; + case core::SchedulingStrategy::TIMER_DRIVEN: return *timer_scheduler_; + case core::SchedulingStrategy::EVENT_DRIVEN: return *event_scheduler_; + case core::SchedulingStrategy::CRON_DRIVEN: return *cron_scheduler_; } gsl_Assert(false); }; diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 5b59faa31..9dfca5c58 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -34,47 +34,6 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) { namespace org::apache::nifi::minifi { -std::future<utils::TaskRescheduleInfo> SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - logger_->log_info("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); - // reference the enable function from serviceNode - std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] { - serviceNode->enable(); - return utils::TaskRescheduleInfo::Done(); - }; - - // only need to run this once. - auto monitor = std::make_unique<utils::ComplexMonitor>(); - utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); - // move the functor into the thread pool. While a future is returned - // we aren't terribly concerned with the result. - std::future<utils::TaskRescheduleInfo> future; - thread_pool_.execute(std::move(functor), future); - if (future.valid()) - future.wait(); - return future; -} - -std::future<utils::TaskRescheduleInfo> SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - logger_->log_info("Disabling CSN in SchedulingAgent %s", serviceNode->getName()); - // reference the disable function from serviceNode - std::function<utils::TaskRescheduleInfo()> f_ex = [serviceNode] { - serviceNode->disable(); - return utils::TaskRescheduleInfo::Done(); - }; - - // only need to run this once. - auto monitor = std::make_unique<utils::ComplexMonitor>(); - utils::Worker<utils::TaskRescheduleInfo> functor(f_ex, serviceNode->getUUIDStr(), std::move(monitor)); - - // move the functor into the thread pool. While a future is returned - // we aren't terribly concerned with the result. - std::future<utils::TaskRescheduleInfo> future; - thread_pool_.execute(std::move(functor), future); - if (future.valid()) - future.wait(); - return future; -} - bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { gsl_Expects(processor); diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index c7981b86f..b53d3fe90 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -37,7 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx) filesystem_(std::move(ctx.filesystem)), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { controller_services_ = std::make_shared<core::controller::ControllerServiceMap>(); - service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_); + service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_); std::string flowUrl; std::string bucket_id = "default"; std::string flowId; @@ -97,7 +97,7 @@ std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const s auto old_services = controller_services_; auto old_provider = service_provider_; controller_services_ = std::make_shared<core::controller::ControllerServiceMap>(); - service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_); + service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, configuration_); auto payload = getRootFromPayload(yamlConfigPayload); if (!url.empty() && payload != nullptr) { std::string payload_flow_id; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 7922717fb..7300d2129 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -122,8 +122,8 @@ void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) { } } -void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, - const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) { +void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, + EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler) { std::unique_lock<std::recursive_mutex> lock(mutex_); std::set<Processor*> failed_processors; @@ -133,13 +133,13 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc logger_->log_debug("Starting %s", processor->getName()); switch (processor->getSchedulingStrategy()) { case TIMER_DRIVEN: - timeScheduler->schedule(processor); + timeScheduler.schedule(processor); break; case EVENT_DRIVEN: - eventScheduler->schedule(processor); + eventScheduler.schedule(processor); break; case CRON_DRIVEN: - cronScheduler->schedule(processor); + cronScheduler.schedule(processor); break; } } @@ -166,8 +166,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) { logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load()); - auto func = [this, eventScheduler, cronScheduler, timeScheduler]() { - this->startProcessingProcessors(timeScheduler, eventScheduler, cronScheduler); + auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() { + this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler); }; onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_), func); onScheduleTimer_->start(); @@ -176,8 +176,8 @@ void ProcessGroup::startProcessingProcessors(const std::shared_ptr<TimerDrivenSc } } -void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, - const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler) { +void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, + CronDrivenSchedulingAgent& cronScheduler) { std::lock_guard<std::recursive_mutex> lock(mutex_); try { @@ -202,8 +202,8 @@ void ProcessGroup::startProcessing(const std::shared_ptr<TimerDrivenSchedulingAg } } -void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAgent>& timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, - const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler, const std::function<bool(const Processor*)>& filter) { +void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, + CronDrivenSchedulingAgent& cronScheduler, const std::function<bool(const Processor*)>& filter) { std::lock_guard<std::recursive_mutex> lock(mutex_); if (onScheduleTimer_) { @@ -221,13 +221,13 @@ void ProcessGroup::stopProcessing(const std::shared_ptr<TimerDrivenSchedulingAge logger_->log_debug("Stopping %s", processor->getName()); switch (processor->getSchedulingStrategy()) { case TIMER_DRIVEN: - timeScheduler->unschedule(processor.get()); + timeScheduler.unschedule(processor.get()); break; case EVENT_DRIVEN: - eventScheduler->unschedule(processor.get()); + eventScheduler.unschedule(processor.get()); break; case CRON_DRIVEN: - cronScheduler->unschedule(processor.get()); + cronScheduler.unschedule(processor.get()); break; } } diff --git a/libminifi/src/core/state/ProcessorController.cpp b/libminifi/src/core/state/ProcessorController.cpp index c5686de77..b496387d9 100644 --- a/libminifi/src/core/state/ProcessorController.cpp +++ b/libminifi/src/core/state/ProcessorController.cpp @@ -22,9 +22,9 @@ namespace org::apache::nifi::minifi::state { -ProcessorController::ProcessorController(core::Processor& processor, std::shared_ptr<SchedulingAgent> scheduler) +ProcessorController::ProcessorController(core::Processor& processor, SchedulingAgent& scheduler) : processor_(&processor), - scheduler_(std::move(scheduler)) { + scheduler_(&scheduler) { } ProcessorController::~ProcessorController() = default; diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index 8612f0dbf..baadb59ec 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ b/libminifi/src/utils/ThreadPool.cpp @@ -21,7 +21,21 @@ namespace org::apache::nifi::minifi::utils { template<typename T> -void ThreadPool<T>::run_tasks(std::shared_ptr<WorkerThread> thread) { +ThreadPool<T>::ThreadPool(int max_worker_threads, bool daemon_threads, core::controller::ControllerServiceProvider* controller_service_provider, std::string name) + : daemon_threads_(daemon_threads), + thread_reduction_count_(0), + max_worker_threads_(max_worker_threads), + adjust_threads_(false), + running_(false), + controller_service_provider_(controller_service_provider), + name_(std::move(name)), + logger_(core::logging::LoggerFactory<ThreadPool<T>>::getLogger()) { + current_workers_ = 0; + thread_manager_ = nullptr; +} + +template<typename T> +void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) { thread->is_running_ = true; while (running_.load()) { if (UNLIKELY(thread_reduction_count_ > 0)) { @@ -181,16 +195,29 @@ void ThreadPool<T>::manageWorkers() { } template<typename T> -void ThreadPool<T>::start() { - if (nullptr != controller_service_provider_) { - auto thread_man = controller_service_provider_->getControllerService("ThreadPoolManager"); - thread_manager_ = thread_man != nullptr ? std::dynamic_pointer_cast<controllers::ThreadManagementService>(thread_man) : nullptr; - } else { - thread_manager_ = nullptr; +std::shared_ptr<controllers::ThreadManagementService> ThreadPool<T>::createThreadManager() const { + if (!controller_service_provider_) { + return nullptr; + } + auto service = controller_service_provider_->getControllerService("ThreadPoolManager"); + if (!service) { + logger_->log_info("Could not find a ThreadPoolManager service"); + return nullptr; } + auto thread_manager_service = std::dynamic_pointer_cast<controllers::ThreadManagementService>(service); + if (!thread_manager_service) { + logger_->log_error("Found ThreadPoolManager, but it is not a ThreadManagementService"); + return nullptr; + } + return thread_manager_service; +} +template<typename T> +void ThreadPool<T>::start() { std::lock_guard<std::recursive_mutex> lock(manager_mutex_); if (!running_) { + thread_manager_ = createThreadManager(); + running_ = true; worker_queue_.start(); manager_thread_ = std::thread(&ThreadPool::manageWorkers, this); diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index b5c48dc1d..bc34d239e 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -197,7 +197,7 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo logger_(logging::LoggerFactory<TestPlan>::getLogger()) { stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>()); controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>(); - controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_); + controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration_); /* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */ if (state_dir == nullptr) { state_dir_ = std::make_unique<TempDirectory>(); diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp index b80c7563c..36a74046e 100644 --- a/libminifi/test/unit/SchedulingAgentTests.cpp +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -56,7 +56,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto test_plan = testController.createPlan(); auto controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>(); auto configuration = std::make_shared<minifi::Configure>(); - auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration); + auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration); utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool; auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc"); count_proc->incrementActiveTasks();
