This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api-reduced in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit b43f26bc2902ef3d47a114d604a5fdbb169c4960 Author: Adam Debreceni <[email protected]> AuthorDate: Thu Jul 17 14:12:33 2025 +0200 Review changes --- .../{ProcessorFactory.h => ProcessorFactoryImpl.h} | 4 ++-- core-framework/include/core/Resource.h | 2 +- libminifi/include/core/FlowConfiguration.h | 2 +- .../reporting/SiteToSiteProvenanceReportingTask.h | 2 +- libminifi/src/core/FlowConfiguration.cpp | 6 ++++-- .../src/core/flow/StructuredConfiguration.cpp | 22 +++++++++------------- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/core-framework/include/core/ProcessorFactory.h b/core-framework/include/core/ProcessorFactoryImpl.h similarity index 95% rename from core-framework/include/core/ProcessorFactory.h rename to core-framework/include/core/ProcessorFactoryImpl.h index bbba22664..d4ea016c8 100644 --- a/core-framework/include/core/ProcessorFactory.h +++ b/core-framework/include/core/ProcessorFactoryImpl.h @@ -46,12 +46,12 @@ class ProcessorFactoryImpl : public ProcessorFactory { } std::string getClassName() const override { - return class_name_; + return std::string{class_name_}; } protected: std::string group_name_; - std::string class_name_; + std::string_view class_name_; }; } // namespace org::apache::nifi::minifi::core diff --git a/core-framework/include/core/Resource.h b/core-framework/include/core/Resource.h index c4994666a..999f1ddb5 100644 --- a/core-framework/include/core/Resource.h +++ b/core-framework/include/core/Resource.h @@ -29,7 +29,7 @@ #include "agent/agent_docs.h" #include "utils/OptionalUtils.h" #include "utils/Macro.h" -#include "core/ProcessorFactory.h" +#include "core/ProcessorFactoryImpl.h" namespace org::apache::nifi::minifi::core { diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 8d83efcaf..0c9cd327c 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -96,7 +96,7 @@ class FlowConfiguration : public CoreComponentImpl { // Create Connection [[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const std::string &name, const utils::Identifier &uuid) const; // Create Provenance Report Task - std::unique_ptr<core::reporting::SiteToSiteProvenanceReportingTask> createProvenanceReportTask(); + std::unique_ptr<core::Processor> createProvenanceReportTask(); static std::unique_ptr<core::ParameterProvider> createParameterProvider(const std::string &class_name, const std::string &full_class_name, const utils::Identifier& uuid); diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index 6998e922d..dea93e616 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -36,7 +36,7 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor public: explicit SiteToSiteProvenanceReportingTask(std::shared_ptr<Configure> configure) : minifi::RemoteProcessorGroupPort(ReportTaskName, "", std::move(configure), - utils::IdGenerator::getIdGenerator()->generate(), logging::LoggerFactory<SiteToSiteProvenanceReportingTask>::getLogger()) { + utils::IdGenerator::getIdGenerator()->generate(), logging::LoggerFactory<SiteToSiteProvenanceReportingTask>::getLogger()) { this->setTriggerWhenEmpty(true); batch_size_ = 100; } diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index d2c21ee9c..379c18fe3 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -80,8 +80,10 @@ std::unique_ptr<core::Processor> FlowConfiguration::createProcessor(const std::s return processor; } -std::unique_ptr<core::reporting::SiteToSiteProvenanceReportingTask> FlowConfiguration::createProvenanceReportTask() { - return std::make_unique<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(this->configuration_); +std::unique_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() { + auto processor = std::make_unique<core::Processor>("", std::make_unique<core::reporting::SiteToSiteProvenanceReportingTask>(this->configuration_)); + processor->initialize(); + return processor; } std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string& url, const std::string& yamlConfigPayload, const std::optional<std::string>& flow_id) { diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 55411616d..cb5a696bb 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -582,9 +582,7 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P return; } - auto report_task_impl = createProvenanceReportTask(); - auto* reportTask = report_task_impl.get(); - auto report_task_wrapper = std::make_unique<core::Processor>("", std::move(report_task_impl)); + auto report_task = createProvenanceReportTask(); checkRequiredField(node, schema_.scheduling_strategy); auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value(); @@ -593,11 +591,11 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) { logger_->log_debug("ProvenanceReportingTask schedulingPeriod {}", scheduling_period); - report_task_wrapper->setSchedulingPeriod(*scheduling_period); + report_task->setSchedulingPeriod(*scheduling_period); } if (schedulingStrategyStr == "TIMER_DRIVEN") { - report_task_wrapper->setSchedulingStrategy(core::TIMER_DRIVEN); + report_task->setSchedulingStrategy(core::TIMER_DRIVEN); logger_->log_debug("ProvenanceReportingTask scheduling strategy {}", schedulingStrategyStr); } else { throw std::invalid_argument("Invalid scheduling strategy " + schedulingStrategyStr); @@ -610,14 +608,14 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P if (auto port = parsing::parseIntegral<int64_t>(portStr); port && !hostStr.empty()) { logger_->log_debug("ProvenanceReportingTask port {}", *port); std::string url = hostStr + ":" + portStr; - reportTask->setURL(url); + report_task->getImpl<core::reporting::SiteToSiteProvenanceReportingTask>().setURL(url); } } if (node["url"]) { auto urlStr = node["url"].getString().value(); if (!urlStr.empty()) { - reportTask->setURL(urlStr); + report_task->getImpl<core::reporting::SiteToSiteProvenanceReportingTask>().setURL(urlStr); logger_->log_debug("ProvenanceReportingTask URL {}", urlStr); } } @@ -628,17 +626,15 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P logger_->log_debug("ProvenanceReportingTask port uuid {}", portUUIDStr); port_uuid = portUUIDStr; - reportTask->setPortUUID(port_uuid); + report_task->getImpl<core::reporting::SiteToSiteProvenanceReportingTask>().setPortUUID(port_uuid); if (auto batch_size = parsing::parseIntegral<int>(batchSizeStr)) { - reportTask->setBatchSize(*batch_size); + report_task->getImpl<core::reporting::SiteToSiteProvenanceReportingTask>().setBatchSize(*batch_size); } - report_task_wrapper->initialize(); - // add processor to parent - report_task_wrapper->setScheduledState(core::RUNNING); - parent_group->addProcessor(std::move(report_task_wrapper)); + report_task->setScheduledState(core::RUNNING); + parent_group->addProcessor(std::move(report_task)); } void StructuredConfiguration::parseControllerServices(const Node& controller_services_node, core::ProcessGroup* parent_group) {
