This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api-reduced in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit bd5c020c0783f03ab561f6a7443dd2e82d082dbb Author: Adam Debreceni <[email protected]> AuthorDate: Tue Feb 18 12:37:56 2025 +0100 Working on api reduction --- .../include/utils/ProcessorConfigUtils.h | 1 + extension-utils/src/utils/ProcessorConfigUtils.cpp | 108 +++++++ extension-utils/src/utils/net/Ssl.cpp | 2 +- extensions/civetweb/processors/ListenHTTP.cpp | 10 +- extensions/civetweb/processors/ListenHTTP.h | 2 - .../controllerservices/CouchbaseClusterService.cpp | 2 +- .../expression-language/ProcessContextExpr.cpp | 32 +- .../expression-language/ProcessContextExpr.h | 28 +- extensions/grafana-loki/PushGrafanaLoki.cpp | 8 - extensions/grafana-loki/PushGrafanaLoki.h | 1 - extensions/libarchive/BinFiles.cpp | 8 - extensions/libarchive/BinFiles.h | 2 - extensions/python/ExecutePythonProcessor.cpp | 76 +---- extensions/python/ExecutePythonProcessor.h | 2 +- extensions/python/types/PyProcessContext.cpp | 6 +- extensions/smb/SmbConnectionControllerService.cpp | 2 +- .../processors/AttributeRollingWindow.cpp | 2 +- .../processors/DefragmentText.cpp | 8 - .../processors/DefragmentText.h | 1 - .../standard-processors/processors/ExtractText.h | 5 +- .../processors/GenerateFlowFile.cpp | 5 +- .../processors/GenerateFlowFile.h | 5 +- .../standard-processors/processors/GetFile.h | 2 +- .../processors/RouteOnAttribute.cpp | 6 +- .../processors/RouteOnAttribute.h | 5 +- .../processors/SegmentContent.h | 4 +- .../standard-processors/processors/SplitText.h | 5 +- .../processors/UpdateAttribute.h | 5 +- libminifi/include/ForwardingNode.h | 7 +- libminifi/include/RemoteProcessorGroupPort.h | 4 +- .../include}/core/Processor.h | 26 +- .../include/core/ProcessorProxy.h | 110 ++----- .../include/core/state/nodes/FlowInformation.h | 2 +- libminifi/include/core/state/nodes/MetricsBase.h | 2 +- libminifi/src/RemoteProcessorGroupPort.cpp | 14 +- libminifi/src/core/ClassLoader.cpp | 44 +++ libminifi/src/core/FlowConfiguration.cpp | 1 + libminifi/src/core/ProcessContext.cpp | 110 
+++++++ libminifi/src/core/ProcessSession.cpp | 1 + {utils => libminifi}/src/core/Processor.cpp | 95 +++--- libminifi/test/libtest/unit/DummyProcessor.h | 2 +- .../libtest/unit/ReadFromFlowFileTestProcessor.h | 2 +- libminifi/test/libtest/unit/StatefulProcessor.h | 2 +- libminifi/test/libtest/unit/TestBase.cpp | 2 +- minifi-api/include/minifi-cpp/core/ClassLoader.h | 3 + .../include/minifi-cpp/core/ProcessContext.h | 19 ++ .../minifi-cpp/core/ProcessContextBuilder.h | 2 + minifi-api/include/minifi-cpp/core/Processor.h | 57 +--- .../include/minifi-cpp/core/ProcessorDescriptor.h | 33 ++ .../include/minifi-cpp/core/ProcessorFactory.h | 37 +++ .../include/minifi-cpp/core/ProcessorMetadata.h | 32 ++ .../minifi-cpp/core/state/nodes/MetricsBase.h | 4 +- utils/include/core/ProcessContext.h | 65 +--- utils/include/core/Processor.h | 288 +----------------- utils/include/core/ProcessorFactory.h | 57 ++++ utils/include/core/ProcessorImpl.h | 159 ++++++++++ utils/include/core/ProcessorMetrics.h | 6 +- utils/include/core/Resource.h | 14 +- utils/include/core/state/nodes/ResponseNode.h | 20 +- utils/src/core/Processor.cpp | 337 +++------------------ utils/src/core/ProcessorMetrics.cpp | 2 +- 61 files changed, 855 insertions(+), 1047 deletions(-) diff --git a/extension-utils/include/utils/ProcessorConfigUtils.h b/extension-utils/include/utils/ProcessorConfigUtils.h index dbb99f748..29de57eec 100644 --- a/extension-utils/include/utils/ProcessorConfigUtils.h +++ b/extension-utils/include/utils/ProcessorConfigUtils.h @@ -27,6 +27,7 @@ #include "utils/Enum.h" #include "utils/expected.h" #include "utils/OptionalUtils.h" +#include "minifi-cpp/Exception.h" namespace org::apache::nifi::minifi::utils { diff --git a/extension-utils/src/utils/ProcessorConfigUtils.cpp b/extension-utils/src/utils/ProcessorConfigUtils.cpp new file mode 100644 index 000000000..2ba8c73cb --- /dev/null +++ b/extension-utils/src/utils/ProcessorConfigUtils.cpp @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "utils/ProcessorConfigUtils.h" + +#include <string> + +#include "utils/expected.h" + +namespace org::apache::nifi::minifi::utils { + +std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::expect(fmt::format("Expected valid value from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + +std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + return *property_str; + } + return std::nullopt; +} + +std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + return parsing::parseBool(*property_str) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessorInfo().getName(), property.name)); + } + return std::nullopt; +} + +bool parseBoolProperty(const core::ProcessContext& ctx, const 
core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseBool) | utils::expect(fmt::format("Expected parsable bool from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + +std::optional<uint64_t> parseOptionalU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + if (property_str->empty()) { + return std::nullopt; + } + return parsing::parseIntegral<uint64_t>(*property_str) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessorInfo().getName(), property.name)); + } + + return std::nullopt; +} + +uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<uint64_t>) | utils::expect(fmt::format("Expected parsable uint64_t from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + +std::optional<int64_t> parseOptionalI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + if (property_str->empty()) { + return std::nullopt; + } + return parsing::parseIntegral<int64_t>(*property_str) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessorInfo().getName(), property.name)); + } + + return std::nullopt; +} + +int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseIntegral<int64_t>) | utils::expect(fmt::format("Expected parsable int64_t from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + 
+std::optional<std::chrono::milliseconds> parseOptionalMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + if (property_str->empty()) { + return std::nullopt; + } + return parsing::parseDuration(*property_str) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessorInfo().getName(), property.name)); + } + + return std::nullopt; +} + +std::chrono::milliseconds parseMsProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::expect(fmt::format("Expected parsable duration from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + +std::optional<uint64_t> parseOptionalDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + if (const auto property_str = ctx.getProperty(property, flow_file)) { + if (property_str->empty()) { + return std::nullopt; + } + return parsing::parseDataSize(*property_str) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessorInfo().getName(), property.name)); + } + + return std::nullopt; +} + +uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file) { + return ctx.getProperty(property, flow_file) | utils::andThen(parsing::parseDataSize) | utils::expect(fmt::format("Expected parsable data size from {}::{}", ctx.getProcessorInfo().getName(), property.name)); +} + + +} // namespace org::apache::nifi::minifi::utils diff --git a/extension-utils/src/utils/net/Ssl.cpp b/extension-utils/src/utils/net/Ssl.cpp index 4534c4c5f..3b78429f9 100644 --- a/extension-utils/src/utils/net/Ssl.cpp +++ 
b/extension-utils/src/utils/net/Ssl.cpp @@ -22,7 +22,7 @@ namespace org::apache::nifi::minifi::utils::net { std::optional<utils::net::SslData> getSslData(const core::ProcessContext& context, const core::PropertyReference& ssl_prop, const std::shared_ptr<core::logging::Logger>& logger) { auto getSslContextService = [&]() -> std::shared_ptr<minifi::controllers::SSLContextService> { if (auto ssl_service_name = context.getProperty(ssl_prop); ssl_service_name && !ssl_service_name->empty()) { - if (auto service = context.getControllerService(*ssl_service_name, context.getProcessor().getUUID())) { + if (auto service = context.getControllerService(*ssl_service_name, context.getProcessorInfo().getUUID())) { if (auto ssl_service = std::dynamic_pointer_cast<org::apache::nifi::minifi::controllers::SSLContextService>(service)) { return ssl_service; } else { diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index d2096643d..46f627f34 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -108,7 +108,7 @@ void ListenHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionF logger_->log_debug("ListenHTTP using {}: {}", BatchSize.name, batch_size_); std::optional<std::string> flow_id; - if (auto flow_version = context.getProcessor().getFlowIdentifier()) { + if (auto flow_version = context.getProcessorInfo().getFlowIdentifier()) { flow_id = flow_version->getFlowId(); } @@ -504,14 +504,6 @@ void ListenHTTP::notifyStop() { handler_.reset(); } -std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const std::string &relationship) { - auto result = core::ProcessorImpl::getOutGoingConnections(relationship); - if (relationship == Self.name) { - result.insert(this); - } - return result; -} - REGISTER_RESOURCE(ListenHTTP, Processor); } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/civetweb/processors/ListenHTTP.h 
b/extensions/civetweb/processors/ListenHTTP.h index ce47ab97c..4a5181988 100644 --- a/extensions/civetweb/processors/ListenHTTP.h +++ b/extensions/civetweb/processors/ListenHTTP.h @@ -155,8 +155,6 @@ class ListenHTTP : public core::ProcessorImpl { return handler_ ? !handler_->empty() : false; } - std::set<core::Connectable*> getOutGoingConnections(const std::string &relationship) override; - struct ResponseBody { std::string uri; std::string mime_type; diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index a86702b98..7b672e0b4 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -250,7 +250,7 @@ void CouchbaseClusterService::onEnable() { gsl::not_null<std::shared_ptr<CouchbaseClusterService>> CouchbaseClusterService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) { std::shared_ptr<CouchbaseClusterService> couchbase_cluster_service; if (auto connection_controller_name = context.getProperty(property)) { - couchbase_cluster_service = std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name, context.getProcessor().getUUID())); + couchbase_cluster_service = std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name, context.getProcessorInfo().getUUID())); } if (!couchbase_cluster_service) { throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing Couchbase Cluster Service"); diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp index d22447c5d..bff17c6b6 100644 --- a/extensions/expression-language/ProcessContextExpr.cpp +++ b/extensions/expression-language/ProcessContextExpr.cpp @@ -25,21 +25,21 @@ namespace 
org::apache::nifi::minifi::core { -nonstd::expected<std::string, std::error_code> ProcessContextExpr::getProperty(const std::string_view name, const FlowFile* flow_file) const { - const auto property = getProcessor().getSupportedProperty(name); +nonstd::expected<std::string, std::error_code> ProcessContextExpr::getProperty(ProcessContext& context, const std::string_view name, const FlowFile* flow_file) const { + const auto property = context.getProcessorInfo().getPropertyReference(name); if (!property) { return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty); } - if (!property->supportsExpressionLanguage()) { - return ProcessContextImpl::getProperty(name, flow_file); + if (!property->supports_expression_language) { + return context.getProperty(name, flow_file); } if (!cached_expressions_.contains(name)) { - auto expression_str = ProcessContextImpl::getProperty(name, flow_file); + auto expression_str = context.getProperty(name, flow_file); if (!expression_str) { return expression_str; } cached_expressions_.emplace(std::string{name}, expression::compile(*expression_str)); } - expression::Parameters p(this, flow_file); + expression::Parameters p(&context, flow_file); auto result = cached_expressions_[std::string{name}](p).asString(); if (!property->getValidator().validate(result)) { return nonstd::make_unexpected(PropertyErrorCode::ValidationFailed); @@ -47,21 +47,25 @@ nonstd::expected<std::string, std::error_code> ProcessContextExpr::getProperty(c return result; } -nonstd::expected<std::string, std::error_code> ProcessContextExpr::getDynamicProperty(const std::string_view name, const FlowFile* flow_file) const { - // all dynamic properties support EL +nonstd::expected<std::string, std::error_code> ProcessContextExpr::getDynamicProperty(ProcessContext& context, const std::string_view name, const FlowFile* flow_file) const { if (!cached_dynamic_expressions_.contains(name)) { - auto expression_str = ProcessContextImpl::getDynamicProperty(name, 
flow_file); + auto expression_str = context.getDynamicProperty(name, flow_file); if (!expression_str) { return expression_str; } cached_dynamic_expressions_.emplace(std::string{name}, expression::compile(*expression_str)); } - const expression::Parameters p(this, flow_file); + const expression::Parameters p(&context, flow_file); return cached_dynamic_expressions_[std::string{name}](p).asString(); } -nonstd::expected<void, std::error_code> ProcessContextExpr::setProperty(const std::string_view name, std::string value) { - cached_expressions_.erase(std::string{name}); - return ProcessContextImpl::setProperty(name, std::move(value)); -} +//nonstd::expected<void, std::error_code> ProcessContextExpr::setProperty(const std::string_view name, std::string value) { +// cached_expressions_.erase(std::string{name}); +// return ProcessContextImpl::setProperty(name, std::move(value)); +//} +// +//nonstd::expected<void, std::error_code> ProcessContextExpr::setDynamicProperty(std::string name, std::string value) { +// cached_dynamic_expressions_.erase(name); +// return ProcessContextImpl::setDynamicProperty(std::move(name), std::move(value)); +//} nonstd::expected<void, std::error_code> ProcessContextExpr::setDynamicProperty(std::string name, std::string value) { cached_dynamic_expressions_.erase(name); diff --git a/extensions/expression-language/ProcessContextExpr.h b/extensions/expression-language/ProcessContextExpr.h index fdd00caef..cb33c973c 100644 --- a/extensions/expression-language/ProcessContextExpr.h +++ b/extensions/expression-language/ProcessContextExpr.h @@ -34,35 +34,19 @@ namespace org::apache::nifi::minifi::core { */ class ProcessContextExpr final : public core::ProcessContextImpl { public: - ProcessContextExpr(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, - const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_repo, - const std::shared_ptr<core::ContentRepository> &content_repo = 
core::repository::createFileSystemRepository()) - : core::ProcessContextImpl(processor, controller_service_provider, repo, flow_repo, content_repo), - logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) { - } - - ProcessContextExpr(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, - const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_repo, const std::shared_ptr<minifi::Configure> &configuration, - const std::shared_ptr<core::ContentRepository> &content_repo = core::repository::createFileSystemRepository()) - : core::ProcessContextImpl(processor, controller_service_provider, repo, flow_repo, configuration, content_repo), - logger_(logging::LoggerFactory<ProcessContextExpr>::getLogger()) { - } - ~ProcessContextExpr() override = default; - nonstd::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile*) const override; - nonstd::expected<std::string, std::error_code> getDynamicProperty(std::string_view name, const FlowFile*) const override; + nonstd::expected<std::string, std::error_code> getProperty(ProcessContext& context, std::string_view name, const FlowFile*) const override; + nonstd::expected<std::string, std::error_code> getDynamicProperty(ProcessContext& context, std::string_view name, const FlowFile*) const override; +// +// nonstd::expected<void, std::error_code> setProperty(std::string_view name, std::string value); +// nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, std::string value); - nonstd::expected<void, std::error_code> setProperty(std::string_view name, std::string value) override; - nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, std::string value) override; - - std::map<std::string, std::string> getDynamicProperties(const FlowFile*) const override; + std::map<std::string, std::string> getDynamicProperties(ProcessContext& context, const FlowFile*) const override; 
private: mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_expressions_; mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_dynamic_expressions_; - - std::shared_ptr<logging::Logger> logger_; }; } // namespace org::apache::nifi::minifi::core diff --git a/extensions/grafana-loki/PushGrafanaLoki.cpp b/extensions/grafana-loki/PushGrafanaLoki.cpp index d37bff95c..6df796dde 100644 --- a/extensions/grafana-loki/PushGrafanaLoki.cpp +++ b/extensions/grafana-loki/PushGrafanaLoki.cpp @@ -232,12 +232,4 @@ void PushGrafanaLoki::restore(const std::shared_ptr<core::FlowFile>& flow_file) log_batch_.restore(flow_file); } -std::set<core::Connectable*> PushGrafanaLoki::getOutGoingConnections(const std::string &relationship) { - auto result = core::ConnectableImpl::getOutGoingConnections(relationship); - if (relationship == Self.getName()) { - result.insert(this); - } - return result; -} - } // namespace org::apache::nifi::minifi::extensions::grafana::loki diff --git a/extensions/grafana-loki/PushGrafanaLoki.h b/extensions/grafana-loki/PushGrafanaLoki.h index b8307d80b..e1c167e6d 100644 --- a/extensions/grafana-loki/PushGrafanaLoki.h +++ b/extensions/grafana-loki/PushGrafanaLoki.h @@ -107,7 +107,6 @@ class PushGrafanaLoki : public core::ProcessorImpl { void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void restore(const std::shared_ptr<core::FlowFile>& flow_file) override; - std::set<core::Connectable*> getOutGoingConnections(const std::string &relationship) override; protected: static const core::Relationship Self; diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp index be12f3591..b9366d6d6 100644 --- a/extensions/libarchive/BinFiles.cpp +++ 
b/extensions/libarchive/BinFiles.cpp @@ -317,14 +317,6 @@ void BinFiles::restore(const std::shared_ptr<core::FlowFile>& flowFile) { file_store_.put(flowFile); } -std::set<core::Connectable*> BinFiles::getOutGoingConnections(const std::string &relationship) { - auto result = core::ConnectableImpl::getOutGoingConnections(relationship); - if (relationship == Self.getName()) { - result.insert(this); - } - return result; -} - REGISTER_RESOURCE(BinFiles, Processor); } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h index f4b5407bb..5d89c1790 100644 --- a/extensions/libarchive/BinFiles.h +++ b/extensions/libarchive/BinFiles.h @@ -262,8 +262,6 @@ class BinFiles : public core::ProcessorImpl { void restore(const std::shared_ptr<core::FlowFile>& flowFile) override; - std::set<core::Connectable*> getOutGoingConnections(const std::string &relationship) override; - protected: // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). 
virtual void preprocessFlowFile(const std::shared_ptr<core::FlowFile>& flow); diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 836c6583b..786ea25c8 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -31,16 +31,12 @@ #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" #include "utils/ProcessorConfigUtils.h" -#include "utils/PropertyErrors.h" +#include "minifi-cpp/utils/PropertyErrors.h" +#include "minifi-cpp/core/ProcessorDescriptor.h" namespace org::apache::nifi::minifi::extensions::python::processors { -void ExecutePythonProcessor::initialize() { - if (getSupportedProperties().empty()) { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); - } - +void ExecutePythonProcessor::initialize(core::ProcessorDescriptor& self) { if (processor_initialized_) { logger_->log_debug("Processor has already been initialized, returning..."); return; @@ -56,6 +52,13 @@ void ExecutePythonProcessor::initialize() { // so that we can provide manifest of processor identity on C2 python_script_engine_ = createScriptEngine(); initalizeThroughScriptEngine(); + + std::vector<core::PropertyReference> all_properties{Properties.begin(), Properties.end()}; + for (auto& python_prop : python_properties_) { + all_properties.push_back(python_prop.getReference()); + } + self.setSupportedProperties(all_properties); + self.setSupportedRelationships(Relationships); } void ExecutePythonProcessor::initalizeThroughScriptEngine() { @@ -80,7 +83,7 @@ void ExecutePythonProcessor::initalizeThroughScriptEngine() { } void ExecutePythonProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*sessionFactory*/) { - addAutoTerminatedRelationship(Original); + context.addAutoTerminatedRelationship(Original); if (!processor_initialized_) { loadScript(); python_script_engine_ = createScriptEngine(); @@ -217,63 +220,8 @@ void 
ExecutePythonProcessor::addProperty(const std::string &name, const std::str python_properties_.emplace_back(property); } -nonstd::expected<std::string, std::error_code> ExecutePythonProcessor::getProperty(std::string_view name) const { - if (auto non_python_property = ConfigurableComponentImpl::getProperty(name)) { - return *non_python_property; - } - std::lock_guard<std::mutex> lock(python_properties_mutex_); - auto it = ranges::find_if(python_properties_, [&name](const auto& item){ - return item.getName() == name; - }); - if (it != python_properties_.end()) { - return it->getValue() | utils::transform([](const std::string_view value_view) -> std::string { return std::string{value_view}; }); - } - return nonstd::make_unexpected(core::PropertyErrorCode::NotSupportedProperty); -} - -nonstd::expected<void, std::error_code> ExecutePythonProcessor::setProperty(std::string_view name, std::string value) { - auto set_non_python_property = ConfigurableComponentImpl::setProperty(name, value); - if (set_non_python_property || set_non_python_property.error() != core::PropertyErrorCode::NotSupportedProperty) { - return set_non_python_property; - } - std::lock_guard<std::mutex> lock(python_properties_mutex_); - auto it = ranges::find_if(python_properties_, [&name](const auto& item){ - return item.getName() == name; - }); - if (it != python_properties_.end()) { - return it->setValue(std::move(value)); - } - return nonstd::make_unexpected(core::PropertyErrorCode::NotSupportedProperty); -} - -nonstd::expected<core::Property, std::error_code> ExecutePythonProcessor::getSupportedProperty(std::string_view name) const { - if (const auto non_python_property = ConfigurableComponentImpl::getSupportedProperty(name)) { - return *non_python_property; - } - std::lock_guard<std::mutex> lock(python_properties_mutex_); - auto it = ranges::find_if(python_properties_, [&name](const auto& item){ - return item.getName() == name; - }); - if (it != python_properties_.end()) { - return *it; - } - 
return nonstd::make_unexpected(core::PropertyErrorCode::NotSupportedProperty); -} - -std::map<std::string, core::Property, std::less<>> ExecutePythonProcessor::getSupportedProperties() const { - auto result = ConfigurableComponentImpl::getSupportedProperties(); - - std::lock_guard<std::mutex> lock(python_properties_mutex_); - - for (const auto &property : python_properties_) { - result.insert({ property.getName(), property }); - } - - return result; -} - std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships() const { - auto relationships = getSupportedRelationships(); + std::vector<core::Relationship> relationships{Relationships.begin(), Relationships.end()}; auto custom_relationships = python_script_engine_->getCustomPythonRelationships(); relationships.reserve(relationships.size() + std::distance(custom_relationships.begin(), custom_relationships.end())); relationships.insert(relationships.end(), custom_relationships.begin(), custom_relationships.end()); diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index b9e12da07..caefb7b6a 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -90,7 +90,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl { EXTENSIONAPI static constexpr bool IsSingleThreaded = true; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - void initialize() override; + void initialize(core::ProcessorDescriptor& self) override; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; diff --git a/extensions/python/types/PyProcessContext.cpp b/extensions/python/types/PyProcessContext.cpp index 053b1523f..5390e4fa8 100644 --- a/extensions/python/types/PyProcessContext.cpp +++ b/extensions/python/types/PyProcessContext.cpp @@ -123,7 +123,7 @@ PyObject* 
PyProcessContext::getControllerService(PyProcessContext* self, PyObjec return nullptr; } - if (auto controller_service = context->getControllerService(controller_service_name, context->getProcessor().getUUID())) { + if (auto controller_service = context->getControllerService(controller_service_name, context->getProcessorInfo().getUUID())) { std::string controller_service_type_str = controller_service_type; if (controller_service_type_str == "SSLContextService") { auto ssl_ctx_service = std::dynamic_pointer_cast<controllers::SSLContextService>(controller_service); @@ -147,7 +147,7 @@ PyObject* PyProcessContext::getName(PyProcessContext* self, PyObject*) { return nullptr; } - return object::returnReference(context->getProcessor().getName()); + return object::returnReference(context->getProcessorInfo().getName()); } PyObject* PyProcessContext::getProperties(PyProcessContext* self, PyObject*) { @@ -157,7 +157,7 @@ PyObject* PyProcessContext::getProperties(PyProcessContext* self, PyObject*) { return nullptr; } - auto properties = context->getProcessor().getSupportedProperties(); + auto properties = context->getProcessorInfo().getSupportedProperties(); auto py_properties = OwnedDict::create(); for (const auto& [property_name, property] : properties) { if (const auto value = context->getProperty(property_name)) { diff --git a/extensions/smb/SmbConnectionControllerService.cpp b/extensions/smb/SmbConnectionControllerService.cpp index 87876dd95..d1302b32c 100644 --- a/extensions/smb/SmbConnectionControllerService.cpp +++ b/extensions/smb/SmbConnectionControllerService.cpp @@ -61,7 +61,7 @@ void SmbConnectionControllerService::notifyStop() { gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> SmbConnectionControllerService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) { std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service; if (auto connection_controller_name = 
context.getProperty(property)) { - smb_connection_controller_service = std::dynamic_pointer_cast<SmbConnectionControllerService>(context.getControllerService(*connection_controller_name, context.getProcessor().getUUID())); + smb_connection_controller_service = std::dynamic_pointer_cast<SmbConnectionControllerService>(context.getControllerService(*connection_controller_name, context.getProcessorInfo().getUUID())); } if (!smb_connection_controller_service) { throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing SMB Connection Controller Service"); diff --git a/extensions/standard-processors/processors/AttributeRollingWindow.cpp b/extensions/standard-processors/processors/AttributeRollingWindow.cpp index 922967577..e3108d32c 100644 --- a/extensions/standard-processors/processors/AttributeRollingWindow.cpp +++ b/extensions/standard-processors/processors/AttributeRollingWindow.cpp @@ -46,7 +46,7 @@ void AttributeRollingWindow::onSchedule(core::ProcessContext& context, core::Pro void AttributeRollingWindow::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { gsl_Expects(runningInvariant()); const auto flow_file = session.get(); - if (!flow_file) { yield(); return; } + if (!flow_file) { context.yield(); return; } gsl_Assert(flow_file); const auto current_value_opt_str = context.getProperty(ValueToTrack, flow_file.get()); if (!current_value_opt_str) { diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp index a85a47391..e5c85dc77 100644 --- a/extensions/standard-processors/processors/DefragmentText.cpp +++ b/extensions/standard-processors/processors/DefragmentText.cpp @@ -188,14 +188,6 @@ void DefragmentText::restore(const std::shared_ptr<core::FlowFile>& flowFile) { flow_file_store_.put(flowFile); } -std::set<core::Connectable*> DefragmentText::getOutGoingConnections(const std::string &relationship) { - auto result = 
core::ConnectableImpl::getOutGoingConnections(relationship); - if (relationship == Self.getName()) { - result.insert(this); - } - return result; -} - void DefragmentText::Buffer::append(core::ProcessSession& session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append) { if (empty()) { store(session, flow_file_to_append); diff --git a/extensions/standard-processors/processors/DefragmentText.h b/extensions/standard-processors/processors/DefragmentText.h index 0b49962d1..518134afd 100644 --- a/extensions/standard-processors/processors/DefragmentText.h +++ b/extensions/standard-processors/processors/DefragmentText.h @@ -116,7 +116,6 @@ class DefragmentText : public core::ProcessorImpl { void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void restore(const std::shared_ptr<core::FlowFile>& flowFile) override; - std::set<core::Connectable*> getOutGoingConnections(const std::string &relationship) override; protected: class Buffer { diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h index 71ee5fbfb..dd54485a6 100644 --- a/extensions/standard-processors/processors/ExtractText.h +++ b/extensions/standard-processors/processors/ExtractText.h @@ -38,10 +38,7 @@ namespace org::apache::nifi::minifi::processors { class ExtractText : public core::ProcessorImpl { public: - explicit ExtractText(const std::string_view name, const utils::Identifier& uuid = {}) - : ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<ExtractText>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; // Default maximum bytes to read into an attribute static constexpr std::string_view DEFAULT_SIZE_LIMIT_STR = "2097152"; // 2 * 1024 * 1024 diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp 
b/extensions/standard-processors/processors/GenerateFlowFile.cpp index ccc7b8636..24e27170e 100644 --- a/extensions/standard-processors/processors/GenerateFlowFile.cpp +++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp @@ -26,12 +26,13 @@ #include <string> #include <vector> -#include "core/ProcessContext.h" +#include "minifi-cpp/core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/Resource.h" #include "utils/OptionalUtils.h" #include "utils/gsl.h" #include "utils/ProcessorConfigUtils.h" +#include "minifi-cpp/core/ProcessorDescriptor.h" namespace org::apache::nifi::minifi::processors { const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; @@ -83,7 +84,7 @@ void GenerateFlowFile::onSchedule(core::ProcessContext& context, core::ProcessSe | utils::valueOrElse([]() {return false;}); bool is_unique = utils::parseOptionalBoolProperty(context, UniqueFlowFiles).value_or(true); - const auto custom_text_without_evaluation = getProperty(CustomText.name); + const auto custom_text_without_evaluation = context.getProperty(CustomText.name); const bool has_custom_text = custom_text_without_evaluation.has_value() && !custom_text_without_evaluation->empty(); file_size_ = utils::parseDataSizeProperty(context, FileSize); diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h index 2e173750c..a3fb0a698 100644 --- a/extensions/standard-processors/processors/GenerateFlowFile.h +++ b/extensions/standard-processors/processors/GenerateFlowFile.h @@ -39,10 +39,7 @@ namespace org::apache::nifi::minifi::processors { class GenerateFlowFile : public core::ProcessorImpl { public: - explicit GenerateFlowFile(const std::string_view name, const utils::Identifier& uuid = {}) // NOLINT - : ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<GenerateFlowFile>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; ~GenerateFlowFile() override = default; 
EXTENSIONAPI static constexpr const char* Description = "This processor creates FlowFiles with random data or custom content. " diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h index 5aa32033a..1e127b8e1 100644 --- a/extensions/standard-processors/processors/GetFile.h +++ b/extensions/standard-processors/processors/GetFile.h @@ -54,7 +54,7 @@ struct GetFileRequest { class GetFileMetrics : public core::ProcessorMetricsImpl { public: - explicit GetFileMetrics(const core::Processor& source_processor) + explicit GetFileMetrics(const core::ProcessorApi& source_processor) : core::ProcessorMetricsImpl(source_processor) { } diff --git a/extensions/standard-processors/processors/RouteOnAttribute.cpp b/extensions/standard-processors/processors/RouteOnAttribute.cpp index 2a3d095e2..573634be9 100644 --- a/extensions/standard-processors/processors/RouteOnAttribute.cpp +++ b/extensions/standard-processors/processors/RouteOnAttribute.cpp @@ -33,8 +33,8 @@ void RouteOnAttribute::initialize() { setSupportedRelationships(Relationships); } -void RouteOnAttribute::onSchedule(core::ProcessContext &, core::ProcessSessionFactory&) { - route_properties_ = getDynamicProperties(); +void RouteOnAttribute::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + route_properties_ = context.getDynamicProperties(); const auto static_relationships = RouteOnAttribute::Relationships; std::vector<core::RelationshipDefinition> relationships(static_relationships.begin(), static_relationships.end()); @@ -78,7 +78,7 @@ void RouteOnAttribute::onTrigger(core::ProcessContext& context, core::ProcessSes } catch (const std::exception &e) { logger_->log_error("Caught exception while updating attributes: type: {}, what: {}", typeid(e).name(), e.what()); session.transfer(flow_file, Failure); - yield(); + context.yield(); } } diff --git a/extensions/standard-processors/processors/RouteOnAttribute.h 
b/extensions/standard-processors/processors/RouteOnAttribute.h index 30fe8ee6c..f1775bab7 100644 --- a/extensions/standard-processors/processors/RouteOnAttribute.h +++ b/extensions/standard-processors/processors/RouteOnAttribute.h @@ -35,10 +35,7 @@ namespace org::apache::nifi::minifi::processors { class RouteOnAttribute : public core::ProcessorImpl { public: - explicit RouteOnAttribute(const std::string_view name, const utils::Identifier& uuid = {}) - : core::ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<RouteOnAttribute>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; EXTENSIONAPI static constexpr const char* Description = "Routes FlowFiles based on their Attributes using the Attribute Expression Language.\n\n" "Any number of user-defined dynamic properties can be added, which all support the Attribute Expression Language. Relationships matching the name of the properties will be added.\n" diff --git a/extensions/standard-processors/processors/SegmentContent.h b/extensions/standard-processors/processors/SegmentContent.h index 6e4962ad6..364bfbbd8 100644 --- a/extensions/standard-processors/processors/SegmentContent.h +++ b/extensions/standard-processors/processors/SegmentContent.h @@ -34,9 +34,7 @@ namespace org::apache::nifi::minifi::processors { class SegmentContent final : public core::ProcessorImpl { public: - explicit SegmentContent(const std::string_view name, const utils::Identifier& uuid = {}) : ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<SegmentContent>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; EXTENSIONAPI static constexpr auto Description = "Segments a FlowFile into multiple smaller segments on byte boundaries."; diff --git a/extensions/standard-processors/processors/SplitText.h b/extensions/standard-processors/processors/SplitText.h index b461aad9b..93f9f82a2 100644 --- a/extensions/standard-processors/processors/SplitText.h +++ 
b/extensions/standard-processors/processors/SplitText.h @@ -87,10 +87,7 @@ class LineReader { class SplitText : public core::ProcessorImpl { public: - explicit SplitText(const std::string_view name, const utils::Identifier& uuid = {}) - : ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<SplitText>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; EXTENSIONAPI static constexpr const char* Description = "Splits a text file into multiple smaller text files on line boundaries limited by maximum number of lines or total size of fragment. " "Each output split file will contain no more than the configured number of lines or bytes. If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever " diff --git a/extensions/standard-processors/processors/UpdateAttribute.h b/extensions/standard-processors/processors/UpdateAttribute.h index 2cd1ba2d0..e877b0669 100644 --- a/extensions/standard-processors/processors/UpdateAttribute.h +++ b/extensions/standard-processors/processors/UpdateAttribute.h @@ -35,10 +35,7 @@ namespace org::apache::nifi::minifi::processors { class UpdateAttribute : public core::ProcessorImpl { public: - explicit UpdateAttribute(const std::string_view name, const utils::Identifier& uuid = {}) - : core::ProcessorImpl(name, uuid) { - logger_ = core::logging::LoggerFactory<UpdateAttribute>::getLogger(uuid_); - } + using ProcessorImpl::ProcessorImpl; EXTENSIONAPI static constexpr const char* Description = "This processor updates the attributes of a FlowFile using properties that are added by the user. 
" "This allows you to set default attribute changes that affect every FlowFile going through the processor, equivalent to the \"basic\" usage in Apache NiFi."; diff --git a/libminifi/include/ForwardingNode.h b/libminifi/include/ForwardingNode.h index d20468b27..ffa8e7713 100644 --- a/libminifi/include/ForwardingNode.h +++ b/libminifi/include/ForwardingNode.h @@ -22,15 +22,14 @@ #include <utility> #include "core/logging/LoggerFactory.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" +#include "minifi-cpp/core/ProcessorDescriptor.h" namespace org::apache::nifi::minifi { class ForwardingNode : public core::ProcessorImpl { public: - ForwardingNode(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) : ProcessorImpl(name, uuid), logger_(std::move(logger)) { - strategy_ = core::SchedulingStrategy::EVENT_DRIVEN; - } + ForwardingNode(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) : ProcessorImpl(name, uuid), logger_(std::move(logger)) {} ForwardingNode(std::string_view name, std::shared_ptr<core::logging::Logger> logger) : ProcessorImpl(name), logger_(std::move(logger)) {} MINIFIAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 2f1e5bbb2..c3608fec4 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -29,7 +29,7 @@ #include "http/BaseHTTPClient.h" #include "concurrentqueue.h" #include "FlowFileRecord.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" #include "core/ProcessSession.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" @@ -212,7 +212,7 @@ class RemoteProcessorGroupPort : public core::ProcessorImpl { } std::unique_ptr<sitetosite::SiteToSiteClient> getNextProtocol(bool create); - void 
returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> protocol); + void returnProtocol(core::ProcessContext& context, std::unique_ptr<sitetosite::SiteToSiteClient> protocol); moodycamel::ConcurrentQueue<std::unique_ptr<sitetosite::SiteToSiteClient>> available_protocols_; diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/libminifi/include/core/Processor.h similarity index 80% copy from minifi-api/include/minifi-cpp/core/Processor.h copy to libminifi/include/core/Processor.h index ded0f7570..8d4904827 100644 --- a/minifi-api/include/minifi-cpp/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -23,17 +23,16 @@ #include <unordered_set> #include <unordered_map> -#include "ConfigurableComponent.h" -#include "Connectable.h" -#include "Property.h" -#include "DynamicProperty.h" -#include "Core.h" +#include "minifi-cpp/core/ConfigurableComponent.h" +#include "minifi-cpp/core/Connectable.h" +#include "minifi-cpp/core/Property.h" +#include "minifi-cpp/core/DynamicProperty.h" +#include "minifi-cpp/core/Core.h" #include "minifi-cpp/core/Annotation.h" -#include "Scheduling.h" +#include "minifi-cpp/core/Scheduling.h" #include "minifi-cpp/core/state/nodes/MetricsBase.h" -#include "ProcessorMetrics.h" +#include "minifi-cpp/core/ProcessorMetrics.h" #include "utils/gsl.h" -#include "core/logging/Logger.h" namespace org::apache::nifi::minifi { @@ -64,7 +63,6 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual void setPenalizationPeriod(std::chrono::milliseconds period) = 0; virtual bool isSingleThreaded() const = 0; virtual std::string getProcessorType() const = 0; - virtual void setTriggerWhenEmpty(bool value) = 0; virtual bool getTriggerWhenEmpty() const = 0; virtual uint8_t getActiveTasks() const = 0; virtual void incrementActiveTasks() = 0; @@ -76,7 +74,6 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual void clearYield() = 0; virtual std::chrono::steady_clock::time_point 
getYieldExpirationTime() const = 0; virtual std::chrono::steady_clock::duration getYieldTime() const = 0; - virtual bool flowFilesOutGoingFull() const = 0; virtual bool addConnection(Connectable* connection) = 0; virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) = 0; virtual void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) = 0; @@ -87,15 +84,6 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual void validateAnnotations() const = 0; virtual annotation::Input getInputRequirement() const = 0; virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0; - virtual std::string getProcessGroupUUIDStr() const = 0; - virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; - virtual std::string getProcessGroupName() const = 0; - virtual void setProcessGroupName(const std::string &name) = 0; - virtual std::string getProcessGroupPath() const = 0; - virtual void setProcessGroupPath(const std::string &path) = 0; - virtual logging::LOG_LEVEL getLogBulletinLevel() const = 0; - virtual void setLogBulletinLevel(logging::LOG_LEVEL level) = 0; - virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) = 0; virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0; virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0; diff --git a/utils/include/core/Processor.h b/libminifi/include/core/ProcessorProxy.h similarity index 67% copy from utils/include/core/Processor.h copy to libminifi/include/core/ProcessorProxy.h index 11c779b00..5ea8931ce 100644 --- a/utils/include/core/Processor.h +++ b/libminifi/include/core/ProcessorProxy.h @@ -42,22 +42,9 @@ #include "utils/gsl.h" #include "utils/Id.h" 
#include "minifi-cpp/core/OutputAttributeDefinition.h" +#include "Processor.h" #include "minifi-cpp/core/Processor.h" -#define ADD_GET_PROCESSOR_NAME \ - std::string getProcessorType() const override { \ - auto class_name = org::apache::nifi::minifi::core::className<decltype(*this)>(); \ - auto splitted = org::apache::nifi::minifi::utils::string::split(class_name, "::"); \ - return splitted[splitted.size() - 1]; \ - } - -#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \ - bool supportsDynamicProperties() const override { return SupportsDynamicProperties; } \ - bool supportsDynamicRelationships() const override { return SupportsDynamicRelationships; } \ - minifi::core::annotation::Input getInputRequirement() const override { return InputRequirement; } \ - bool isSingleThreaded() const override { return IsSingleThreaded; } \ - ADD_GET_PROCESSOR_NAME - namespace org::apache::nifi::minifi { class Connection; @@ -72,17 +59,17 @@ constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; #define BUILDING_DLL 1 -class ProcessorImpl : public virtual Processor, public ConnectableImpl, public ConfigurableComponentImpl { +class ProcessorProxy : public virtual Processor, public ConnectableImpl, public ConfigurableComponentImpl { public: - ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics = nullptr); - explicit ProcessorImpl(std::string_view name, std::shared_ptr<ProcessorMetrics> metrics = nullptr); + ProcessorProxy(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl); + explicit ProcessorProxy(std::string_view name, std::unique_ptr<ProcessorApi> impl); - ProcessorImpl(const ProcessorImpl& parent) = delete; - ProcessorImpl& operator=(const ProcessorImpl& parent) = delete; + ProcessorProxy(const ProcessorProxy& parent) = delete; + ProcessorProxy& operator=(const ProcessorProxy& parent) = delete; bool isRunning() const override; - ~ProcessorImpl() override; + 
~ProcessorProxy() override; void setScheduledState(ScheduledState state) override; @@ -136,16 +123,16 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C void setMaxConcurrentTasks(uint8_t tasks) override; - bool isSingleThreaded() const override = 0; - - std::string getProcessorType() const override = 0; + bool isSingleThreaded() const override { + return impl_->isSingleThreaded(); + } - void setTriggerWhenEmpty(bool value) override { - _triggerWhenEmpty = value; + std::string getProcessorType() const override { + return impl_->getProcessorType(); } bool getTriggerWhenEmpty() const override { - return (_triggerWhenEmpty); + return impl_->getTriggerWhenEmpty(); } uint8_t getActiveTasks() const override { @@ -165,30 +152,6 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C active_tasks_ = 0; } - std::string getProcessGroupUUIDStr() const override { - return process_group_uuid_; - } - - void setProcessGroupUUIDStr(const std::string &uuid) override { - process_group_uuid_ = uuid; - } - - std::string getProcessGroupName() const override { - return process_group_name_; - } - - void setProcessGroupName(const std::string &name) override { - process_group_name_ = name; - } - - std::string getProcessGroupPath() const override { - return process_group_path_; - } - - void setProcessGroupPath(const std::string &path) override { - process_group_path_ = path; - } - void yield() override; void yield(std::chrono::steady_clock::duration delta_time) override; @@ -199,8 +162,6 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C std::chrono::steady_clock::time_point getYieldExpirationTime() const override { return yield_expiration_; } std::chrono::steady_clock::duration getYieldTime() const override; - // Whether flow file queue full in any of the outgoing connection - bool flowFilesOutGoingFull() const override; bool addConnection(Connectable* connection) override; @@ -208,19 +169,22 @@ 
class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C return !isRunning(); } - void initialize() override { - } + void initialize() override; void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) override; void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) override; - void onTrigger(ProcessContext&, ProcessSession&) override {} + void onTrigger(ProcessContext& context, ProcessSession& session) override { + impl_->onTrigger(context, session); + } - void onSchedule(ProcessContext&, ProcessSessionFactory&) override {} + void onSchedule(ProcessContext& context, ProcessSessionFactory& session_factory) override { + impl_->onSchedule(context, session_factory); + } // Hook executed when onSchedule fails (throws). Configuration should be reset in this void onUnSchedule() override { - notifyStop(); + impl_->onUnSchedule(); } // Check all incoming connections for work @@ -232,34 +196,31 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C void validateAnnotations() const override; - annotation::Input getInputRequirement() const override = 0; - - state::response::SharedResponseNode getResponseNode() override { - return metrics_; + annotation::Input getInputRequirement() const override { + return impl_->getInputRequirement(); } - gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override { - return metrics_; + [[nodiscard]] bool supportsDynamicProperties() const override { + return impl_->supportsDynamicProperties(); } - logging::LOG_LEVEL getLogBulletinLevel() const override { - return log_bulletin_level_; + [[nodiscard]] bool supportsDynamicRelationships() const override { + return impl_->supportsDynamicRelationships(); } - void setLogBulletinLevel(logging::LOG_LEVEL level) override { - log_bulletin_level_ = level; + state::response::SharedResponseNode 
getResponseNode() override { + return getMetrics(); } - void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) override; + gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override { + return impl_->getMetrics(); + } static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; protected: - virtual void notifyStop() { - } - std::atomic<ScheduledState> state_; std::atomic<std::chrono::steady_clock::duration> scheduling_period_; @@ -267,13 +228,10 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C std::atomic<std::chrono::steady_clock::duration> yield_period_; std::atomic<uint8_t> active_tasks_; - std::atomic<bool> _triggerWhenEmpty; std::string cron_period_; - gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; std::shared_ptr<logging::Logger> logger_; - logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn; private: mutable std::mutex mutex_; @@ -296,9 +254,7 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C // an outgoing connection allows us to reach these nodes std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; - std::string process_group_uuid_; - std::string process_group_name_; - std::string process_group_path_; + std::unique_ptr<ProcessorApi> impl_; }; } // namespace core diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index e0f69b2ca..cacd4dfcb 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -52,7 +52,7 @@ class FlowVersion : public DeviceInformation { return "FlowVersion"; } - std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const override { + std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() 
const { std::lock_guard<std::mutex> lock(guard); return identifier; } diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h index 555b06d70..0cc9ebd97 100644 --- a/libminifi/include/core/state/nodes/MetricsBase.h +++ b/libminifi/include/core/state/nodes/MetricsBase.h @@ -54,7 +54,7 @@ class ObjectNode : public ResponseNodeImpl { } std::string getName() const override { - return ConnectableImpl::getName(); + return CoreComponentImpl::getName(); } std::vector<SerializedResponseNode> serialize() override; diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 96f77efdc..f8ee1b46c 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -90,10 +90,10 @@ std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessorGroupPort::getNextP return nextProtocol; } -void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) { +void RemoteProcessorGroupPort::returnProtocol(core::ProcessContext& context, std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) { auto count = peers_.size(); - if (max_concurrent_tasks_ > count) - count = max_concurrent_tasks_; + if (context.getProcessor().getMaxConcurrentTasks() > count) + count = context.getProcessor().getMaxConcurrentTasks(); if (available_protocols_.size_approx() >= count) { logger_->log_debug("not enqueueing protocol {}", getUUIDStr()); // let the memory be freed @@ -142,8 +142,8 @@ void RemoteProcessorGroupPort::onSchedule(core::ProcessContext& context, core::P // populate the site2site protocol for load balancing between them if (!peers_.empty()) { auto count = peers_.size(); - if (max_concurrent_tasks_ > count) - count = max_concurrent_tasks_; + if (context.getProcessor().getMaxConcurrentTasks() > count) + count = context.getProcessor().getMaxConcurrentTasks(); for (uint32_t i = 0; i < count; i++) { 
std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr; sitetosite::SiteToSiteClientConfiguration config(peers_[this->peer_index_].getPeer(), this->getInterface(), client_type_); @@ -157,7 +157,7 @@ void RemoteProcessorGroupPort::onSchedule(core::ProcessContext& context, core::P config.setIdleTimeout(idle_timeout_); nextProtocol = sitetosite::createClient(config); logger_->log_trace("Created client, moving into available protocols"); - returnProtocol(std::move(nextProtocol)); + returnProtocol(context, std::move(nextProtocol)); } } else { // we don't have any peers @@ -205,7 +205,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext& context, core::Pr context.yield(); } - returnProtocol(std::move(protocol_)); + returnProtocol(context, std::move(protocol_)); return; } catch (const minifi::Exception &) { context.yield(); diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index 985d4cb72..9d56c36e5 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -23,6 +23,7 @@ #include "core/logging/LoggerFactory.h" #include "range/v3/action/sort.hpp" #include "range/v3/action/unique.hpp" +#include "core/ProcessorProxy.h" namespace org { namespace apache { @@ -38,6 +39,8 @@ class ClassLoaderImpl : public ClassLoader { void registerClass(const std::string &clazz, std::unique_ptr<ObjectFactory> factory) override; + void registerClass(const std::string &clazz, std::unique_ptr<ProcessorFactory> factory) override; + void unregisterClass(const std::string& clazz) override; std::optional<std::string> getGroupForClass(const std::string &class_name) const override; @@ -96,6 +99,47 @@ void ClassLoaderImpl::registerClass(const std::string &clazz, std::unique_ptr<Ob loaded_factories_.insert(std::make_pair(clazz, std::move(factory))); } +namespace { +class ProcessorFactoryWrapper : public ObjectFactoryImpl { + public: + explicit ProcessorFactoryWrapper(std::unique_ptr<ProcessorFactory> factory) 
+ : ObjectFactoryImpl(factory->getGroupName()), + factory_(std::move(factory)) {} + + std::unique_ptr<CoreComponent> create(const std::string &name) override { + return std::unique_ptr<CoreComponent>{createRaw(name)}; + } + + std::unique_ptr<CoreComponent> create(const std::string &name, const utils::Identifier &uuid) override { + return std::unique_ptr<CoreComponent>{createRaw(name, uuid)}; + } + + CoreComponent* createRaw(const std::string &name) override { + return createRaw(name, utils::IdGenerator::getIdGenerator()->generate()); + } + + CoreComponent* createRaw(const std::string &name, const utils::Identifier &uuid) override { + auto logger = logging::LoggerFactoryBase::getAliasedLogger(getClassName(), uuid); + return new ProcessorProxy(name, uuid, factory_->create({.uuid = uuid, .name = name, .logger = logger})); + } + + std::string getGroupName() const override { + return factory_->getGroupName(); + } + + std::string getClassName() override { + return factory_->getClassName(); + } + + private: + std::unique_ptr<ProcessorFactory> factory_; +}; +} + +void ClassLoaderImpl::registerClass(const std::string &clazz, std::unique_ptr<ProcessorFactory> factory) { + registerClass(clazz, std::make_unique<ProcessorFactoryWrapper>(std::move(factory))); +} + void ClassLoaderImpl::unregisterClass(const std::string& clazz) { std::lock_guard<std::mutex> lock(internal_mutex_); if (loaded_factories_.erase(clazz) == 0) { diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 761aafdd8..17285480c 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -26,6 +26,7 @@ #include "processors/ProcessorUtils.h" #include "utils/StringUtils.h" #include "utils/file/FileUtils.h" +#include "core/ProcessorProxy.h" namespace org::apache::nifi::minifi::core { diff --git a/libminifi/src/core/ProcessContext.cpp b/libminifi/src/core/ProcessContext.cpp new file mode 100644 index 000000000..63bb8bc8b --- 
/dev/null
+++ b/libminifi/src/core/ProcessContext.cpp
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/ProcessContext.h"
+#include "core/Processor.h"
+
+namespace org::apache::nifi::minifi::core {
+
+namespace {
+
+/**
+ * Read-only ProcessorInfo view that forwards every query to the wrapped Processor.
+ * Only a reference is stored, so the Processor must outlive this object.
+ */
+class StandardProcessorInfo : public ProcessorInfo {
+ public:
+  explicit StandardProcessorInfo(Processor& proc): proc_(proc) {}
+
+  std::string getName() const override {return proc_.getName();}
+  utils::Identifier getUUID() const override {return proc_.getUUID();}
+  std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const override {return proc_.getFlowIdentifier();}
+  std::map<std::string, core::Property> getSupportedProperties() const override {return proc_.getSupportedProperties();}
+  nonstd::expected<PropertyReference, std::error_code> getPropertyReference(std::string_view name) const override {return proc_.getPropertyReference(name);}
+
+ private:
+  Processor& proc_;
+};
+
+}  // namespace
+
+// Builds a context for `processor` backed by a freshly created default configuration.
+ProcessContextImpl::ProcessContextImpl(
+    Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo,
+    const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<core::ContentRepository>& content_repo)
+    : VariableRegistryImpl(Configure::create()),
+      controller_service_provider_(controller_service_provider),
+      flow_repo_(flow_repo),
+      content_repo_(content_repo),
+      processor_(processor),
+      logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+      configure_(minifi::Configure::create()),
+      // getProcessorInfo() dereferences info_ unconditionally, so every constructor has to set it
+      info_(std::make_unique<StandardProcessorInfo>(processor)),
+      initialized_(false) {
+  repo_ = repo;
+  state_storage_ = getStateStorage(logger_, controller_service_provider_, nullptr);
+}
+
+// Builds a context for `processor` using `configuration`; a null configuration falls back to a default-created one.
+ProcessContextImpl::ProcessContextImpl(
+    Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo,
+    const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<minifi::Configure>& configuration,
+    const std::shared_ptr<core::ContentRepository>& content_repo)
+    : VariableRegistryImpl(configuration),
+      controller_service_provider_(controller_service_provider),
+      flow_repo_(flow_repo),
+      content_repo_(content_repo),
+      processor_(processor),
+      logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+      // configure_ is a gsl::not_null, so the fallback must be picked here rather than checked for afterwards
+      configure_(configuration ? gsl::make_not_null(configuration) : minifi::Configure::create()),
+      info_(std::make_unique<StandardProcessorInfo>(processor)),
+      initialized_(false) {
+  repo_ = repo;
+  state_storage_ = getStateStorage(logger_, controller_service_provider_, configuration);
+}
+
+std::vector<std::string> ProcessContextImpl::getDynamicPropertyKeys() const { return processor_.getDynamicPropertyKeys(); }
+
+std::map<std::string, std::string> ProcessContextImpl::getDynamicProperties(const FlowFile*) const { return processor_.getDynamicProperties(); }
+
+bool ProcessContextImpl::isAutoTerminated(Relationship relationship) const { return processor_.isAutoTerminated(relationship); }
+
+uint8_t ProcessContextImpl::getMaxConcurrentTasks() const { return processor_.getMaxConcurrentTasks(); }
+
+void ProcessContextImpl::yield() { processor_.yield(); }
+
+nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* const) const {
+  return getProcessor().getProperty(name);
+}
+
+nonstd::expected<void, std::error_code> ProcessContextImpl::setProperty(const std::string_view name, std::string value) {
+  return getProcessor().setProperty(name, std::move(value));
+}
+
+nonstd::expected<void, std::error_code> ProcessContextImpl::clearProperty(const std::string_view name) {
+  return getProcessor().clearProperty(name);
+}
+
+nonstd::expected<std::string, std::error_code> ProcessContextImpl::getDynamicProperty(const std::string_view name, const FlowFile* const) const {
+  return getProcessor().getDynamicProperty(name);
+}
+
+nonstd::expected<void, std::error_code> ProcessContextImpl::setDynamicProperty(std::string name, std::string value) {
+  return getProcessor().setDynamicProperty(std::move(name), std::move(value));
+}
+
+// Creates the processor's StateManager on first use and caches it; returns nullptr when there is no state storage.
+StateManager* ProcessContextImpl::getStateManager() {
+  if (state_storage_ == nullptr) { return nullptr; }
+  if (!state_manager_) { state_manager_ = state_storage_->getStateManager(processor_); }
+  return state_manager_.get();
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index e4b8838bf..37706642c 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -34,6 +34,7 @@
 #include "io/StreamSlice.h"
 #include "io/StreamPipe.h"
 #include "utils/gsl.h"
+#include "core/Processor.h"
 
 /* This implementation is only for native Windows systems.
*/ #if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__ diff --git a/utils/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp similarity index 81% copy from utils/src/core/Processor.cpp copy to libminifi/src/core/Processor.cpp index 95feac9f8..538c265fb 100644 --- a/utils/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -32,26 +32,26 @@ #include "core/logging/LoggerFactory.h" #include "minifi-cpp/core/ProcessorConfig.h" #include "minifi-cpp/core/ProcessContext.h" +#include "minifi-cpp/core/ProcessorDescriptor.h" #include "minifi-cpp/core/ProcessSessionFactory.h" #include "utils/gsl.h" #include "range/v3/algorithm/any_of.hpp" #include "fmt/format.h" #include "Exception.h" +#include "core/ProcessorProxy.h" #include "core/ProcessorMetrics.h" using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -ProcessorImpl::ProcessorImpl(std::string_view name, std::shared_ptr<ProcessorMetrics> metrics) +ProcessorProxy::ProcessorProxy(std::string_view name, std::unique_ptr<ProcessorApi> impl) : ConnectableImpl(name), state_(DISABLED), scheduling_period_(MINIMUM_SCHEDULING_PERIOD), run_duration_(DEFAULT_RUN_DURATION), yield_period_(DEFAULT_YIELD_PERIOD_SECONDS), active_tasks_(0), - _triggerWhenEmpty(false), - metrics_(metrics ? 
std::move(metrics) : std::make_shared<ProcessorMetricsImpl>(*this)), logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)) { has_work_.store(false); // Setup the default values @@ -62,15 +62,13 @@ ProcessorImpl::ProcessorImpl(std::string_view name, std::shared_ptr<ProcessorMet logger_->log_debug("Processor {} created UUID {}", name_, getUUIDStr()); } -ProcessorImpl::ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics) +ProcessorProxy::ProcessorProxy(std::string_view name, const utils::Identifier& uuid, std::unique_ptr<ProcessorApi> impl) : ConnectableImpl(name, uuid), state_(DISABLED), scheduling_period_(MINIMUM_SCHEDULING_PERIOD), run_duration_(DEFAULT_RUN_DURATION), yield_period_(DEFAULT_YIELD_PERIOD_SECONDS), active_tasks_(0), - _triggerWhenEmpty(false), - metrics_(metrics ? std::move(metrics) : std::make_shared<ProcessorMetricsImpl>(*this)), logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)) { has_work_.store(false); // Setup the default values @@ -81,22 +79,22 @@ ProcessorImpl::ProcessorImpl(std::string_view name, const utils::Identifier& uui logger_->log_debug("Processor {} created with uuid {}", name_, getUUIDStr()); } -ProcessorImpl::~ProcessorImpl() { +ProcessorProxy::~ProcessorProxy() { logger_->log_debug("Destroying processor {} with uuid {}", name_, getUUIDStr()); } -bool ProcessorImpl::isRunning() const { +bool ProcessorProxy::isRunning() const { return (state_ == RUNNING && active_tasks_ > 0); } -void ProcessorImpl::setScheduledState(ScheduledState state) { +void ProcessorProxy::setScheduledState(ScheduledState state) { state_ = state; if (state == STOPPED) { - notifyStop(); + impl_->notifyStop(); } } -bool ProcessorImpl::addConnection(Connectable* conn) { +bool ProcessorProxy::addConnection(Connectable* conn) { enum class SetAs{ NONE, OUTPUT, @@ -166,24 +164,9 @@ bool ProcessorImpl::addConnection(Connectable* conn) { return result != SetAs::NONE; } -bool 
ProcessorImpl::flowFilesOutGoingFull() const { - std::lock_guard<std::mutex> lock(mutex_); - - for (const auto& [_name, existed_connection] : outgoing_connections_) { - if (ranges::any_of(existed_connection, [](const Connectable* conn) { - const auto connection = dynamic_cast<const Connection*>(conn); - return connection && connection->backpressureThresholdReached(); - })) { - return true; - } - } - - return false; -} - -void ProcessorImpl::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { +void ProcessorProxy::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { const auto process_session = session_factory->createSession(); - process_session->setMetrics(metrics_); + process_session->setMetrics(getMetrics()); try { trigger(context, process_session); process_session->commit(); @@ -199,14 +182,14 @@ void ProcessorImpl::triggerAndCommit(const std::shared_ptr<ProcessContext>& cont } } -void ProcessorImpl::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { - ++metrics_->invocations(); +void ProcessorProxy::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { + ++impl_->getMetrics()->iterations(); const auto start = std::chrono::steady_clock::now(); onTrigger(*context, *process_session); - metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); + impl_->getMetrics()->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); } -bool ProcessorImpl::isWorkAvailable() { +bool ProcessorProxy::isWorkAvailable() { // We have work if any incoming connection has work std::lock_guard<std::mutex> lock(mutex_); bool hasWork = false; @@ -228,11 +211,11 @@ bool 
ProcessorImpl::isWorkAvailable() { getCurrentExceptionTypeName()); } - return hasWork; + return hasWork || impl_->isWorkAvailable(); } // must hold the graphMutex -void ProcessorImpl::updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force) { +void ProcessorProxy::updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force) { bool didChange = force; for (auto& outIt : outgoing_connections_) { for (auto& outConn : outIt.second) { @@ -272,7 +255,7 @@ void ProcessorImpl::updateReachability(const std::lock_guard<std::mutex>& graph_ } } -bool ProcessorImpl::partOfCycle(Connection* conn) { +bool ProcessorProxy::partOfCycle(Connection* conn) { auto source = dynamic_cast<Processor*>(conn->getSource()); if (!source) { return false; @@ -284,7 +267,7 @@ bool ProcessorImpl::partOfCycle(Connection* conn) { return it->second.contains(source); } -bool ProcessorImpl::isThrottledByBackpressure() const { +bool ProcessorProxy::isThrottledByBackpressure() const { bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) { return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) { auto connection = dynamic_cast<Connection*>(connectable); @@ -298,7 +281,7 @@ bool ProcessorImpl::isThrottledByBackpressure() const { return isThrottledByOutgoing && !isForcedByIncomingCycle; } -Connectable* ProcessorImpl::pickIncomingConnection() { +Connectable* ProcessorProxy::pickIncomingConnection() { std::lock_guard<std::mutex> rel_guard(relationship_mutex_); auto beginIt = incoming_connections_Iter; @@ -318,7 +301,7 @@ Connectable* ProcessorImpl::pickIncomingConnection() { return getNextIncomingConnectionImpl(rel_guard); } -void ProcessorImpl::validateAnnotations() const { +void ProcessorProxy::validateAnnotations() const { switch (getInputRequirement()) { case annotation::Input::INPUT_REQUIRED: { if (!hasIncomingConnections()) { @@ -338,7 +321,7 @@ void ProcessorImpl::validateAnnotations() const { } } 
-void ProcessorImpl::setMaxConcurrentTasks(const uint8_t tasks) {
+void ProcessorProxy::setMaxConcurrentTasks(const uint8_t tasks) {
   if (isSingleThreaded() && tasks > 1) {
     logger_->log_warn("Processor {} can not be run in parallel, its \"max concurrent tasks\" value is too high. "
                       "It was set to 1 from {}.", name_, tasks);
@@ -349,28 +332,51 @@ void ProcessorImpl::setMaxConcurrentTasks(const uint8_t tasks) {
   max_concurrent_tasks_ = tasks;
 }
 
-void ProcessorImpl::yield() {
+void ProcessorProxy::yield() {
   yield_expiration_ = std::chrono::steady_clock::now() + yield_period_.load();
 }
 
-void ProcessorImpl::yield(std::chrono::steady_clock::duration delta_time) {
+void ProcessorProxy::yield(std::chrono::steady_clock::duration delta_time) {
   yield_expiration_ = std::chrono::steady_clock::now() + delta_time;
 }
 
-bool ProcessorImpl::isYield() {
+bool ProcessorProxy::isYield() {
   return getYieldTime() > 0ms;
 }
 
-void ProcessorImpl::clearYield() {
+void ProcessorProxy::clearYield() {
   yield_expiration_ = std::chrono::steady_clock::time_point();
 }
 
-std::chrono::steady_clock::duration ProcessorImpl::getYieldTime() const {
+std::chrono::steady_clock::duration ProcessorProxy::getYieldTime() const {
   return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0});
 }
 
-void ProcessorImpl::setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) {
-  logger_->setLogCallback(callback);
+namespace {
+
+// ProcessorDescriptor adapter that forwards the declared relationships and properties
+// to a ProcessorProxy. Non-owning: it only stores a raw pointer to the proxy.
+class ProcessorDescriptorImpl : public ProcessorDescriptor {
+ public:
+  explicit ProcessorDescriptorImpl(ProcessorProxy* impl): impl_(impl) {}
+  void setSupportedRelationships(std::span<const RelationshipDefinition> relationships) override {
+    impl_->setSupportedRelationships(relationships);
+  }
+
+  void setSupportedProperties(std::span<const PropertyReference> properties) override {
+    impl_->setSupportedProperties(properties);
+  }
+
+ private:
+  ProcessorProxy* impl_;
+};
+
+}  // namespace
+
+// Hands the wrapped implementation a descriptor bound to this proxy for its initialize() call.
+void ProcessorProxy::initialize() {
+  ProcessorDescriptorImpl self{this};
+  impl_->initialize(self);
 }
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/test/libtest/unit/DummyProcessor.h b/libminifi/test/libtest/unit/DummyProcessor.h
index 162d69a55..336c9d2e0 100644
--- a/libminifi/test/libtest/unit/DummyProcessor.h
+++ b/libminifi/test/libtest/unit/DummyProcessor.h
@@ -20,7 +20,7 @@
 #include <string_view>
 #include <utility>
 
-#include "core/Processor.h"
+#include "core/ProcessorImpl.h"
 #include "agent/agent_docs.h"
 #include "core/PropertyDefinitionBuilder.h"
 
diff --git a/libminifi/test/libtest/unit/ReadFromFlowFileTestProcessor.h b/libminifi/test/libtest/unit/ReadFromFlowFileTestProcessor.h
index a1fdd7f28..ac055b5ef 100644
--- a/libminifi/test/libtest/unit/ReadFromFlowFileTestProcessor.h
+++ b/libminifi/test/libtest/unit/ReadFromFlowFileTestProcessor.h
@@ -22,7 +22,7 @@
 #include <utility>
 
 #include "core/Core.h"
-#include "core/Processor.h"
+#include "core/ProcessorImpl.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyDefinition.h"
 #include "core/RelationshipDefinition.h"
diff --git a/libminifi/test/libtest/unit/StatefulProcessor.h b/libminifi/test/libtest/unit/StatefulProcessor.h
index 5659b1692..16aa18c12 100644
--- a/libminifi/test/libtest/unit/StatefulProcessor.h
+++ b/libminifi/test/libtest/unit/StatefulProcessor.h
@@ -20,7 +20,7 @@
 #include <memory>
 #include <utility>
 #include <vector>
-#include "core/Processor.h"
+#include "core/ProcessorImpl.h"
 #include "core/StateManager.h"
 
 namespace org::apache::nifi::minifi::processors {
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp
index bec3fc9c0..115fea1af 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -476,7 +476,7 @@ std::vector<minifi::core::Processor*>::iterator TestPlan::getProcessorItByUuid(c
 
 std::shared_ptr<minifi::core::ProcessContext>
TestPlan::getProcessContextForProcessor(minifi::core::Processor* processor) { const auto contextMatchesProcessor = [&processor] (const std::shared_ptr<minifi::core::ProcessContext>& context) { - return context->getProcessor().getUUIDStr() == processor->getUUIDStr(); + return context->getProcessorInfo().getUUID().to_string() == processor->getUUIDStr(); }; const auto context_found_at = std::find_if(processor_contexts_.begin(), processor_contexts_.end(), contextMatchesProcessor); if (context_found_at == processor_contexts_.end()) { diff --git a/minifi-api/include/minifi-cpp/core/ClassLoader.h b/minifi-api/include/minifi-cpp/core/ClassLoader.h index d1fcac4a0..cac9410d6 100644 --- a/minifi-api/include/minifi-cpp/core/ClassLoader.h +++ b/minifi-api/include/minifi-cpp/core/ClassLoader.h @@ -26,6 +26,7 @@ #include "Core.h" #include "ObjectFactory.h" +#include "ProcessorFactory.h" namespace org::apache::nifi::minifi::core { @@ -62,6 +63,8 @@ class ClassLoader { */ virtual void registerClass(const std::string &clazz, std::unique_ptr<ObjectFactory> factory) = 0; + virtual void registerClass(const std::string &clazz, std::unique_ptr<ProcessorFactory> factory) = 0; + virtual void unregisterClass(const std::string& clazz) = 0; virtual std::optional<std::string> getGroupForClass(const std::string &class_name) const = 0; diff --git a/minifi-api/include/minifi-cpp/core/ProcessContext.h b/minifi-api/include/minifi-cpp/core/ProcessContext.h index 50f1518f1..163ae53fc 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContext.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContext.h @@ -33,8 +33,27 @@ namespace org::apache::nifi::minifi::core { +namespace detail { +template<typename T> +concept NotAFlowFile = !std::convertible_to<T &, const FlowFile &> && !std::convertible_to<T &, const std::shared_ptr<FlowFile> &>; +} // namespace detail + +class ProcessorInfo { + public: + virtual std::string getName() const = 0; + virtual utils::Identifier getUUID() const = 0; + virtual 
std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const = 0; + virtual std::map<std::string, core::Property> getSupportedProperties() const = 0; + virtual nonstd::expected<PropertyReference, std::error_code> getPropertyReference(std::string_view name) const = 0; + + virtual ~ProcessorInfo() = default; +}; + +class Processor; + class ProcessContext : public virtual core::VariableRegistry, public virtual utils::EnableSharedFromThis { public: + virtual const ProcessorInfo& getProcessorInfo() const = 0; virtual Processor& getProcessor() const = 0; virtual nonstd::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file = nullptr) const = 0; diff --git a/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h b/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h index da20c1303..18d8ec27f 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContextBuilder.h @@ -34,6 +34,8 @@ namespace org::apache::nifi::minifi::core { * While this incurs a tiny cost to look up, it allows us to have a replaceable builder that erases the type we are * constructing. 
*/ +class Processor; + class ProcessContextBuilder : public virtual core::CoreComponent, public virtual utils::EnableSharedFromThis { public: virtual std::shared_ptr<ProcessContextBuilder> withProvider(core::controller::ControllerServiceProvider* controller_service_provider) = 0; diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/Processor.h index ded0f7570..e9cfda446 100644 --- a/minifi-api/include/minifi-cpp/core/Processor.h +++ b/minifi-api/include/minifi-cpp/core/Processor.h @@ -44,61 +44,30 @@ namespace core { class ProcessContext; class ProcessSession; class ProcessSessionFactory; +class ProcessorDescriptor; -class Processor : public virtual Connectable, public virtual ConfigurableComponent, public virtual state::response::ResponseNodeSource { +class ProcessorApi { public: - ~Processor() override = default; + virtual ~ProcessorApi() = default; - virtual void setScheduledState(ScheduledState state) = 0; - virtual ScheduledState getScheduledState() const = 0; - virtual void setSchedulingStrategy(SchedulingStrategy strategy) = 0; - virtual SchedulingStrategy getSchedulingStrategy() const = 0; - virtual void setSchedulingPeriod(std::chrono::steady_clock::duration period) = 0; - virtual std::chrono::steady_clock::duration getSchedulingPeriod() const = 0; - virtual void setCronPeriod(const std::string &period) = 0; - virtual std::string getCronPeriod() const = 0; - virtual void setRunDurationNano(std::chrono::steady_clock::duration period) = 0; - virtual std::chrono::steady_clock::duration getRunDurationNano() const = 0; - virtual void setYieldPeriodMsec(std::chrono::milliseconds period) = 0; - virtual std::chrono::steady_clock::duration getYieldPeriod() const = 0; - virtual void setPenalizationPeriod(std::chrono::milliseconds period) = 0; + virtual bool isWorkAvailable() = 0; + + virtual void restore(const std::shared_ptr<FlowFile>& file) = 0; + + + [[nodiscard]] virtual bool supportsDynamicProperties() const = 0; + 
[[nodiscard]] virtual bool supportsDynamicRelationships() const = 0; + + virtual void initialize(ProcessorDescriptor& descriptor) = 0; virtual bool isSingleThreaded() const = 0; virtual std::string getProcessorType() const = 0; - virtual void setTriggerWhenEmpty(bool value) = 0; virtual bool getTriggerWhenEmpty() const = 0; - virtual uint8_t getActiveTasks() const = 0; - virtual void incrementActiveTasks() = 0; - virtual void decrementActiveTask() = 0; - virtual void clearActiveTask() = 0; - using Connectable::yield; - virtual void yield(std::chrono::steady_clock::duration delta_time) = 0; - virtual bool isYield() = 0; - virtual void clearYield() = 0; - virtual std::chrono::steady_clock::time_point getYieldExpirationTime() const = 0; - virtual std::chrono::steady_clock::duration getYieldTime() const = 0; - virtual bool flowFilesOutGoingFull() const = 0; - virtual bool addConnection(Connectable* connection) = 0; - virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) = 0; - virtual void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) = 0; virtual void onTrigger(ProcessContext&, ProcessSession&) = 0; virtual void onSchedule(ProcessContext&, ProcessSessionFactory&) = 0; virtual void onUnSchedule() = 0; - virtual bool isThrottledByBackpressure() const = 0; - virtual void validateAnnotations() const = 0; + virtual void notifyStop() = 0; virtual annotation::Input getInputRequirement() const = 0; virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0; - virtual std::string getProcessGroupUUIDStr() const = 0; - virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; - virtual std::string getProcessGroupName() const = 0; - virtual void setProcessGroupName(const std::string &name) = 0; - virtual std::string getProcessGroupPath() const = 0; - virtual void setProcessGroupPath(const std::string 
&path) = 0; - virtual logging::LOG_LEVEL getLogBulletinLevel() const = 0; - virtual void setLogBulletinLevel(logging::LOG_LEVEL level) = 0; - virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) = 0; - - virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0; - virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0; }; } // namespace core diff --git a/minifi-api/include/minifi-cpp/core/ProcessorDescriptor.h b/minifi-api/include/minifi-cpp/core/ProcessorDescriptor.h new file mode 100644 index 000000000..fd942605c --- /dev/null +++ b/minifi-api/include/minifi-cpp/core/ProcessorDescriptor.h @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "minifi-cpp/core/RelationshipDefinition.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include <span> + +namespace org::apache::nifi::minifi::core { + +class ProcessorDescriptor { + public: + virtual ~ProcessorDescriptor() = default; + + virtual void setSupportedRelationships(std::span<const RelationshipDefinition> relationships) = 0; + virtual void setSupportedProperties(std::span<const PropertyReference> properties) = 0; +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/minifi-api/include/minifi-cpp/core/ProcessorFactory.h b/minifi-api/include/minifi-cpp/core/ProcessorFactory.h new file mode 100644 index 000000000..7b453170f --- /dev/null +++ b/minifi-api/include/minifi-cpp/core/ProcessorFactory.h @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <string> +#include <memory> +#include <utility> +#include "minifi-cpp/core/Processor.h" +#include "minifi-cpp/core/ProcessorMetadata.h" + +namespace org::apache::nifi::minifi::core { + +class ProcessorFactory { + public: + virtual std::unique_ptr<ProcessorApi> create(ProcessorMetadata info) = 0; + virtual std::string getGroupName() const = 0; + virtual std::string getClassName() const = 0; + + virtual ~ProcessorFactory() = default; +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/minifi-api/include/minifi-cpp/core/ProcessorMetadata.h b/minifi-api/include/minifi-cpp/core/ProcessorMetadata.h new file mode 100644 index 000000000..b40e51faf --- /dev/null +++ b/minifi-api/include/minifi-cpp/core/ProcessorMetadata.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "minifi-cpp/utils/Id.h" +#include "minifi-cpp/core/logging/Logger.h" +#include <memory> +#include <string> + +namespace org::apache::nifi::minifi::core { + +struct ProcessorMetadata { + utils::Identifier uuid; + std::string name; + std::shared_ptr<logging::Logger> logger; +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h b/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h index 08fccb249..d8d390855 100644 --- a/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h +++ b/minifi-api/include/minifi-cpp/core/state/nodes/MetricsBase.h @@ -26,14 +26,14 @@ #include "../Value.h" #include "../PublishedMetricProvider.h" #include "core/Core.h" -#include "core/Connectable.h" +#include "minifi-cpp/core/Connectable.h" namespace org::apache::nifi::minifi::state::response { class ResponseNode; using SharedResponseNode = gsl::not_null<std::shared_ptr<ResponseNode>>; -class ResponseNode : public virtual core::Connectable, public virtual PublishedMetricProvider { +class ResponseNode : public virtual core::CoreComponent, public virtual PublishedMetricProvider { public: ~ResponseNode() override = default; diff --git a/utils/include/core/ProcessContext.h b/utils/include/core/ProcessContext.h index 2606058b3..4728236d6 100644 --- a/utils/include/core/ProcessContext.h +++ b/utils/include/core/ProcessContext.h @@ -46,51 +46,35 @@ namespace org::apache::nifi::minifi::core { +class Processor; + class ProcessContextImpl : public core::VariableRegistryImpl, public virtual ProcessContext { public: ProcessContextImpl(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo, - const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()) - : 
VariableRegistryImpl(static_cast<std::shared_ptr<Configure>>(minifi::Configure::create())), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()), - controller_service_provider_(controller_service_provider), - state_storage_(getStateStorage(logger_, controller_service_provider_, nullptr)), - repo_(repo), - flow_repo_(flow_repo), - content_repo_(content_repo), - processor_(processor), - configure_(minifi::Configure::create()), - initialized_(false) {} + const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()); ProcessContextImpl(Processor& processor, controller::ControllerServiceProvider* controller_service_provider, const std::shared_ptr<core::Repository>& repo, const std::shared_ptr<core::Repository>& flow_repo, const std::shared_ptr<minifi::Configure>& configuration, - const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()) - : VariableRegistryImpl(configuration), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()), - controller_service_provider_(controller_service_provider), - state_storage_(getStateStorage(logger_, controller_service_provider_, configuration)), - repo_(repo), - flow_repo_(flow_repo), - content_repo_(content_repo), - processor_(processor), - configure_(configuration ? 
gsl::make_not_null(configuration) : minifi::Configure::create()), - initialized_(false) {} + const std::shared_ptr<core::ContentRepository>& content_repo = repository::createFileSystemRepository()); // Get Processor associated with the Process Context Processor& getProcessor() const override { return processor_; } + const ProcessorInfo& getProcessorInfo() const override { return *info_; } + nonstd::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile*) const override; nonstd::expected<void, std::error_code> setProperty(std::string_view name, std::string value) override; nonstd::expected<void, std::error_code> clearProperty(std::string_view name) override; nonstd::expected<std::string, std::error_code> getDynamicProperty(std::string_view name, const FlowFile*) const override; nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, std::string value) override; - std::vector<std::string> getDynamicPropertyKeys() const override { return processor_.getDynamicPropertyKeys(); } - std::map<std::string, std::string> getDynamicProperties(const FlowFile*) const override { return processor_.getDynamicProperties(); } + std::vector<std::string> getDynamicPropertyKeys() const override; + std::map<std::string, std::string> getDynamicProperties(const FlowFile*) const override; - bool isAutoTerminated(Relationship relationship) const override { return processor_.isAutoTerminated(relationship); } - uint8_t getMaxConcurrentTasks() const override { return processor_.getMaxConcurrentTasks(); } + bool isAutoTerminated(Relationship relationship) const override; + uint8_t getMaxConcurrentTasks() const override; - void yield() override { processor_.yield(); } + void yield() override; std::shared_ptr<core::Repository> getProvenanceRepository() override { return repo_; } @@ -127,11 +111,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro static constexpr char const* DefaultStateStorageName = 
"defaultstatestorage"; - StateManager* getStateManager() override { - if (state_storage_ == nullptr) { return nullptr; } - if (!state_manager_) { state_manager_ = state_storage_->getStateManager(processor_); } - return state_manager_.get(); - } + StateManager* getStateManager() override; bool hasStateManager() const override { return state_manager_ != nullptr; } @@ -223,27 +203,8 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro std::shared_ptr<core::ContentRepository> content_repo_; Processor& processor_; gsl::not_null<std::shared_ptr<Configure>> configure_; + std::unique_ptr<ProcessorInfo> info_; bool initialized_; }; -inline nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* const) const { - return getProcessor().getProperty(name); -} - -inline nonstd::expected<void, std::error_code> ProcessContextImpl::setProperty(const std::string_view name, std::string value) { - return getProcessor().setProperty(name, std::move(value)); -} - -inline nonstd::expected<void, std::error_code> ProcessContextImpl::clearProperty(const std::string_view name) { - return getProcessor().clearProperty(name); -} - -inline nonstd::expected<std::string, std::error_code> ProcessContextImpl::getDynamicProperty(const std::string_view name, const FlowFile* const) const { - return getProcessor().getDynamicProperty(name); -} - -inline nonstd::expected<void, std::error_code> ProcessContextImpl::setDynamicProperty(std::string name, std::string value) { - return getProcessor().setDynamicProperty(std::move(name), std::move(value)); -} - } // namespace org::apache::nifi::minifi::core diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h index 11c779b00..3792098b7 100644 --- a/utils/include/core/Processor.h +++ b/utils/include/core/Processor.h @@ -16,290 +16,4 @@ */ #pragma once -#include <algorithm> -#include <atomic> -#include <chrono> -#include <condition_variable> 
-#include <functional> -#include <memory> -#include <mutex> -#include <string> -#include <string_view> -#include <unordered_set> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "core/ConfigurableComponentImpl.h" -#include "core/Connectable.h" -#include "core/Property.h" -#include "core/Core.h" -#include "minifi-cpp/core/Annotation.h" -#include "minifi-cpp/core/DynamicProperty.h" -#include "minifi-cpp/core/Scheduling.h" -#include "minifi-cpp/core/state/nodes/MetricsBase.h" -#include "minifi-cpp/core/ProcessorMetrics.h" -#include "utils/gsl.h" -#include "utils/Id.h" -#include "minifi-cpp/core/OutputAttributeDefinition.h" -#include "minifi-cpp/core/Processor.h" - -#define ADD_GET_PROCESSOR_NAME \ - std::string getProcessorType() const override { \ - auto class_name = org::apache::nifi::minifi::core::className<decltype(*this)>(); \ - auto splitted = org::apache::nifi::minifi::utils::string::split(class_name, "::"); \ - return splitted[splitted.size() - 1]; \ - } - -#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \ - bool supportsDynamicProperties() const override { return SupportsDynamicProperties; } \ - bool supportsDynamicRelationships() const override { return SupportsDynamicRelationships; } \ - minifi::core::annotation::Input getInputRequirement() const override { return InputRequirement; } \ - bool isSingleThreaded() const override { return IsSingleThreaded; } \ - ADD_GET_PROCESSOR_NAME - -namespace org::apache::nifi::minifi { - -class Connection; - -namespace core { - -class ProcessContext; -class ProcessSession; -class ProcessSessionFactory; - -constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; - -#define BUILDING_DLL 1 - -class ProcessorImpl : public virtual Processor, public ConnectableImpl, public ConfigurableComponentImpl { - public: - ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics = nullptr); - explicit ProcessorImpl(std::string_view name, 
std::shared_ptr<ProcessorMetrics> metrics = nullptr); - - ProcessorImpl(const ProcessorImpl& parent) = delete; - ProcessorImpl& operator=(const ProcessorImpl& parent) = delete; - - bool isRunning() const override; - - ~ProcessorImpl() override; - - void setScheduledState(ScheduledState state) override; - - ScheduledState getScheduledState() const override { - return state_; - } - - void setSchedulingStrategy(SchedulingStrategy strategy) override { - strategy_ = strategy; - } - - SchedulingStrategy getSchedulingStrategy() const override { - return strategy_; - } - - void setSchedulingPeriod(std::chrono::steady_clock::duration period) override { - scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period); - } - - std::chrono::steady_clock::duration getSchedulingPeriod() const override { - return scheduling_period_; - } - - void setCronPeriod(const std::string &period) override { - cron_period_ = period; - } - - std::string getCronPeriod() const override { - return cron_period_; - } - - void setRunDurationNano(std::chrono::steady_clock::duration period) override { - run_duration_ = period; - } - - std::chrono::steady_clock::duration getRunDurationNano() const override { - return (run_duration_); - } - - void setYieldPeriodMsec(std::chrono::milliseconds period) override { - yield_period_ = period; - } - - std::chrono::steady_clock::duration getYieldPeriod() const override { - return yield_period_; - } - - void setPenalizationPeriod(std::chrono::milliseconds period) override { - penalization_period_ = period; - } - - void setMaxConcurrentTasks(uint8_t tasks) override; - - bool isSingleThreaded() const override = 0; - - std::string getProcessorType() const override = 0; - - void setTriggerWhenEmpty(bool value) override { - _triggerWhenEmpty = value; - } - - bool getTriggerWhenEmpty() const override { - return (_triggerWhenEmpty); - } - - uint8_t getActiveTasks() const override { - return (active_tasks_); - } - - void 
incrementActiveTasks() override { - ++active_tasks_; - } - - void decrementActiveTask() override { - if (active_tasks_ > 0) - --active_tasks_; - } - - void clearActiveTask() override { - active_tasks_ = 0; - } - - std::string getProcessGroupUUIDStr() const override { - return process_group_uuid_; - } - - void setProcessGroupUUIDStr(const std::string &uuid) override { - process_group_uuid_ = uuid; - } - - std::string getProcessGroupName() const override { - return process_group_name_; - } - - void setProcessGroupName(const std::string &name) override { - process_group_name_ = name; - } - - std::string getProcessGroupPath() const override { - return process_group_path_; - } - - void setProcessGroupPath(const std::string &path) override { - process_group_path_ = path; - } - - void yield() override; - - void yield(std::chrono::steady_clock::duration delta_time) override; - - bool isYield() override; - - void clearYield() override; - - std::chrono::steady_clock::time_point getYieldExpirationTime() const override { return yield_expiration_; } - std::chrono::steady_clock::duration getYieldTime() const override; - // Whether flow file queue full in any of the outgoing connection - bool flowFilesOutGoingFull() const override; - - bool addConnection(Connectable* connection) override; - - bool canEdit() override { - return !isRunning(); - } - - void initialize() override { - } - - void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) override; - void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) override; - - void onTrigger(ProcessContext&, ProcessSession&) override {} - - void onSchedule(ProcessContext&, ProcessSessionFactory&) override {} - - // Hook executed when onSchedule fails (throws). 
Configuration should be reset in this - void onUnSchedule() override { - notifyStop(); - } - - // Check all incoming connections for work - bool isWorkAvailable() override; - - bool isThrottledByBackpressure() const override; - - Connectable* pickIncomingConnection() override; - - void validateAnnotations() const override; - - annotation::Input getInputRequirement() const override = 0; - - state::response::SharedResponseNode getResponseNode() override { - return metrics_; - } - - gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override { - return metrics_; - } - - logging::LOG_LEVEL getLogBulletinLevel() const override { - return log_bulletin_level_; - } - - void setLogBulletinLevel(logging::LOG_LEVEL level) override { - log_bulletin_level_ = level; - } - - void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) override; - - static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; - - static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; - - protected: - virtual void notifyStop() { - } - - std::atomic<ScheduledState> state_; - - std::atomic<std::chrono::steady_clock::duration> scheduling_period_; - std::atomic<std::chrono::steady_clock::duration> run_duration_; - std::atomic<std::chrono::steady_clock::duration> yield_period_; - - std::atomic<uint8_t> active_tasks_; - std::atomic<bool> _triggerWhenEmpty; - - std::string cron_period_; - gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; - - std::shared_ptr<logging::Logger> logger_; - logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn; - - private: - mutable std::mutex mutex_; - std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; - - static std::mutex& getGraphMutex() { - static std::mutex mutex{}; - return mutex; - } - - // must hold the graphMutex - void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) override; 
- - const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const override { - return reachable_processors_; - } - - static bool partOfCycle(Connection* conn); - - // an outgoing connection allows us to reach these nodes - std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; - - std::string process_group_uuid_; - std::string process_group_name_; - std::string process_group_path_; -}; - -} // namespace core -} // namespace org::apache::nifi::minifi +#include "core/ProcessorImpl.h" diff --git a/utils/include/core/ProcessorFactory.h b/utils/include/core/ProcessorFactory.h new file mode 100644 index 000000000..17a42b78d --- /dev/null +++ b/utils/include/core/ProcessorFactory.h @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <string> +#include <memory> +#include <utility> +#include "ClassName.h" +#include "minifi-cpp/core/ProcessorFactory.h" + +namespace org::apache::nifi::minifi::core { + +template<class T> +class ProcessorFactoryImpl : public ProcessorFactory { + public: + ProcessorFactoryImpl() + : class_name_(core::className<T>()) { + } + + explicit ProcessorFactoryImpl(std::string group_name) + : group_name_(std::move(group_name)), + class_name_(core::className<T>()) { + } + + std::string getGroupName() const override { + return group_name_; + } + + std::unique_ptr<ProcessorApi> create(ProcessorMetadata info) override { + return std::make_unique<T>(info); + } + + std::string getClassName() const override { + return class_name_; + } + + protected: + std::string group_name_; + std::string class_name_; +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/utils/include/core/ProcessorImpl.h b/utils/include/core/ProcessorImpl.h new file mode 100644 index 000000000..633a80c70 --- /dev/null +++ b/utils/include/core/ProcessorImpl.h @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include <algorithm> +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <string_view> +#include <unordered_set> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "core/ConfigurableComponentImpl.h" +#include "core/Connectable.h" +#include "core/Property.h" +#include "core/Core.h" +#include "minifi-cpp/core/Annotation.h" +#include "minifi-cpp/core/DynamicProperty.h" +#include "minifi-cpp/core/Scheduling.h" +#include "minifi-cpp/core/state/nodes/MetricsBase.h" +#include "minifi-cpp/core/ProcessorMetrics.h" +#include "utils/gsl.h" +#include "utils/Id.h" +#include "minifi-cpp/core/OutputAttributeDefinition.h" +#include "minifi-cpp/core/Processor.h" +#include "minifi-cpp/utils/PropertyErrors.h" +#include "minifi-cpp/core/ProcessorMetadata.h" +#include "Exception.h" + +#define ADD_GET_PROCESSOR_NAME \ + std::string getProcessorType() const override { \ + auto class_name = org::apache::nifi::minifi::core::className<decltype(*this)>(); \ + auto splitted = org::apache::nifi::minifi::utils::string::split(class_name, "::"); \ + return splitted[splitted.size() - 1]; \ + } + +#define ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS \ + bool supportsDynamicProperties() const override { return SupportsDynamicProperties; } \ + bool supportsDynamicRelationships() const override { return SupportsDynamicRelationships; } \ + minifi::core::annotation::Input getInputRequirement() const override { return InputRequirement; } \ + bool isSingleThreaded() const override { return IsSingleThreaded; } \ + ADD_GET_PROCESSOR_NAME + +namespace org::apache::nifi::minifi { + +class Connection; + +namespace core { + +class ProcessContext; +class ProcessSession; +class ProcessSessionFactory; + +constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; + +#define BUILDING_DLL 1 + +class ProcessorImpl : public virtual ProcessorApi { + public: + 
explicit ProcessorImpl(ProcessorMetadata info, std::shared_ptr<ProcessorMetrics> metrics = nullptr); + explicit ProcessorImpl(std::string_view name, const utils::Identifier& uuid = utils::IdGenerator::getIdGenerator()->generate(), std::shared_ptr<core::logging::Logger> logger = nullptr, std::shared_ptr<ProcessorMetrics> metrics = nullptr); + + ProcessorImpl(const ProcessorImpl& parent) = delete; + ProcessorImpl& operator=(const ProcessorImpl& parent) = delete; + + ~ProcessorImpl() override; + + bool isSingleThreaded() const override = 0; + + [[nodiscard]] bool supportsDynamicProperties() const override = 0; + + [[nodiscard]] bool supportsDynamicRelationships() const override = 0; + + std::string getProcessorType() const override = 0; + + void setTriggerWhenEmpty(bool trigger_when_empty) { + trigger_when_empty_ = trigger_when_empty; + } + + bool getTriggerWhenEmpty() const override { + return trigger_when_empty_; + } + + void initialize(ProcessorDescriptor& self) final; + + void setSupportedRelationships(std::span<const RelationshipDefinition> relationships); + + void setSupportedProperties(std::span<const PropertyReference> properties); + + virtual void initialize() {} + + void onTrigger(ProcessContext&, ProcessSession&) override {} + + void onSchedule(ProcessContext&, ProcessSessionFactory&) override {} + + // Hook executed when onSchedule fails (throws). 
Configuration should be reset in this + void onUnSchedule() override { + notifyStop(); + } + + // Check all incoming connections for work + bool isWorkAvailable() override; + + annotation::Input getInputRequirement() const override = 0; + + gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override { + return metrics_; + } + + static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; + + static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; + + void restore(const std::shared_ptr<FlowFile>& file) override; + + std::string getName() const; + utils::Identifier getUUID() const; + utils::SmallString<36> getUUIDStr() const; + + protected: + void notifyStop() override { + } + + ProcessorMetadata info_; + + std::atomic<bool> trigger_when_empty_; + + gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; + + std::shared_ptr<logging::Logger> logger_; + + private: + mutable std::mutex mutex_; + + ProcessorDescriptor* descriptor_{nullptr}; +}; + +} // namespace core +} // namespace org::apache::nifi::minifi diff --git a/utils/include/core/ProcessorMetrics.h b/utils/include/core/ProcessorMetrics.h index 5de882d8c..40e0e6e0a 100644 --- a/utils/include/core/ProcessorMetrics.h +++ b/utils/include/core/ProcessorMetrics.h @@ -35,11 +35,11 @@ concept Summable = requires(T x) { x + x; }; // NOLINT(readability/braces) template<typename T> concept DividableByInteger = requires(T x, uint32_t divisor) { x / divisor; }; // NOLINT(readability/braces) -class Processor; +class ProcessorImpl; class ProcessorMetricsImpl : public state::response::ResponseNodeImpl, public virtual ProcessorMetrics { public: - explicit ProcessorMetricsImpl(const Processor& source_processor); + explicit ProcessorMetricsImpl(const ProcessorImpl& source_processor); [[nodiscard]] std::string getName() const override; @@ -97,7 +97,7 @@ class ProcessorMetricsImpl : public state::response::ResponseNodeImpl, public vi mutable std::mutex 
transferred_relationships_mutex_; std::unordered_map<std::string, size_t> transferred_relationships_; - const Processor& source_processor_; + const ProcessorImpl& source_processor_; Averager<std::chrono::milliseconds> on_trigger_runtime_averager_; Averager<std::chrono::milliseconds> session_commit_runtime_averager_; diff --git a/utils/include/core/Resource.h b/utils/include/core/Resource.h index ab4527b25..c4994666a 100644 --- a/utils/include/core/Resource.h +++ b/utils/include/core/Resource.h @@ -29,6 +29,7 @@ #include "agent/agent_docs.h" #include "utils/OptionalUtils.h" #include "utils/Macro.h" +#include "core/ProcessorFactory.h" namespace org::apache::nifi::minifi::core { @@ -54,9 +55,16 @@ class StaticClassType { auto module_name = "minifi-system"; #endif - for (const auto& construction_name : construction_names_) { - auto factory = std::unique_ptr<ObjectFactory>(new DefaultObjectFactory<Class>(module_name)); - getClassLoader().registerClass(construction_name, std::move(factory)); + if constexpr (Type == ResourceType::Processor) { + for (const auto& construction_name : construction_names_) { + auto factory = std::unique_ptr<ProcessorFactory>(new ProcessorFactoryImpl<Class>(module_name)); + getClassLoader().registerClass(construction_name, std::move(factory)); + } + } else { + for (const auto& construction_name : construction_names_) { + auto factory = std::unique_ptr<ObjectFactory>(new DefaultObjectFactory<Class>(module_name)); + getClassLoader().registerClass(construction_name, std::move(factory)); + } } minifi::AgentDocs::createClassDescription<Class, Type>(module_name, class_name); diff --git a/utils/include/core/state/nodes/ResponseNode.h b/utils/include/core/state/nodes/ResponseNode.h index afd2f7087..bba642279 100644 --- a/utils/include/core/state/nodes/ResponseNode.h +++ b/utils/include/core/state/nodes/ResponseNode.h @@ -25,7 +25,6 @@ #include <unordered_map> #include "core/Core.h" -#include "core/Connectable.h" #include 
"minifi-cpp/core/state/nodes/MetricsBase.h" #include "core/state/PublishedMetricProvider.h" @@ -37,20 +36,20 @@ using SharedResponseNode = gsl::not_null<std::shared_ptr<ResponseNode>>; /** * Purpose: Defines a metric. Serialization is intended to be thread safe. */ -class ResponseNodeImpl : public core::ConnectableImpl, public PublishedMetricProviderImpl, public virtual ResponseNode { +class ResponseNodeImpl : public core::CoreComponentImpl, public PublishedMetricProviderImpl, public virtual ResponseNode { public: ResponseNodeImpl() - : core::ConnectableImpl("metric"), + : core::CoreComponentImpl("metric"), is_array_(false) { } explicit ResponseNodeImpl(const std::string_view name) - : core::ConnectableImpl(name), + : core::CoreComponentImpl(name), is_array_(false) { } ResponseNodeImpl(const std::string_view name, const utils::Identifier& uuid) - : core::ConnectableImpl(name, uuid), + : core::CoreComponentImpl(name, uuid), is_array_(false) { } @@ -58,17 +57,6 @@ class ResponseNodeImpl : public core::ConnectableImpl, public PublishedMetricPro static std::vector<SerializedResponseNode> serializeAndMergeResponseNodes(const std::vector<SharedResponseNode>& nodes); - void yield() override { - } - - bool isRunning() const override { - return true; - } - - bool isWorkAvailable() override { - return true; - } - bool isArray() const override { return is_array_; } diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp index 95feac9f8..e70957e21 100644 --- a/utils/src/core/Processor.cpp +++ b/utils/src/core/Processor.cpp @@ -38,335 +38,68 @@ #include "fmt/format.h" #include "Exception.h" #include "core/ProcessorMetrics.h" +#include "minifi-cpp/utils/PropertyErrors.h" +#include "minifi-cpp/core/ProcessorDescriptor.h" using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -ProcessorImpl::ProcessorImpl(std::string_view name, std::shared_ptr<ProcessorMetrics> metrics) - : ConnectableImpl(name), - state_(DISABLED), - 
scheduling_period_(MINIMUM_SCHEDULING_PERIOD), - run_duration_(DEFAULT_RUN_DURATION), - yield_period_(DEFAULT_YIELD_PERIOD_SECONDS), - active_tasks_(0), - _triggerWhenEmpty(false), +ProcessorImpl::ProcessorImpl(ProcessorMetadata info, std::shared_ptr<ProcessorMetrics> metrics) + : info_(info), + trigger_when_empty_(false), metrics_(metrics ? std::move(metrics) : std::make_shared<ProcessorMetricsImpl>(*this)), - logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)) { - has_work_.store(false); - // Setup the default values - strategy_ = TIMER_DRIVEN; - penalization_period_ = DEFAULT_PENALIZATION_PERIOD; - max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; - incoming_connections_Iter = this->incoming_connections_.begin(); - logger_->log_debug("Processor {} created UUID {}", name_, getUUIDStr()); + logger_(info.logger) { + logger_->log_debug("Processor {} created with uuid {}", getName(), getUUIDStr()); } -ProcessorImpl::ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<ProcessorMetrics> metrics) - : ConnectableImpl(name, uuid), - state_(DISABLED), - scheduling_period_(MINIMUM_SCHEDULING_PERIOD), - run_duration_(DEFAULT_RUN_DURATION), - yield_period_(DEFAULT_YIELD_PERIOD_SECONDS), - active_tasks_(0), - _triggerWhenEmpty(false), - metrics_(metrics ? 
std::move(metrics) : std::make_shared<ProcessorMetricsImpl>(*this)), - logger_(logging::LoggerFactory<Processor>::getLogger(uuid_)) { - has_work_.store(false); - // Setup the default values - strategy_ = TIMER_DRIVEN; - penalization_period_ = DEFAULT_PENALIZATION_PERIOD; - max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; - incoming_connections_Iter = this->incoming_connections_.begin(); - logger_->log_debug("Processor {} created with uuid {}", name_, getUUIDStr()); -} +ProcessorImpl::ProcessorImpl(std::string_view name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::shared_ptr<ProcessorMetrics> metrics) + : ProcessorImpl(ProcessorMetadata{ + .uuid = uuid, + .name = std::string{name}, + .logger = logger ? logger : logging::LoggerFactory<ProcessorImpl>::getLogger(uuid) + }, metrics) +{} ProcessorImpl::~ProcessorImpl() { - logger_->log_debug("Destroying processor {} with uuid {}", name_, getUUIDStr()); -} - -bool ProcessorImpl::isRunning() const { - return (state_ == RUNNING && active_tasks_ > 0); -} - -void ProcessorImpl::setScheduledState(ScheduledState state) { - state_ = state; - if (state == STOPPED) { - notifyStop(); - } -} - -bool ProcessorImpl::addConnection(Connectable* conn) { - enum class SetAs{ - NONE, - OUTPUT, - INPUT, - }; - SetAs result = SetAs::NONE; - - if (isRunning()) { - logger_->log_warn("Can not add connection while the process {} is running", name_); - return false; - } - const auto connection = dynamic_cast<Connection*>(conn); - if (!connection) { - return false; - } - - std::lock_guard<std::mutex> lock(getGraphMutex()); - - auto updateGraph = gsl::finally([&] { - if (result == SetAs::INPUT) { - updateReachability(lock); - } else if (result == SetAs::OUTPUT) { - updateReachability(lock, true); - } - }); - - utils::Identifier srcUUID = connection->getSourceUUID(); - utils::Identifier destUUID = connection->getDestinationUUID(); - - if (uuid_ == destUUID) { - // Connection is destination to the current 
processor - if (!incoming_connections_.contains(connection)) { - incoming_connections_.insert(connection); - connection->setDestination(this); - logger_->log_debug("Add connection {} into Processor {} incoming connection", connection->getName(), name_); - incoming_connections_Iter = this->incoming_connections_.begin(); - result = SetAs::OUTPUT; - } - } - if (uuid_ == srcUUID) { - for (const auto& rel : connection->getRelationships()) { - const auto relationship = rel.getName(); - // Connection is source from the current processor - auto &&it = outgoing_connections_.find(relationship); - if (it != outgoing_connections_.end()) { - // We already has connection for this relationship - std::set<Connectable*> existedConnection = it->second; - if (!existedConnection.contains(connection)) { - // We do not have the same connection for this relationship yet - existedConnection.insert(connection); - connection->setSource(this); - outgoing_connections_[relationship] = existedConnection; - logger_->log_debug("Add connection {} into Processor {} outgoing connection for relationship {}", connection->getName(), name_, relationship); - result = SetAs::INPUT; - } - } else { - // We do not have any outgoing connection for this relationship yet - std::set<Connectable*> newConnection; - newConnection.insert(connection); - connection->setSource(this); - outgoing_connections_[relationship] = newConnection; - logger_->log_debug("Add connection {} into Processor {} outgoing connection for relationship {}", connection->getName(), name_, relationship); - result = SetAs::INPUT; - } - } - } - return result != SetAs::NONE; -} - -bool ProcessorImpl::flowFilesOutGoingFull() const { - std::lock_guard<std::mutex> lock(mutex_); - - for (const auto& [_name, existed_connection] : outgoing_connections_) { - if (ranges::any_of(existed_connection, [](const Connectable* conn) { - const auto connection = dynamic_cast<const Connection*>(conn); - return connection && 
connection->backpressureThresholdReached(); - })) { - return true; - } - } - - return false; -} - -void ProcessorImpl::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { - const auto process_session = session_factory->createSession(); - process_session->setMetrics(metrics_); - try { - trigger(context, process_session); - process_session->commit(); - } catch (const std::exception& exception) { - logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", - exception.what(), typeid(exception).name(), getUUIDStr(), getName()); - process_session->rollback(); - throw; - } catch (...) { - logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", getUUIDStr(), getName()); - process_session->rollback(); - throw; - } -} - -void ProcessorImpl::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { - ++metrics_->invocations(); - const auto start = std::chrono::steady_clock::now(); - onTrigger(*context, *process_session); - metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); + logger_->log_debug("Destroying processor {} with uuid {}", getName(), getUUIDStr()); } bool ProcessorImpl::isWorkAvailable() { - // We have work if any incoming connection has work - std::lock_guard<std::mutex> lock(mutex_); - bool hasWork = false; - - try { - for (const auto &conn : incoming_connections_) { - auto connection = dynamic_cast<Connection*>(conn); - if (!connection) { - continue; - } - if (connection->isWorkAvailable()) { - hasWork = true; - break; - } - } - } catch (...) 
{ - logger_->log_error("Caught an exception (type: {}) while checking if work is available;" - " unless it was positively determined that work is available, assuming NO work is available!", - getCurrentExceptionTypeName()); - } - - return hasWork; -} - -// must hold the graphMutex -void ProcessorImpl::updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force) { - bool didChange = force; - for (auto& outIt : outgoing_connections_) { - for (auto& outConn : outIt.second) { - auto connection = dynamic_cast<Connection*>(outConn); - if (!connection) { - continue; - } - auto dest = dynamic_cast<Processor*>(connection->getDestination()); - if (!dest) { - continue; - } - if (reachable_processors_[connection].insert(dest).second) { - didChange = true; - } - for (auto& reachedIt : dest->reachable_processors()) { - for (auto &reached_proc : reachedIt.second) { - if (reachable_processors_[connection].insert(reached_proc).second) { - didChange = true; - } - } - } - } - } - if (didChange) { - // propagate the change to sources - for (auto& inConn : incoming_connections_) { - auto connection = dynamic_cast<Connection*>(inConn); - if (!connection) { - continue; - } - auto source = dynamic_cast<Processor*>(connection->getSource()); - if (!source) { - continue; - } - source->updateReachability(graph_lock); - } - } -} - -bool ProcessorImpl::partOfCycle(Connection* conn) { - auto source = dynamic_cast<Processor*>(conn->getSource()); - if (!source) { - return false; - } - auto it = source->reachable_processors().find(conn); - if (it == source->reachable_processors().end()) { - return false; - } - return it->second.contains(source); -} - -bool ProcessorImpl::isThrottledByBackpressure() const { - bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) { - return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) { - auto connection = dynamic_cast<Connection*>(connectable); - return connection && 
connection->backpressureThresholdReached(); - }); - }); - bool isForcedByIncomingCycle = ranges::any_of(incoming_connections_, [](auto& connectable) { - auto connection = dynamic_cast<Connection*>(connectable); - return connection && partOfCycle(connection) && connection->backpressureThresholdReached(); - }); - return isThrottledByOutgoing && !isForcedByIncomingCycle; -} - -Connectable* ProcessorImpl::pickIncomingConnection() { - std::lock_guard<std::mutex> rel_guard(relationship_mutex_); - - auto beginIt = incoming_connections_Iter; - Connectable* inConn = nullptr; - do { - inConn = getNextIncomingConnectionImpl(rel_guard); - auto connection = dynamic_cast<Connection*>(inConn); - if (!connection) { - continue; - } - if (partOfCycle(connection) && connection->backpressureThresholdReached()) { - return inConn; - } - } while (incoming_connections_Iter != beginIt); - - // we did not find a preferred connection - return getNextIncomingConnectionImpl(rel_guard); + return false; } -void ProcessorImpl::validateAnnotations() const { - switch (getInputRequirement()) { - case annotation::Input::INPUT_REQUIRED: { - if (!hasIncomingConnections()) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("INPUT_REQUIRED was specified for the processor '{}' (uuid: '{}'), but no incoming connections were found", - getName(), std::string(getUUIDStr()))); - } - break; - } - case annotation::Input::INPUT_ALLOWED: - break; - case annotation::Input::INPUT_FORBIDDEN: { - if (hasIncomingConnections()) { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("INPUT_FORBIDDEN was specified for the processor '{}' (uuid: '{}'), but there are incoming connections", - getName(), std::string(getUUIDStr()))); - } - } - } +void ProcessorImpl::restore(const std::shared_ptr<FlowFile>& file) { + gsl_Assert("Not implemented"); } -void ProcessorImpl::setMaxConcurrentTasks(const uint8_t tasks) { - if (isSingleThreaded() && tasks > 1) { - logger_->log_warn("Processor {} can not be run in parallel, 
its \"max concurrent tasks\" value is too high. " - "It was set to 1 from {}.", name_, tasks); - max_concurrent_tasks_ = 1; - return; - } - - max_concurrent_tasks_ = tasks; +std::string ProcessorImpl::getName() const { + return info_.name; } -void ProcessorImpl::yield() { - yield_expiration_ = std::chrono::steady_clock::now() + yield_period_.load(); +utils::Identifier ProcessorImpl::getUUID() const { + return info_.uuid; } -void ProcessorImpl::yield(std::chrono::steady_clock::duration delta_time) { - yield_expiration_ = std::chrono::steady_clock::now() + delta_time; +utils::SmallString<36> ProcessorImpl::getUUIDStr() const { + return getUUID().to_string(); } -bool ProcessorImpl::isYield() { - return getYieldTime() > 0ms; +void ProcessorImpl::initialize(ProcessorDescriptor& self) { + gsl_Expects(!descriptor_); + descriptor_ = &self; + auto guard = gsl::finally([&] {descriptor_ = nullptr;}); + initialize(); } -void ProcessorImpl::clearYield() { - yield_expiration_ = std::chrono::steady_clock::time_point(); +void ProcessorImpl::setSupportedRelationships(std::span<const RelationshipDefinition> relationships) { + gsl_Expects(descriptor_); + descriptor_->setSupportedRelationships(relationships); } -std::chrono::steady_clock::duration ProcessorImpl::getYieldTime() const { - return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0}); +void ProcessorImpl::setSupportedProperties(std::span<const PropertyReference> properties) { + gsl_Expects(descriptor_); + descriptor_->setSupportedProperties(properties); } void ProcessorImpl::setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) { diff --git a/utils/src/core/ProcessorMetrics.cpp b/utils/src/core/ProcessorMetrics.cpp index 713822540..5d3e7be33 100644 --- a/utils/src/core/ProcessorMetrics.cpp +++ b/utils/src/core/ProcessorMetrics.cpp @@ -25,7 +25,7 @@ using namespace std::literals::chrono_literals; namespace 
org::apache::nifi::minifi::core { -ProcessorMetricsImpl::ProcessorMetricsImpl(const Processor& source_processor) +ProcessorMetricsImpl::ProcessorMetricsImpl(const ProcessorImpl& source_processor) : source_processor_(source_processor), on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT), session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
