This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f1758fab52cce83bec70f38f6188536783064cce Author: Ferenc Gerlits <[email protected]> AuthorDate: Thu Aug 7 09:47:16 2025 +0200 MINIFICPP-2599 parseOptionalControllerService should throw ... when the value of the property is not a controller service, or its type does not match. Also removed some helper functions which basically did the same thing, and replaced them with uses of parse[Optional]ControllerService. Signed-off-by: Gabor Gyimesi <[email protected]> This closes #2002 --- .../include/utils/ProcessorConfigUtils.h | 22 ++--- extensions/aws/processors/AwsProcessor.cpp | 10 ++- .../controllerservices/CouchbaseClusterService.cpp | 15 +--- .../controllerservices/CouchbaseClusterService.h | 2 - .../couchbase/processors/GetCouchbaseKey.cpp | 3 +- .../couchbase/processors/PutCouchbaseKey.cpp | 3 +- extensions/elasticsearch/PostElasticsearch.cpp | 17 +--- extensions/elasticsearch/PostElasticsearch.h | 2 - extensions/gcp/processors/GCSProcessor.cpp | 7 +- extensions/grafana-loki/PushGrafanaLoki.cpp | 15 +--- extensions/grafana-loki/PushGrafanaLoki.h | 1 - extensions/grafana-loki/PushGrafanaLokiGrpc.cpp | 22 ++--- extensions/grafana-loki/PushGrafanaLokiREST.cpp | 3 +- .../processors/CollectKubernetesPodMetrics.cpp | 16 +--- extensions/smb/FetchSmb.cpp | 3 +- extensions/smb/ListSmb.cpp | 2 +- extensions/smb/PutSmb.cpp | 2 +- extensions/smb/SmbConnectionControllerService.cpp | 11 --- extensions/smb/SmbConnectionControllerService.h | 2 - extensions/splunk/PutSplunkHTTP.cpp | 2 +- extensions/splunk/QuerySplunkIndexingStatus.cpp | 5 +- extensions/splunk/SplunkHECProcessor.cpp | 6 -- extensions/splunk/SplunkHECProcessor.h | 1 - extensions/sql/processors/SQLProcessor.cpp | 13 +-- .../standard-processors/modbus/FetchModbusTcp.cpp | 18 ++-- .../standard-processors/processors/GetTCP.cpp | 23 ++--- extensions/standard-processors/processors/GetTCP.h | 1 - .../standard-processors/processors/InvokeHTTP.cpp | 11 +-- .../standard-processors/processors/PutTCP.cpp | 13 +-- .../standard-processors/processors/SplitRecord.cpp | 25 +----- .../standard-processors/processors/TailFile.cpp | 37 +++----- .../standard-processors/processors/TailFile.h | 3 - .../integration/C2ControllerEnableFailureTest.cpp | 13 ++- libminifi/test/unit/ProcessorConfigUtilsTests.cpp | 99 ++++++++++++++++++++++ 34 files changed, 192 insertions(+), 236 deletions(-) diff --git a/extension-framework/include/utils/ProcessorConfigUtils.h b/extension-framework/include/utils/ProcessorConfigUtils.h index f18febf78..0d91513f9 100644 --- a/extension-framework/include/utils/ProcessorConfigUtils.h +++ b/extension-framework/include/utils/ProcessorConfigUtils.h @@ -22,6 +22,7 @@ #include <string> #include <vector> +#include "core/ClassName.h" #include "core/ProcessContext.h" #include "minifi-cpp/core/PropertyValidator.h" #include "utils/Enum.h" @@ -166,30 +167,31 @@ std::optional<T> parseOptionalEnumProperty(const core::ProcessContext& context, } template<typename ControllerServiceType> -std::optional<std::shared_ptr<ControllerServiceType>> parseOptionalControllerService(const core::ProcessContext& context, - const core::PropertyReference& prop, - const utils::Identifier& processor_uuid) { +std::shared_ptr<ControllerServiceType> parseOptionalControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) { const auto controller_service_name = context.getProperty(prop.name); - if (!controller_service_name) { - return std::nullopt; + if (!controller_service_name || controller_service_name->empty()) { + return nullptr; } const std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(*controller_service_name, processor_uuid); if (!service) { - return std::nullopt; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Controller service '{}' = '{}' not found", prop.name, *controller_service_name)); } auto typed_controller_service = std::dynamic_pointer_cast<ControllerServiceType>(service); if (!typed_controller_service) { - return std::nullopt; + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Controller service '{}' = '{}' is not of type {}", prop.name, *controller_service_name, core::className<ControllerServiceType>())); } return typed_controller_service; } template<typename ControllerServiceType> -std::shared_ptr<ControllerServiceType> parseControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) { - return parseOptionalControllerService<ControllerServiceType>(context, prop, processor_uuid) - | utils::orThrow("Required Controller Service"); +gsl::not_null<std::shared_ptr<ControllerServiceType>> parseControllerService(const core::ProcessContext& context, const core::PropertyReference& prop, const utils::Identifier& processor_uuid) { + auto controller_service = parseOptionalControllerService<ControllerServiceType>(context, prop, processor_uuid); + if (!controller_service) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required controller service property '{}' is missing", prop.name)); + } + return gsl::make_not_null(controller_service); } } // namespace org::apache::nifi::minifi::utils diff --git a/extensions/aws/processors/AwsProcessor.cpp b/extensions/aws/processors/AwsProcessor.cpp index cd442abb7..5e7ef1e14 100644 --- a/extensions/aws/processors/AwsProcessor.cpp +++ b/extensions/aws/processors/AwsProcessor.cpp @@ -33,11 +33,10 @@ namespace org::apache::nifi::minifi::aws::processors { std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { - if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { - return (*aws_credentials_service)->getAWSCredentials(); + if (auto service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { + return service->getAWSCredentials(); } logger_->log_error("AWS credentials service could not be found"); - return std::nullopt; } @@ -47,7 +46,7 @@ std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentials( auto service_cred = getAWSCredentialsFromControllerService(context); if (service_cred) { logger_->log_info("AWS Credentials successfully set from controller service"); - return service_cred.value(); + return service_cred; } aws::AWSCredentialsProvider aws_credentials_provider; @@ -98,6 +97,9 @@ void AwsProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessio if (default_ca_file) { client_config_->caFile = *default_ca_file; } + + // throw here if the credentials provider service is set to an invalid value + std::ignore = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID()); } std::optional<CommonProperties> AwsProcessor::getCommonELSupportedProperties( diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index f85203e6d..9a04a7688 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -157,9 +157,11 @@ nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> CouchbaseClient::get(co logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout", document_id, collection.bucket_name, collection.scope_name, collection.collection_name); return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY); } - std::string cause = get_err.cause() ? get_err.cause()->message() : ""; logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' with error code: '{}', message: '{}'", document_id, collection.bucket_name, collection.scope_name, collection.collection_name, get_err.ec(), get_err.message()); + if (get_err.cause()) { + logger_->log_error("... root cause error code: '{}', message: '{}'", get_err.cause()->ec(), get_err.cause()->message()); + } return nonstd::make_unexpected(CouchbaseErrorType::FATAL); } else { try { @@ -247,17 +249,6 @@ void CouchbaseClusterService::onEnable() { } } -gsl::not_null<std::shared_ptr<CouchbaseClusterService>> CouchbaseClusterService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) { - std::shared_ptr<CouchbaseClusterService> couchbase_cluster_service; - if (auto connection_controller_name = context.getProperty(property)) { - couchbase_cluster_service = std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name, context.getProcessorInfo().getUUID())); - } - if (!couchbase_cluster_service) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing Couchbase Cluster Service"); - } - return gsl::make_not_null(couchbase_cluster_service); -} - REGISTER_RESOURCE(CouchbaseClusterService, ControllerService); } // namespace controllers diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index 040b6773f..8e9040411 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -166,8 +166,6 @@ class CouchbaseClusterService : public core::controller::ControllerServiceImpl { return client_->get(collection, document_id, return_type); } - static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property); - private: std::unique_ptr<CouchbaseClient> client_; std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<CouchbaseClusterService>::getLogger(uuid_); diff --git a/extensions/couchbase/processors/GetCouchbaseKey.cpp b/extensions/couchbase/processors/GetCouchbaseKey.cpp index 02fd95e97..fd983c36d 100644 --- a/extensions/couchbase/processors/GetCouchbaseKey.cpp +++ b/extensions/couchbase/processors/GetCouchbaseKey.cpp @@ -17,6 +17,7 @@ */ #include "GetCouchbaseKey.h" +#include "CouchbaseClusterService.h" #include "utils/gsl.h" #include "core/Resource.h" #include "utils/ProcessorConfigUtils.h" @@ -24,7 +25,7 @@ namespace org::apache::nifi::minifi::couchbase::processors { void GetCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, GetCouchbaseKey::CouchbaseClusterControllerService); + couchbase_cluster_service_ = utils::parseControllerService<controllers::CouchbaseClusterService>(context, GetCouchbaseKey::CouchbaseClusterControllerService, context.getProcessorInfo().getUUID()); document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, GetCouchbaseKey::DocumentType); } diff --git a/extensions/couchbase/processors/PutCouchbaseKey.cpp b/extensions/couchbase/processors/PutCouchbaseKey.cpp index 7c7edd4a0..04acd4033 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.cpp +++ b/extensions/couchbase/processors/PutCouchbaseKey.cpp @@ -17,6 +17,7 @@ */ #include "PutCouchbaseKey.h" +#include "CouchbaseClusterService.h" #include "utils/gsl.h" #include "core/Resource.h" #include "utils/ProcessorConfigUtils.h" @@ -24,7 +25,7 @@ namespace org::apache::nifi::minifi::couchbase::processors { void PutCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, PutCouchbaseKey::CouchbaseClusterControllerService); + couchbase_cluster_service_ = utils::parseControllerService<controllers::CouchbaseClusterService>(context, PutCouchbaseKey::CouchbaseClusterControllerService, context.getProcessorInfo().getUUID()); document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, PutCouchbaseKey::DocumentType); persist_to_ = utils::parseEnumProperty<::couchbase::persist_to>(context, PutCouchbaseKey::PersistTo); replicate_to_ = utils::parseEnumProperty<::couchbase::replicate_to>(context, PutCouchbaseKey::ReplicateTo); diff --git a/extensions/elasticsearch/PostElasticsearch.cpp b/extensions/elasticsearch/PostElasticsearch.cpp index 05e0835e6..ca6e1c747 100644 --- a/extensions/elasticsearch/PostElasticsearch.cpp +++ b/extensions/elasticsearch/PostElasticsearch.cpp @@ -37,18 +37,6 @@ void PostElasticsearch::initialize() { setSupportedRelationships(Relationships); } -auto PostElasticsearch::getSSLContextService(core::ProcessContext& context) const { - if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext)) - return std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*ssl_context, getUUID())); - return std::shared_ptr<minifi::controllers::SSLContextServiceInterface>{}; -} - -auto PostElasticsearch::getCredentialsService(core::ProcessContext& context) const { - if (auto credentials = context.getProperty(PostElasticsearch::ElasticCredentials)) - return std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials, getUUID())); - return std::shared_ptr<ElasticsearchCredentialsControllerService>{}; -} - void PostElasticsearch::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { max_batch_size_ = utils::parseU64Property(context, MaxBatchSize); if (max_batch_size_ < 1) @@ -63,11 +51,12 @@ void PostElasticsearch::onSchedule(core::ProcessContext& context, core::ProcessS throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts"); } - credentials_service_ = getCredentialsService(context); + credentials_service_ = utils::parseOptionalControllerService<ElasticsearchCredentialsControllerService>(context, PostElasticsearch::ElasticCredentials, getUUID()); if (!credentials_service_) throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service"); - client_.initialize(http::HttpRequestMethod::POST, host_url_ + "/_bulk", getSSLContextService(context)); + auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, PostElasticsearch::SSLContext, getUUID()); + client_.initialize(http::HttpRequestMethod::POST, host_url_ + "/_bulk", ssl_context_service); client_.setContentType("application/json"); credentials_service_->authenticateClient(client_); } diff --git a/extensions/elasticsearch/PostElasticsearch.h b/extensions/elasticsearch/PostElasticsearch.h index 9d3cb4dc8..9397d7747 100644 --- a/extensions/elasticsearch/PostElasticsearch.h +++ b/extensions/elasticsearch/PostElasticsearch.h @@ -109,8 +109,6 @@ class PostElasticsearch : public core::ProcessorImpl { private: std::string collectPayload(core::ProcessContext&, core::ProcessSession&, std::vector<std::shared_ptr<core::FlowFile>>&) const; - auto getSSLContextService(core::ProcessContext& context) const; - auto getCredentialsService(core::ProcessContext& context) const; uint64_t max_batch_size_ = 100; std::string host_url_; diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp index 2ae8e16d1..4c833a3ce 100644 --- a/extensions/gcp/processors/GCSProcessor.cpp +++ b/extensions/gcp/processors/GCSProcessor.cpp @@ -28,11 +28,8 @@ namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { std::shared_ptr<google::cloud::storage::oauth2::Credentials> GCSProcessor::getCredentials(core::ProcessContext& context) const { - const std::string service_name = utils::parseProperty(context, GCSProcessor::GCPCredentials); - if (!IsNullOrEmpty(service_name)) { - auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name, getUUID())); - if (!gcp_credentials_controller_service) - return nullptr; + auto gcp_credentials_controller_service = utils::parseOptionalControllerService<GCPCredentialsControllerService>(context, GCSProcessor::GCPCredentials, getUUID()); + if (gcp_credentials_controller_service) { return gcp_credentials_controller_service->getCredentials(); } return nullptr; diff --git a/extensions/grafana-loki/PushGrafanaLoki.cpp b/extensions/grafana-loki/PushGrafanaLoki.cpp index f328078ac..a70013237 100644 --- a/extensions/grafana-loki/PushGrafanaLoki.cpp +++ b/extensions/grafana-loki/PushGrafanaLoki.cpp @@ -77,13 +77,6 @@ void PushGrafanaLoki::LogBatch::setStartPushTime(std::chrono::system_clock::time const core::Relationship PushGrafanaLoki::Self("__self__", "Marks the FlowFile to be owned by this processor"); -std::shared_ptr<minifi::controllers::SSLContextServiceInterface> PushGrafanaLoki::getSSLContextService(core::ProcessContext& context) const { - if (auto ssl_context = context.getProperty(PushGrafanaLoki::SSLContextService)) { - return std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*ssl_context, getUUID())); - } - return std::shared_ptr<minifi::controllers::SSLContextServiceInterface>{}; -} - void PushGrafanaLoki::setUpStateManager(core::ProcessContext& context) { auto state_manager = context.getStateManager(); if (state_manager == nullptr) { @@ -110,11 +103,11 @@ std::map<std::string, std::string> PushGrafanaLoki::buildStreamLabelMap(core::Pr throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property"); } for (const auto& label : stream_labels) { - auto stream_labels = utils::string::splitAndTrimRemovingEmpty(label, "="); - if (stream_labels.size() != 2) { + auto key_value = utils::string::splitAndTrimRemovingEmpty(label, "="); + if (key_value.size() != 2) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property"); } - stream_label_map[stream_labels[0]] = stream_labels[1]; + stream_label_map[key_value[0]] = key_value[1]; } } else { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property"); @@ -149,7 +142,7 @@ void PushGrafanaLoki::onSchedule(core::ProcessContext& context, core::ProcessSes } if (log_line_batch_wait) { - log_batch_.setLogLineBatchWait(*log_line_batch_wait); + log_batch_.setLogLineBatchWait(log_line_batch_wait); logger_->log_debug("PushGrafanaLoki Log Line Batch Wait is set to {} milliseconds", *log_line_batch_wait); } } diff --git a/extensions/grafana-loki/PushGrafanaLoki.h b/extensions/grafana-loki/PushGrafanaLoki.h index bd64aa021..06676194c 100644 --- a/extensions/grafana-loki/PushGrafanaLoki.h +++ b/extensions/grafana-loki/PushGrafanaLoki.h @@ -133,7 +133,6 @@ class PushGrafanaLoki : public core::ProcessorImpl { static std::map<std::string, std::string> buildStreamLabelMap(core::ProcessContext& context); - std::shared_ptr<minifi::controllers::SSLContextServiceInterface> getSSLContextService(core::ProcessContext& context) const; void processBatch(const std::vector<std::shared_ptr<core::FlowFile>>& batched_flow_files, core::ProcessSession& session); virtual nonstd::expected<void, std::string> submitRequest(const std::vector<std::shared_ptr<core::FlowFile>>& batched_flow_files, core::ProcessSession& session) = 0; void initializeHttpClient(core::ProcessContext& context); diff --git a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp index d13634ce8..932c218d2 100644 --- a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp +++ b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp @@ -73,16 +73,18 @@ void PushGrafanaLokiGrpc::setUpGrpcChannel(const std::string& url, core::Process args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); - std::shared_ptr<::grpc::ChannelCredentials> creds; - if (auto ssl_context_service = getSSLContextService(context)) { - ::grpc::SslCredentialsOptions ssl_credentials_options; - ssl_credentials_options.pem_cert_chain = utils::file::FileUtils::get_content(ssl_context_service->getCertificateFile()); - ssl_credentials_options.pem_private_key = utils::file::FileUtils::get_content(ssl_context_service->getPrivateKeyFile()); - ssl_credentials_options.pem_root_certs = utils::file::FileUtils::get_content(ssl_context_service->getCACertificate()); - creds = ::grpc::SslCredentials(ssl_credentials_options); - } else { - creds = ::grpc::InsecureChannelCredentials(); - } + std::shared_ptr<::grpc::ChannelCredentials> creds = [&]() { + auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, PushGrafanaLoki::SSLContextService, getUUID()); + if (ssl_context_service) { + ::grpc::SslCredentialsOptions ssl_credentials_options; + ssl_credentials_options.pem_cert_chain = utils::file::FileUtils::get_content(ssl_context_service->getCertificateFile()); + ssl_credentials_options.pem_private_key = utils::file::FileUtils::get_content(ssl_context_service->getPrivateKeyFile()); + ssl_credentials_options.pem_root_certs = utils::file::FileUtils::get_content(ssl_context_service->getCACertificate()); + return ::grpc::SslCredentials(ssl_credentials_options); + } else { + return ::grpc::InsecureChannelCredentials(); + } + }(); push_channel_ = ::grpc::CreateCustomChannel(url, creds, args); if (!push_channel_) { diff --git a/extensions/grafana-loki/PushGrafanaLokiREST.cpp b/extensions/grafana-loki/PushGrafanaLokiREST.cpp index 74560af5e..1feb11d9c 100644 --- a/extensions/grafana-loki/PushGrafanaLokiREST.cpp +++ b/extensions/grafana-loki/PushGrafanaLokiREST.cpp @@ -82,7 +82,8 @@ void PushGrafanaLokiREST::initializeHttpClient(core::ProcessContext& context) { url += "/loki/api/v1/push"; } logger_->log_debug("PushGrafanaLokiREST push url is set to: {}", url); - client_.initialize(http::HttpRequestMethod::POST, url, getSSLContextService(context)); + auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, PushGrafanaLoki::SSLContextService, getUUID()); + client_.initialize(http::HttpRequestMethod::POST, url, ssl_context_service); } void PushGrafanaLokiREST::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { diff --git a/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp b/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp index bae100bda..5693a06ae 100644 --- a/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp +++ b/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp @@ -22,6 +22,7 @@ #include "../ContainerInfo.h" #include "../MetricsApi.h" #include "../MetricsFilter.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { @@ -31,20 +32,7 @@ void CollectKubernetesPodMetrics::initialize() { } void CollectKubernetesPodMetrics::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - const auto controller_service_name = context.getProperty(KubernetesControllerService); - if (!controller_service_name || controller_service_name->empty()) { - throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Missing '", KubernetesControllerService.name, "' property")}; - } - - std::shared_ptr<core::controller::ControllerService> controller_service = context.getControllerService(*controller_service_name, getUUID()); - if (!controller_service) { - throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Controller service '", *controller_service_name, "' not found")}; - } - - kubernetes_controller_service_ = std::dynamic_pointer_cast<minifi::controllers::KubernetesControllerService>(controller_service); - if (!kubernetes_controller_service_) { - throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Controller service '", *controller_service_name, "' is not a KubernetesControllerService")}; - } + kubernetes_controller_service_ = utils::parseControllerService<controllers::KubernetesControllerService>(context, KubernetesControllerService, getUUID()); } void CollectKubernetesPodMetrics::onTrigger(core::ProcessContext&, core::ProcessSession& session) { diff --git a/extensions/smb/FetchSmb.cpp b/extensions/smb/FetchSmb.cpp index a6c24c59e..9b17ae86c 100644 --- a/extensions/smb/FetchSmb.cpp +++ b/extensions/smb/FetchSmb.cpp @@ -19,6 +19,7 @@ #include "core/Resource.h" #include "utils/ConfigurationUtils.h" #include "utils/file/FileReaderCallback.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::extensions::smb { @@ -28,7 +29,7 @@ void FetchSmb::initialize() { } void FetchSmb::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(context, FetchSmb::ConnectionControllerService); + smb_connection_controller_service_ = utils::parseControllerService<SmbConnectionControllerService>(context, FetchSmb::ConnectionControllerService, getUUID()); buffer_size_ = utils::configuration::getBufferSize(*context.getConfiguration()); } diff --git a/extensions/smb/ListSmb.cpp b/extensions/smb/ListSmb.cpp index c82c053a3..ecfebe343 100644 --- a/extensions/smb/ListSmb.cpp +++ b/extensions/smb/ListSmb.cpp @@ -33,7 +33,7 @@ void ListSmb::initialize() { } void ListSmb::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(context, ListSmb::ConnectionControllerService); + smb_connection_controller_service_ = utils::parseControllerService<SmbConnectionControllerService>(context, ListSmb::ConnectionControllerService, getUUID()); auto state_manager = context.getStateManager(); if (state_manager == nullptr) { diff --git a/extensions/smb/PutSmb.cpp b/extensions/smb/PutSmb.cpp index 40b9db243..fadd10c9e 100644 --- a/extensions/smb/PutSmb.cpp +++ b/extensions/smb/PutSmb.cpp @@ -32,7 +32,7 @@ void PutSmb::initialize() { } void PutSmb::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - smb_connection_controller_service_ = SmbConnectionControllerService::getFromProperty(context, PutSmb::ConnectionControllerService); + smb_connection_controller_service_ = utils::parseControllerService<SmbConnectionControllerService>(context, PutSmb::ConnectionControllerService, getUUID()); create_missing_dirs_ = utils::parseBoolProperty(context, PutSmb::CreateMissingDirectories); conflict_resolution_strategy_ = utils::parseEnumProperty<FileExistsResolutionStrategy>(context, ConflictResolution); } diff --git a/extensions/smb/SmbConnectionControllerService.cpp b/extensions/smb/SmbConnectionControllerService.cpp index d1302b32c..8a61a5a79 100644 --- a/extensions/smb/SmbConnectionControllerService.cpp +++ b/extensions/smb/SmbConnectionControllerService.cpp @@ -58,17 +58,6 @@ void SmbConnectionControllerService::notifyStop() { logger_->log_error("Error while disconnecting from SMB: {}", disconnection_result.error().message()); } -gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> SmbConnectionControllerService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) { - std::shared_ptr<SmbConnectionControllerService> smb_connection_controller_service; - if (auto connection_controller_name = context.getProperty(property)) { - smb_connection_controller_service = std::dynamic_pointer_cast<SmbConnectionControllerService>(context.getControllerService(*connection_controller_name, context.getProcessorInfo().getUUID())); - } - if (!smb_connection_controller_service) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing SMB Connection Controller Service"); - } - return gsl::make_not_null(smb_connection_controller_service); -} - nonstd::expected<void, std::error_code> SmbConnectionControllerService::connect() { auto connection_result = WNetAddConnection2A(&net_resource_, credentials_ ? credentials_->password.c_str() : nullptr, diff --git a/extensions/smb/SmbConnectionControllerService.h b/extensions/smb/SmbConnectionControllerService.h index 8522e027d..274376d48 100644 --- a/extensions/smb/SmbConnectionControllerService.h +++ b/extensions/smb/SmbConnectionControllerService.h @@ -87,8 +87,6 @@ class SmbConnectionControllerService : public core::controller::ControllerServic virtual std::error_code validateConnection(); virtual std::filesystem::path getPath() const { return server_path_; } - static gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property); - private: nonstd::expected<void, std::error_code> connect(); nonstd::expected<void, std::error_code> disconnect(); diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp index 30280335f..4abe8ee7a 100644 --- a/extensions/splunk/PutSplunkHTTP.cpp +++ b/extensions/splunk/PutSplunkHTTP.cpp @@ -113,7 +113,7 @@ void setFlowFileAsPayload(core::ProcessSession& session, void PutSplunkHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { SplunkHECProcessor::onSchedule(context, session_factory); - ssl_context_service_ = getSSLContextService(context); + ssl_context_service_ = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContext, getUUID()); auto create_client = [this]() -> std::unique_ptr<minifi::http::HTTPClient> { auto client = std::make_unique<http::HTTPClient>(); initializeClient(*client, getNetworkLocation().append(getEndpoint(*client)), ssl_context_service_); diff --git a/extensions/splunk/QuerySplunkIndexingStatus.cpp b/extensions/splunk/QuerySplunkIndexingStatus.cpp index 1f416486a..819a44422 100644 --- a/extensions/splunk/QuerySplunkIndexingStatus.cpp +++ b/extensions/splunk/QuerySplunkIndexingStatus.cpp @@ -141,12 +141,11 @@ void QuerySplunkIndexingStatus::onSchedule(core::ProcessContext& context, core:: SplunkHECProcessor::onSchedule(context, session_factory); max_age_ = utils::parseDurationProperty(context, MaximumWaitingTime); batch_size_ = utils::parseU64Property(context, MaxQuerySize); - initializeClient(client_, getNetworkLocation().append(getEndpoint()), getSSLContextService(context)); + auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContext, getUUID()); + initializeClient(client_, getNetworkLocation().append(getEndpoint()), ssl_context_service); } void QuerySplunkIndexingStatus::onTrigger(core::ProcessContext&, core::ProcessSession& session) { - std::string ack_request; - auto undetermined_flow_files = getUndeterminedFlowFiles(session, batch_size_); if (undetermined_flow_files.empty()) return; diff --git a/extensions/splunk/SplunkHECProcessor.cpp b/extensions/splunk/SplunkHECProcessor.cpp index 58ba0827a..eae24f31b 100644 --- a/extensions/splunk/SplunkHECProcessor.cpp +++ b/extensions/splunk/SplunkHECProcessor.cpp @@ -40,12 +40,6 @@ std::string SplunkHECProcessor::getNetworkLocation() const { return hostname_ + ":" + port_; } -std::shared_ptr<minifi::controllers::SSLContextServiceInterface> SplunkHECProcessor::getSSLContextService(core::ProcessContext& context) const { - if (const auto context_name = context.getProperty(SSLContext); context_name && !IsNullOrEmpty(*context_name)) - return std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name, getUUID())); - return nullptr; -} - void SplunkHECProcessor::initializeClient(http::HTTPClient& client, const std::string &url, std::shared_ptr<minifi::controllers::SSLContextServiceInterface> ssl_context_service) const { client.initialize(http::HttpRequestMethod::POST, url, std::move(ssl_context_service)); client.setRequestHeader("Authorization", token_); diff --git a/extensions/splunk/SplunkHECProcessor.h b/extensions/splunk/SplunkHECProcessor.h index 766f2ee3b..dbc58ebf5 100644 --- a/extensions/splunk/SplunkHECProcessor.h +++ b/extensions/splunk/SplunkHECProcessor.h @@ -77,7 +77,6 @@ class SplunkHECProcessor : public core::ProcessorImpl { protected: std::string getNetworkLocation() const; - std::shared_ptr<minifi::controllers::SSLContextServiceInterface> getSSLContextService(core::ProcessContext& context) const; void initializeClient(http::HTTPClient& client, const std::string &url, std::shared_ptr<minifi::controllers::SSLContextServiceInterface> ssl_context_service) const; std::string token_; diff --git a/extensions/sql/processors/SQLProcessor.cpp b/extensions/sql/processors/SQLProcessor.cpp index 9bde15f7f..8c88c535d 100644 --- a/extensions/sql/processors/SQLProcessor.cpp +++ b/extensions/sql/processors/SQLProcessor.cpp @@ -24,21 +24,12 @@ #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "Exception.h" +#include "utils/ProcessorConfigUtils.h" namespace org::apache::nifi::minifi::processors { void SQLProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - std::string controllerService = context.getProperty(DBControllerService).value_or(""); - - if (auto service = context.getControllerService(controllerService, getUUID())) { - db_service_ = std::dynamic_pointer_cast<sql::controllers::DatabaseService>(service); - if (!db_service_) { - throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + controllerService + "' is not a DatabaseService"); - } - } else { - throw minifi::Exception(PROCESSOR_EXCEPTION, "Could not find controller service '" + controllerService + "'"); - } - + db_service_ = utils::parseControllerService<sql::controllers::DatabaseService>(context, DBControllerService, getUUID()); processOnSchedule(context); } diff --git a/extensions/standard-processors/modbus/FetchModbusTcp.cpp b/extensions/standard-processors/modbus/FetchModbusTcp.cpp index eacea365b..e6667ab49 100644 --- a/extensions/standard-processors/modbus/FetchModbusTcp.cpp +++ b/extensions/standard-processors/modbus/FetchModbusTcp.cpp @@ -58,18 +58,13 @@ void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSess connections_.emplace(); } - ssl_context_.reset(); - if (const auto controller_service_name = context.getProperty(SSLContextService); controller_service_name && !IsNullOrEmpty(*controller_service_name)) { - if (auto controller_service = context.getControllerService(*controller_service_name, getUUID())) { - if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(controller_service)) { - ssl_context_ = utils::net::getSslContext(*ssl_context_service); - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, *controller_service_name + " is not an SSL Context Service"); - } - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *controller_service_name); + ssl_context_ = [&]() -> std::optional<asio::ssl::context> { + auto service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContextService, getUUID()); + if (service) { + return {utils::net::getSslContext(*service)}; } - } + return std::nullopt; + }(); } void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { @@ -208,7 +203,6 @@ auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerB auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { - std::string result; if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) { // NOLINT (clang tidy doesnt like coroutines) co_return nonstd::make_unexpected(connection_error); } diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp index 2e51b3ace..221573572 100644 --- a/extensions/standard-processors/processors/GetTCP.cpp +++ b/extensions/standard-processors/processors/GetTCP.cpp @@ -71,22 +71,6 @@ char GetTCP::parseDelimiter(core::ProcessContext& context) { return delimiter; } -std::optional<asio::ssl::context> GetTCP::parseSSLContext(core::ProcessContext& context) const { - std::optional<asio::ssl::context> ssl_context; - if (auto context_name = context.getProperty(SSLContextService)) { - if (auto controller_service = context.getControllerService(*context_name, getUUID())) { - if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name, getUUID()))) { - ssl_context = utils::net::getSslContext(*ssl_context_service); - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); - } - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); - } - } - return ssl_context; -} - uint64_t GetTCP::parseMaxBatchSize(core::ProcessContext& context) { const auto max_batch_size = utils::parseU64Property(context, MaxBatchSize); if (max_batch_size == 0) { @@ -98,7 +82,12 @@ uint64_t GetTCP::parseMaxBatchSize(core::ProcessContext& context) { void GetTCP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { auto connections_to_make = parseEndpointList(context); auto delimiter = parseDelimiter(context); - auto ssl_context = parseSSLContext(context); + auto ssl_context = [&]() -> std::optional<asio::ssl::context> { + if (auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContextService, getUUID())) { + return {utils::net::getSslContext(*ssl_context_service)}; + } + return std::nullopt; + }(); std::optional<size_t> max_queue_size = utils::parseOptionalU64Property(context, MaxQueueSize); std::optional<size_t> max_message_size = utils::parseOptionalU64Property(context, MaxMessageSize); diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h index 9c8b16363..dc4e9827f 100644 --- a/extensions/standard-processors/processors/GetTCP.h +++ b/extensions/standard-processors/processors/GetTCP.h @@ -137,7 +137,6 @@ class GetTCP : public core::ProcessorImpl { std::vector<utils::net::ConnectionId> parseEndpointList(core::ProcessContext& context); static char parseDelimiter(core::ProcessContext& context); - std::optional<asio::ssl::context> parseSSLContext(core::ProcessContext& context) const; static uint64_t parseMaxBatchSize(core::ProcessContext& context); class TcpClient { diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp b/extensions/standard-processors/processors/InvokeHTTP.cpp index 084065cd4..269479ede 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.cpp +++ b/extensions/standard-processors/processors/InvokeHTTP.cpp @@ -159,22 +159,13 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) proxy_.host = context.getProperty(InvokeHTTP::ProxyHost).value_or(""); proxy_.port = (context.getProperty(InvokeHTTP::ProxyPort) | utils::andThen(parsing::parseIntegral<int>)).value_or(0); - std::string port_str; proxy_.username = context.getProperty(InvokeHTTP::ProxyUsername).value_or(""); proxy_.password = context.getProperty(InvokeHTTP::ProxyPassword).value_or(""); follow_redirects_ = utils::parseBoolProperty(context, FollowRedirects); // Shouldn't fail due to default value; content_type_ = utils::parseProperty(context, InvokeHTTP::ContentType); // Shouldn't fail due to default value; - if (auto ssl_context_name = context.getProperty(SSLContext)) { - if (auto service = context.getControllerService(*ssl_context_name, getUUID())) { - ssl_context_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service); - if (!ssl_context_service_) - logger_->log_error("Controller service '{}' is not an SSLContextService", *ssl_context_name); - } else { - logger_->log_error("Couldn't find controller service with name '{}'", *ssl_context_name); - } - } + ssl_context_service_ = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContext, getUUID()); } gsl::not_null<std::unique_ptr<http::HTTPClient>> InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const { diff --git a/extensions/standard-processors/processors/PutTCP.cpp b/extensions/standard-processors/processors/PutTCP.cpp index f5a113238..a6c55b0d7 100644 --- a/extensions/standard-processors/processors/PutTCP.cpp +++ b/extensions/standard-processors/processors/PutTCP.cpp @@ -70,17 +70,8 @@ void PutTCP::onSchedule(core::ProcessContext& context, core::ProcessSessionFacto else connections_.emplace(); - ssl_context_.reset(); - if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { - if (auto controller_service = context.getControllerService(*context_name, getUUID())) { - if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name, getUUID()))) { - ssl_context_ = utils::net::getSslContext(*ssl_context_service); - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); - } - } else { - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); - } + if (auto ssl_context_service = utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context, SSLContextService, getUUID())) { + ssl_context_ = {utils::net::getSslContext(*ssl_context_service)}; } const auto delimiter_str = context.getProperty(OutgoingMessageDelimiter).value_or(std::string{}); diff --git a/extensions/standard-processors/processors/SplitRecord.cpp b/extensions/standard-processors/processors/SplitRecord.cpp index 56099608c..52016653b 100644 --- a/extensions/standard-processors/processors/SplitRecord.cpp +++ b/extensions/standard-processors/processors/SplitRecord.cpp @@ -21,34 +21,15 @@ #include "utils/GeneralUtils.h" namespace org::apache::nifi::minifi::processors { -namespace { -template<typename RecordSetIO> -std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, const utils::Identifier& processor_uuid) { - std::string service_name = context.getProperty(property).value_or(""); - if (!IsNullOrEmpty(service_name)) { - auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid)); - if (!record_set_io) - return nullptr; - return record_set_io; - } - return nullptr; -} -} // namespace void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { - record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, RecordReader, getUUID()); - if (!record_set_reader_) { - throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader property is missing or invalid"); - } - record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, RecordWriter, getUUID()); - if (!record_set_writer_) { - throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer property is missing or invalid"); - } + record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID()); + record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID()); } nonstd::expected<std::size_t, std::string> SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file) { return context.getProperty(RecordsPerSplit, &original_flow_file) - | utils::andThen([](const auto records_per_split_str) { + | utils::andThen([](const auto& records_per_split_str) { return parsing::parseIntegralMinMax<std::size_t>(records_per_split_str, 1, std::numeric_limits<std::size_t>::max()); }) | utils::transformError([](std::error_code) -> std::string { return std::string{"Records Per Split should be set to a number larger than 0"}; }); diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp index 2e61155c1..f5ae3c837 100644 --- a/extensions/standard-processors/processors/TailFile.cpp +++ b/extensions/standard-processors/processors/TailFile.cpp @@ -44,10 +44,10 @@ namespace org::apache::nifi::minifi::processors { -const char *TailFile::CURRENT_STR = "CURRENT."; -const char *TailFile::POSITION_STR = "POSITION."; - namespace { +inline constexpr std::string_view CURRENT_STR = "CURRENT."; +inline constexpr std::string_view POSITION_STR = "POSITION."; + template<typename Container, typename Key> bool containsKey(const Container &container, const Key &key) { return container.find(key) != container.end(); @@ -269,7 +269,10 @@ void TailFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFac tail_mode_ = Mode::MULTIPLE; pattern_regex_ = utils::Regex(file_name_str); - parseAttributeProviderServiceProperty(context); + if (auto service = utils::parseOptionalControllerService<minifi::controllers::AttributeProviderService>(context, AttributeProviderService, getUUID())) { + // we drop ownership of the service here -- in the long term, getControllerService/parseControllerService should return a non-owning pointer or optional reference + attribute_provider_service_ = service.get(); + } if (auto base_dir = context.getProperty(BaseDirectory); !base_dir) { throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode."); @@ -309,24 +312,6 @@ void TailFile::onSchedule(core::ProcessContext& context, core::ProcessSessionFac if (batch_size_ == 0) { batch_size_.reset(); } } -void TailFile::parseAttributeProviderServiceProperty(const core::ProcessContext& context) { - const auto attribute_provider_service_name = context.getProperty(AttributeProviderService); - if (!attribute_provider_service_name || attribute_provider_service_name->empty()) { - return; - } - - std::shared_ptr<core::controller::ControllerService> controller_service = context.getControllerService(*attribute_provider_service_name, getUUID()); - if (!controller_service) { - throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Controller service '", *attribute_provider_service_name, "' not found")}; - } - - // we drop ownership of the service here -- in the long term, getControllerService() should return a non-owning pointer or optional reference - attribute_provider_service_ = dynamic_cast<minifi::controllers::AttributeProviderService*>(controller_service.get()); - if (!attribute_provider_service_) { - throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::string::join_pack("Controller service '", *attribute_provider_service_name, "' is not an AttributeProviderService")}; - } -} - void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, TailState> &state) const { char *line = buf; @@ -379,8 +364,8 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, Tai logger_->log_debug("Received position {}", position); state.begin()->second.position_ = gsl::narrow<uint64_t>(position); } - if (key.find(CURRENT_STR) == 0) { - const auto file = key.substr(strlen(CURRENT_STR)); + if (key.starts_with(CURRENT_STR)) { + const auto file = key.substr(CURRENT_STR.size()); std::filesystem::path file_path = value; if (file_path.has_filename() && file_path.has_parent_path()) { state[file].path_ = file_path.parent_path(); @@ -390,8 +375,8 @@ void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, Tai } } - if (key.find(POSITION_STR) == 0) { - const auto file = key.substr(strlen(POSITION_STR)); + if (key.starts_with(POSITION_STR)) { + const auto file = key.substr(POSITION_STR.size()); state[file].position_ = std::stoull(value); } } diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h index 0095770da..006404428 100644 --- a/extensions/standard-processors/processors/TailFile.h +++ b/extensions/standard-processors/processors/TailFile.h @@ -256,7 +256,6 @@ class TailFile : public core::ProcessorImpl { TimePoint mtime_; }; - void parseAttributeProviderServiceProperty(const core::ProcessContext& context); void parseStateFileLine(char *buf, std::map<std::filesystem::path, TailState> &state) const; void processAllRotatedFiles(core::ProcessSession& session, TailState &state); void processRotatedFiles(core::ProcessSession& session, TailState &state, std::vector<TailState> &rotated_file_states); @@ -284,8 +283,6 @@ class TailFile : public core::ProcessorImpl { static void updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum); bool isOldFileInitiallyRead(const TailState &state) const; - static const char *CURRENT_STR; - static const char *POSITION_STR; static constexpr int BUFFER_SIZE = 512; std::optional<char> delimiter_; // Delimiter for the data incoming from the tailed file. diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp index 3cc01e1ff..0fc6ab3e8 100644 --- a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp +++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #include <string> +#include <utility> #include "unit/TestBase.h" #include "integration/HTTPIntegrationBase.h" @@ -24,6 +25,7 @@ #include "core/Processor.h" #include "core/controller/ControllerService.h" #include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" using namespace std::literals::chrono_literals; @@ -85,13 +87,7 @@ class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl { } void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) override { - if (auto controller_service = context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) { - if (!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service, getUUID()))) { - throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service"); - } - } else { - throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing controller service"); - } + std::ignore = minifi::utils::parseControllerService<DummyController>(context, DummyControllerService, getUUID()); logger_->log_debug("DummyControllerUserProcessor::onSchedule successful"); } @@ -143,7 +139,8 @@ class ControllerUpdateHandler: public HeartbeatHandler { case TestState::VERIFY_INITIAL_METRICS: { sendEmptyHeartbeatResponse(conn); REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "Could not enable DummyController")); - REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "(DummmyControllerUserProcessor): Process Schedule Operation: Invalid controller service")); + REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, + "(DummmyControllerUserProcessor): Process Schedule Operation: Controller service 'Dummy Controller Service' = 'DummyController' not found")); test_state_ = TestState::SEND_NEW_CONFIG; break; } diff --git a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp index 9e03584f0..24f775a63 100644 --- a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp +++ b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp @@ -17,11 +17,15 @@ #include "unit/TestBase.h" #include "unit/Catch.h" +#include "core/controller/ControllerService.h" +#include "core/controller/ControllerServiceNode.h" +#include "core/controller/ControllerServiceProvider.h" #include "core/PropertyDefinition.h" #include "core/ProcessorImpl.h" #include "core/PropertyDefinitionBuilder.h" #include "utils/ProcessorConfigUtils.h" #include "utils/Enum.h" +#include "utils/Id.h" #include "unit/TestUtils.h" namespace org::apache::nifi::minifi::core { @@ -48,6 +52,7 @@ enum class TestEnum { A, B }; +} // namespace TEST_CASE("Parse enum property") { static constexpr auto prop = PropertyDefinitionBuilder<magic_enum::enum_count<TestEnum>()>::createProperty("prop") @@ -84,5 +89,99 @@ TEST_CASE("Parse enum property") { } } +namespace { +class TestControllerService : public controller::ControllerServiceImpl { + public: + using ControllerServiceImpl::ControllerServiceImpl; + bool supportsDynamicProperties() const override { return false; } + void yield() override {} + bool isRunning() const override { return false; } + bool isWorkAvailable() override { return false; } +}; + +const auto test_controller_service = []() { + auto service = std::make_shared<TestControllerService>("test-controller-service", utils::IdGenerator::getIdGenerator()->generate()); + service->initialize(); + return service; +}(); + +class WrongTestControllerService : public TestControllerService {}; + +class TestControllerServiceProvider : public controller::ControllerServiceProviderImpl { + public: + using ControllerServiceProviderImpl::ControllerServiceProviderImpl; + std::shared_ptr<controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&) override { return nullptr; } + void clearControllerServices() override {} + void enableAllControllerServices() override {} + void disableAllControllerServices() override {} + + std::shared_ptr<controller::ControllerService> getControllerService(const std::string& name, const utils::Identifier&) const override { + if (name == "TestControllerService") { return test_controller_service; } + return nullptr; + } +}; + +TestControllerServiceProvider test_controller_service_provider("test-provider"); } // namespace + +TEST_CASE("Parse controller service property") { + static constexpr auto property = PropertyDefinitionBuilder<>::createProperty("Controller Service") + .withAllowedTypes<TestControllerService>() + .build(); + auto processor = minifi::test::utils::make_processor<TestProcessor>("test-processor"); + processor->getImpl<TestProcessor>().on_initialize_ = [&] (auto& self) { + self.setSupportedProperties(std::to_array<core::PropertyReference>({property})); + }; + processor->initialize(); + auto configuration = minifi::Configure::create(); + ProcessContextImpl context(*processor, &test_controller_service_provider, nullptr, nullptr, configuration, nullptr); + + SECTION("Required controller service property") { + SECTION("... is valid") { + REQUIRE(processor->setProperty(property.name, "TestControllerService")); + const auto value = utils::parseControllerService<TestControllerService>(context, property, processor->getUUID()); + CHECK(value == test_controller_service); + } + SECTION("... is missing") { + CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, property, processor->getUUID())); + } + SECTION("... is blank") { + REQUIRE(processor->setProperty(property.name, "")); + CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, property, processor->getUUID())); + } + SECTION("... is invalid") { + REQUIRE(processor->setProperty(property.name, "NonExistentControllerService")); + CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, property, processor->getUUID())); + } + SECTION("... is not the correct class") { + REQUIRE(processor->setProperty(property.name, "TestControllerService")); + CHECK_THROWS(utils::parseControllerService<WrongTestControllerService>(context, property, processor->getUUID())); + } + } + + SECTION("Optional controller service property") { + SECTION("... is valid") { + REQUIRE(processor->setProperty(property.name, "TestControllerService")); + const auto value = utils::parseOptionalControllerService<TestControllerService>(context, property, processor->getUUID());; + CHECK(value == test_controller_service); + } + SECTION("... is missing") { + const auto value = utils::parseOptionalControllerService<TestControllerService>(context, property, processor->getUUID());; + CHECK(value == nullptr); + } + SECTION("... is blank") { + REQUIRE(processor->setProperty(property.name, "")); + const auto value = utils::parseOptionalControllerService<TestControllerService>(context, property, processor->getUUID());; + CHECK(value == nullptr); + } + SECTION("... is invalid") { + REQUIRE(processor->setProperty(property.name, "NonExistentControllerService")); + CHECK_THROWS(utils::parseOptionalControllerService<TestControllerService>(context, property, processor->getUUID())); + } + SECTION("... is not the correct class") { + REQUIRE(processor->setProperty(property.name, "TestControllerService")); + CHECK_THROWS(utils::parseOptionalControllerService<WrongTestControllerService>(context, property, processor->getUUID())); + } + } +} } // namespace org::apache::nifi::minifi::core
