This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f1758fab52cce83bec70f38f6188536783064cce
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Thu Aug 7 09:47:16 2025 +0200

    MINIFICPP-2599 parseOptionalControllerService should throw
    
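    ... when the property is set but its value does not name an existing
    controller service, or the named service is not of the expected type.
    An unset or empty property still yields a null pointer instead of throwing.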
    
    Also removed some helper functions which basically did the same thing,
    and replaced them with uses of parse[Optional]ControllerService.
    
    Signed-off-by: Gabor Gyimesi <[email protected]>
    
    This closes #2002
---
 .../include/utils/ProcessorConfigUtils.h           | 22 ++---
 extensions/aws/processors/AwsProcessor.cpp         | 10 ++-
 .../controllerservices/CouchbaseClusterService.cpp | 15 +---
 .../controllerservices/CouchbaseClusterService.h   |  2 -
 .../couchbase/processors/GetCouchbaseKey.cpp       |  3 +-
 .../couchbase/processors/PutCouchbaseKey.cpp       |  3 +-
 extensions/elasticsearch/PostElasticsearch.cpp     | 17 +---
 extensions/elasticsearch/PostElasticsearch.h       |  2 -
 extensions/gcp/processors/GCSProcessor.cpp         |  7 +-
 extensions/grafana-loki/PushGrafanaLoki.cpp        | 15 +---
 extensions/grafana-loki/PushGrafanaLoki.h          |  1 -
 extensions/grafana-loki/PushGrafanaLokiGrpc.cpp    | 22 ++---
 extensions/grafana-loki/PushGrafanaLokiREST.cpp    |  3 +-
 .../processors/CollectKubernetesPodMetrics.cpp     | 16 +---
 extensions/smb/FetchSmb.cpp                        |  3 +-
 extensions/smb/ListSmb.cpp                         |  2 +-
 extensions/smb/PutSmb.cpp                          |  2 +-
 extensions/smb/SmbConnectionControllerService.cpp  | 11 ---
 extensions/smb/SmbConnectionControllerService.h    |  2 -
 extensions/splunk/PutSplunkHTTP.cpp                |  2 +-
 extensions/splunk/QuerySplunkIndexingStatus.cpp    |  5 +-
 extensions/splunk/SplunkHECProcessor.cpp           |  6 --
 extensions/splunk/SplunkHECProcessor.h             |  1 -
 extensions/sql/processors/SQLProcessor.cpp         | 13 +--
 .../standard-processors/modbus/FetchModbusTcp.cpp  | 18 ++--
 .../standard-processors/processors/GetTCP.cpp      | 23 ++---
 extensions/standard-processors/processors/GetTCP.h |  1 -
 .../standard-processors/processors/InvokeHTTP.cpp  | 11 +--
 .../standard-processors/processors/PutTCP.cpp      | 13 +--
 .../standard-processors/processors/SplitRecord.cpp | 25 +-----
 .../standard-processors/processors/TailFile.cpp    | 37 +++-----
 .../standard-processors/processors/TailFile.h      |  3 -
 .../integration/C2ControllerEnableFailureTest.cpp  | 13 ++-
 libminifi/test/unit/ProcessorConfigUtilsTests.cpp  | 99 ++++++++++++++++++++++
 34 files changed, 192 insertions(+), 236 deletions(-)

diff --git a/extension-framework/include/utils/ProcessorConfigUtils.h 
b/extension-framework/include/utils/ProcessorConfigUtils.h
index f18febf78..0d91513f9 100644
--- a/extension-framework/include/utils/ProcessorConfigUtils.h
+++ b/extension-framework/include/utils/ProcessorConfigUtils.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include "core/ClassName.h"
 #include "core/ProcessContext.h"
 #include "minifi-cpp/core/PropertyValidator.h"
 #include "utils/Enum.h"
@@ -166,30 +167,31 @@ std::optional<T> parseOptionalEnumProperty(const 
core::ProcessContext& context,
 }
 
 template<typename ControllerServiceType>
-std::optional<std::shared_ptr<ControllerServiceType>> 
parseOptionalControllerService(const core::ProcessContext& context,
-    const core::PropertyReference& prop,
-    const utils::Identifier& processor_uuid) {
+std::shared_ptr<ControllerServiceType> parseOptionalControllerService(const 
core::ProcessContext& context, const core::PropertyReference& prop, const 
utils::Identifier& processor_uuid) {
   const auto controller_service_name = context.getProperty(prop.name);
-  if (!controller_service_name) {
-    return std::nullopt;
+  if (!controller_service_name || controller_service_name->empty()) {
+    return nullptr;
   }
 
   const std::shared_ptr<core::controller::ControllerService> service = 
context.getControllerService(*controller_service_name, processor_uuid);
   if (!service) {
-    return std::nullopt;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Controller 
service '{}' = '{}' not found", prop.name, *controller_service_name));
   }
 
   auto typed_controller_service = 
std::dynamic_pointer_cast<ControllerServiceType>(service);
   if (!typed_controller_service) {
-    return std::nullopt;
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Controller 
service '{}' = '{}' is not of type {}", prop.name, *controller_service_name, 
core::className<ControllerServiceType>()));
   }
 
   return typed_controller_service;
 }
 
 template<typename ControllerServiceType>
-std::shared_ptr<ControllerServiceType> parseControllerService(const 
core::ProcessContext& context, const core::PropertyReference& prop, const 
utils::Identifier& processor_uuid) {
-  return parseOptionalControllerService<ControllerServiceType>(context, prop, 
processor_uuid)
-      | utils::orThrow("Required Controller Service");
+gsl::not_null<std::shared_ptr<ControllerServiceType>> 
parseControllerService(const core::ProcessContext& context, const 
core::PropertyReference& prop, const utils::Identifier& processor_uuid) {
+  auto controller_service = 
parseOptionalControllerService<ControllerServiceType>(context, prop, 
processor_uuid);
+  if (!controller_service) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required 
controller service property '{}' is missing", prop.name));
+  }
+  return gsl::make_not_null(controller_service);
 }
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/extensions/aws/processors/AwsProcessor.cpp 
b/extensions/aws/processors/AwsProcessor.cpp
index cd442abb7..5e7ef1e14 100644
--- a/extensions/aws/processors/AwsProcessor.cpp
+++ b/extensions/aws/processors/AwsProcessor.cpp
@@ -33,11 +33,10 @@
 namespace org::apache::nifi::minifi::aws::processors {
 
 std::optional<Aws::Auth::AWSCredentials> 
AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& 
context) const {
-  if (const auto aws_credentials_service = 
minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context,
 AWSCredentialsProviderService, getUUID())) {
-    return (*aws_credentials_service)->getAWSCredentials();
+  if (auto service = 
minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context,
 AWSCredentialsProviderService, getUUID())) {
+    return service->getAWSCredentials();
   }
   logger_->log_error("AWS credentials service could not be found");
-
   return std::nullopt;
 }
 
@@ -47,7 +46,7 @@ std::optional<Aws::Auth::AWSCredentials> 
AwsProcessor::getAWSCredentials(
   auto service_cred = getAWSCredentialsFromControllerService(context);
   if (service_cred) {
     logger_->log_info("AWS Credentials successfully set from controller 
service");
-    return service_cred.value();
+    return service_cred;
   }
 
   aws::AWSCredentialsProvider aws_credentials_provider;
@@ -98,6 +97,9 @@ void AwsProcessor::onSchedule(core::ProcessContext& context, 
core::ProcessSessio
   if (default_ca_file) {
     client_config_->caFile = *default_ca_file;
   }
+
+  // throw here if the credentials provider service is set to an invalid value
+  std::ignore = 
minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context,
 AWSCredentialsProviderService, getUUID());
 }
 
 std::optional<CommonProperties> AwsProcessor::getCommonELSupportedProperties(
diff --git 
a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
index f85203e6d..9a04a7688 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
@@ -157,9 +157,11 @@ nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> 
CouchbaseClient::get(co
       logger_->log_error("Failed to get document '{}' from collection 
'{}.{}.{}' due to timeout", document_id, collection.bucket_name, 
collection.scope_name, collection.collection_name);
       return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
     }
-    std::string cause = get_err.cause() ? get_err.cause()->message() : "";
     logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' 
with error code: '{}', message: '{}'", document_id, collection.bucket_name, 
collection.scope_name,
         collection.collection_name, get_err.ec(), get_err.message());
+    if (get_err.cause()) {
+      logger_->log_error("... root cause error code: '{}', message: '{}'", 
get_err.cause()->ec(), get_err.cause()->message());
+    }
     return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
   } else {
     try {
@@ -247,17 +249,6 @@ void CouchbaseClusterService::onEnable() {
   }
 }
 
-gsl::not_null<std::shared_ptr<CouchbaseClusterService>> 
CouchbaseClusterService::getFromProperty(const core::ProcessContext& context, 
const core::PropertyReference& property) {
-  std::shared_ptr<CouchbaseClusterService> couchbase_cluster_service;
-  if (auto connection_controller_name = context.getProperty(property)) {
-    couchbase_cluster_service = 
std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name,
 context.getProcessorInfo().getUUID()));
-  }
-  if (!couchbase_cluster_service) {
-    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Missing Couchbase Cluster Service");
-  }
-  return gsl::make_not_null(couchbase_cluster_service);
-}
-
 REGISTER_RESOURCE(CouchbaseClusterService, ControllerService);
 
 }  // namespace controllers
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
index 040b6773f..8e9040411 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
@@ -166,8 +166,6 @@ class CouchbaseClusterService : public 
core::controller::ControllerServiceImpl {
     return client_->get(collection, document_id, return_type);
   }
 
-  static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> 
getFromProperty(const core::ProcessContext& context, const 
core::PropertyReference& property);
-
  private:
   std::unique_ptr<CouchbaseClient> client_;
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<CouchbaseClusterService>::getLogger(uuid_);
diff --git a/extensions/couchbase/processors/GetCouchbaseKey.cpp 
b/extensions/couchbase/processors/GetCouchbaseKey.cpp
index 02fd95e97..fd983c36d 100644
--- a/extensions/couchbase/processors/GetCouchbaseKey.cpp
+++ b/extensions/couchbase/processors/GetCouchbaseKey.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "GetCouchbaseKey.h"
+#include "CouchbaseClusterService.h"
 #include "utils/gsl.h"
 #include "core/Resource.h"
 #include "utils/ProcessorConfigUtils.h"
@@ -24,7 +25,7 @@
 namespace org::apache::nifi::minifi::couchbase::processors {
 
 void GetCouchbaseKey::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  couchbase_cluster_service_ = 
controllers::CouchbaseClusterService::getFromProperty(context, 
GetCouchbaseKey::CouchbaseClusterControllerService);
+  couchbase_cluster_service_ = 
utils::parseControllerService<controllers::CouchbaseClusterService>(context, 
GetCouchbaseKey::CouchbaseClusterControllerService, 
context.getProcessorInfo().getUUID());
   document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, 
GetCouchbaseKey::DocumentType);
 }
 
diff --git a/extensions/couchbase/processors/PutCouchbaseKey.cpp 
b/extensions/couchbase/processors/PutCouchbaseKey.cpp
index 7c7edd4a0..04acd4033 100644
--- a/extensions/couchbase/processors/PutCouchbaseKey.cpp
+++ b/extensions/couchbase/processors/PutCouchbaseKey.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "PutCouchbaseKey.h"
+#include "CouchbaseClusterService.h"
 #include "utils/gsl.h"
 #include "core/Resource.h"
 #include "utils/ProcessorConfigUtils.h"
@@ -24,7 +25,7 @@
 namespace org::apache::nifi::minifi::couchbase::processors {
 
 void PutCouchbaseKey::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  couchbase_cluster_service_ = 
controllers::CouchbaseClusterService::getFromProperty(context, 
PutCouchbaseKey::CouchbaseClusterControllerService);
+  couchbase_cluster_service_ = 
utils::parseControllerService<controllers::CouchbaseClusterService>(context, 
PutCouchbaseKey::CouchbaseClusterControllerService, 
context.getProcessorInfo().getUUID());
   document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, 
PutCouchbaseKey::DocumentType);
   persist_to_ = utils::parseEnumProperty<::couchbase::persist_to>(context, 
PutCouchbaseKey::PersistTo);
   replicate_to_ = utils::parseEnumProperty<::couchbase::replicate_to>(context, 
PutCouchbaseKey::ReplicateTo);
diff --git a/extensions/elasticsearch/PostElasticsearch.cpp 
b/extensions/elasticsearch/PostElasticsearch.cpp
index 05e0835e6..ca6e1c747 100644
--- a/extensions/elasticsearch/PostElasticsearch.cpp
+++ b/extensions/elasticsearch/PostElasticsearch.cpp
@@ -37,18 +37,6 @@ void PostElasticsearch::initialize() {
   setSupportedRelationships(Relationships);
 }
 
-auto PostElasticsearch::getSSLContextService(core::ProcessContext& context) 
const {
-  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
-    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*ssl_context,
 getUUID()));
-  return std::shared_ptr<minifi::controllers::SSLContextServiceInterface>{};
-}
-
-auto PostElasticsearch::getCredentialsService(core::ProcessContext& context) 
const {
-  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
-    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials,
 getUUID()));
-  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
-}
-
 void PostElasticsearch::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
   max_batch_size_ = utils::parseU64Property(context, MaxBatchSize);
   if (max_batch_size_ < 1)
@@ -63,11 +51,12 @@ void PostElasticsearch::onSchedule(core::ProcessContext& 
context, core::ProcessS
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
   }
 
-  credentials_service_ = getCredentialsService(context);
+  credentials_service_ = 
utils::parseOptionalControllerService<ElasticsearchCredentialsControllerService>(context,
 PostElasticsearch::ElasticCredentials, getUUID());
   if (!credentials_service_)
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
 
-  client_.initialize(http::HttpRequestMethod::POST, host_url_ + "/_bulk", 
getSSLContextService(context));
+  auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 PostElasticsearch::SSLContext, getUUID());
+  client_.initialize(http::HttpRequestMethod::POST, host_url_ + "/_bulk", 
ssl_context_service);
   client_.setContentType("application/json");
   credentials_service_->authenticateClient(client_);
 }
diff --git a/extensions/elasticsearch/PostElasticsearch.h 
b/extensions/elasticsearch/PostElasticsearch.h
index 9d3cb4dc8..9397d7747 100644
--- a/extensions/elasticsearch/PostElasticsearch.h
+++ b/extensions/elasticsearch/PostElasticsearch.h
@@ -109,8 +109,6 @@ class PostElasticsearch : public core::ProcessorImpl {
 
  private:
   std::string collectPayload(core::ProcessContext&, core::ProcessSession&, 
std::vector<std::shared_ptr<core::FlowFile>>&) const;
-  auto getSSLContextService(core::ProcessContext& context) const;
-  auto getCredentialsService(core::ProcessContext& context) const;
 
   uint64_t max_batch_size_ = 100;
   std::string host_url_;
diff --git a/extensions/gcp/processors/GCSProcessor.cpp 
b/extensions/gcp/processors/GCSProcessor.cpp
index 2ae8e16d1..4c833a3ce 100644
--- a/extensions/gcp/processors/GCSProcessor.cpp
+++ b/extensions/gcp/processors/GCSProcessor.cpp
@@ -28,11 +28,8 @@ namespace gcs = ::google::cloud::storage;
 namespace org::apache::nifi::minifi::extensions::gcp {
 
 std::shared_ptr<google::cloud::storage::oauth2::Credentials> 
GCSProcessor::getCredentials(core::ProcessContext& context) const {
-  const std::string service_name = utils::parseProperty(context, 
GCSProcessor::GCPCredentials);
-  if (!IsNullOrEmpty(service_name)) {
-    auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const 
GCPCredentialsControllerService>(context.getControllerService(service_name, 
getUUID()));
-    if (!gcp_credentials_controller_service)
-      return nullptr;
+  auto gcp_credentials_controller_service = 
utils::parseOptionalControllerService<GCPCredentialsControllerService>(context, 
GCSProcessor::GCPCredentials, getUUID());
+  if (gcp_credentials_controller_service) {
     return gcp_credentials_controller_service->getCredentials();
   }
   return nullptr;
diff --git a/extensions/grafana-loki/PushGrafanaLoki.cpp 
b/extensions/grafana-loki/PushGrafanaLoki.cpp
index f328078ac..a70013237 100644
--- a/extensions/grafana-loki/PushGrafanaLoki.cpp
+++ b/extensions/grafana-loki/PushGrafanaLoki.cpp
@@ -77,13 +77,6 @@ void 
PushGrafanaLoki::LogBatch::setStartPushTime(std::chrono::system_clock::time
 
 const core::Relationship PushGrafanaLoki::Self("__self__", "Marks the FlowFile 
to be owned by this processor");
 
-std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
PushGrafanaLoki::getSSLContextService(core::ProcessContext& context) const {
-  if (auto ssl_context = 
context.getProperty(PushGrafanaLoki::SSLContextService)) {
-    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*ssl_context,
 getUUID()));
-  }
-  return std::shared_ptr<minifi::controllers::SSLContextServiceInterface>{};
-}
-
 void PushGrafanaLoki::setUpStateManager(core::ProcessContext& context) {
   auto state_manager = context.getStateManager();
   if (state_manager == nullptr) {
@@ -110,11 +103,11 @@ std::map<std::string, std::string> 
PushGrafanaLoki::buildStreamLabelMap(core::Pr
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Labels property");
     }
     for (const auto& label : stream_labels) {
-      auto stream_labels = utils::string::splitAndTrimRemovingEmpty(label, 
"=");
-      if (stream_labels.size() != 2) {
+      auto key_value = utils::string::splitAndTrimRemovingEmpty(label, "=");
+      if (key_value.size() != 2) {
         throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Labels property");
       }
-      stream_label_map[stream_labels[0]] = stream_labels[1];
+      stream_label_map[key_value[0]] = key_value[1];
     }
   } else {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Labels property");
@@ -149,7 +142,7 @@ void PushGrafanaLoki::onSchedule(core::ProcessContext& 
context, core::ProcessSes
   }
 
   if (log_line_batch_wait) {
-    log_batch_.setLogLineBatchWait(*log_line_batch_wait);
+    log_batch_.setLogLineBatchWait(log_line_batch_wait);
     logger_->log_debug("PushGrafanaLoki Log Line Batch Wait is set to {} 
milliseconds", *log_line_batch_wait);
   }
 }
diff --git a/extensions/grafana-loki/PushGrafanaLoki.h 
b/extensions/grafana-loki/PushGrafanaLoki.h
index bd64aa021..06676194c 100644
--- a/extensions/grafana-loki/PushGrafanaLoki.h
+++ b/extensions/grafana-loki/PushGrafanaLoki.h
@@ -133,7 +133,6 @@ class PushGrafanaLoki : public core::ProcessorImpl {
 
   static std::map<std::string, std::string> 
buildStreamLabelMap(core::ProcessContext& context);
 
-  std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
getSSLContextService(core::ProcessContext& context) const;
   void processBatch(const std::vector<std::shared_ptr<core::FlowFile>>& 
batched_flow_files, core::ProcessSession& session);
   virtual nonstd::expected<void, std::string> submitRequest(const 
std::vector<std::shared_ptr<core::FlowFile>>& batched_flow_files, 
core::ProcessSession& session) = 0;
   void initializeHttpClient(core::ProcessContext& context);
diff --git a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp 
b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp
index d13634ce8..932c218d2 100644
--- a/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp
+++ b/extensions/grafana-loki/PushGrafanaLokiGrpc.cpp
@@ -73,16 +73,18 @@ void PushGrafanaLokiGrpc::setUpGrpcChannel(const 
std::string& url, core::Process
 
   args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
 
-  std::shared_ptr<::grpc::ChannelCredentials> creds;
-  if (auto ssl_context_service = getSSLContextService(context)) {
-    ::grpc::SslCredentialsOptions ssl_credentials_options;
-    ssl_credentials_options.pem_cert_chain = 
utils::file::FileUtils::get_content(ssl_context_service->getCertificateFile());
-    ssl_credentials_options.pem_private_key = 
utils::file::FileUtils::get_content(ssl_context_service->getPrivateKeyFile());
-    ssl_credentials_options.pem_root_certs = 
utils::file::FileUtils::get_content(ssl_context_service->getCACertificate());
-    creds = ::grpc::SslCredentials(ssl_credentials_options);
-  } else {
-    creds = ::grpc::InsecureChannelCredentials();
-  }
+  std::shared_ptr<::grpc::ChannelCredentials> creds = [&]() {
+    auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 PushGrafanaLoki::SSLContextService, getUUID());
+    if (ssl_context_service) {
+      ::grpc::SslCredentialsOptions ssl_credentials_options;
+      ssl_credentials_options.pem_cert_chain = 
utils::file::FileUtils::get_content(ssl_context_service->getCertificateFile());
+      ssl_credentials_options.pem_private_key = 
utils::file::FileUtils::get_content(ssl_context_service->getPrivateKeyFile());
+      ssl_credentials_options.pem_root_certs = 
utils::file::FileUtils::get_content(ssl_context_service->getCACertificate());
+      return ::grpc::SslCredentials(ssl_credentials_options);
+    } else {
+      return ::grpc::InsecureChannelCredentials();
+    }
+  }();
 
   push_channel_ = ::grpc::CreateCustomChannel(url, creds, args);
   if (!push_channel_) {
diff --git a/extensions/grafana-loki/PushGrafanaLokiREST.cpp 
b/extensions/grafana-loki/PushGrafanaLokiREST.cpp
index 74560af5e..1feb11d9c 100644
--- a/extensions/grafana-loki/PushGrafanaLokiREST.cpp
+++ b/extensions/grafana-loki/PushGrafanaLokiREST.cpp
@@ -82,7 +82,8 @@ void 
PushGrafanaLokiREST::initializeHttpClient(core::ProcessContext& context) {
     url += "/loki/api/v1/push";
   }
   logger_->log_debug("PushGrafanaLokiREST push url is set to: {}", url);
-  client_.initialize(http::HttpRequestMethod::POST, url, 
getSSLContextService(context));
+  auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 PushGrafanaLoki::SSLContextService, getUUID());
+  client_.initialize(http::HttpRequestMethod::POST, url, ssl_context_service);
 }
 
 void PushGrafanaLokiREST::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
diff --git a/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp 
b/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp
index bae100bda..5693a06ae 100644
--- a/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp
+++ b/extensions/kubernetes/processors/CollectKubernetesPodMetrics.cpp
@@ -22,6 +22,7 @@
 #include "../ContainerInfo.h"
 #include "../MetricsApi.h"
 #include "../MetricsFilter.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -31,20 +32,7 @@ void CollectKubernetesPodMetrics::initialize() {
 }
 
 void CollectKubernetesPodMetrics::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  const auto controller_service_name = 
context.getProperty(KubernetesControllerService);
-  if (!controller_service_name || controller_service_name->empty()) {
-    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Missing '", KubernetesControllerService.name, "' 
property")};
-  }
-
-  std::shared_ptr<core::controller::ControllerService> controller_service = 
context.getControllerService(*controller_service_name, getUUID());
-  if (!controller_service) {
-    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Controller service '", *controller_service_name, "' 
not found")};
-  }
-
-  kubernetes_controller_service_ = 
std::dynamic_pointer_cast<minifi::controllers::KubernetesControllerService>(controller_service);
-  if (!kubernetes_controller_service_) {
-    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Controller service '", *controller_service_name, "' 
is not a KubernetesControllerService")};
-  }
+  kubernetes_controller_service_ = 
utils::parseControllerService<controllers::KubernetesControllerService>(context,
 KubernetesControllerService, getUUID());
 }
 
 void CollectKubernetesPodMetrics::onTrigger(core::ProcessContext&, 
core::ProcessSession& session) {
diff --git a/extensions/smb/FetchSmb.cpp b/extensions/smb/FetchSmb.cpp
index a6c24c59e..9b17ae86c 100644
--- a/extensions/smb/FetchSmb.cpp
+++ b/extensions/smb/FetchSmb.cpp
@@ -19,6 +19,7 @@
 #include "core/Resource.h"
 #include "utils/ConfigurationUtils.h"
 #include "utils/file/FileReaderCallback.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::extensions::smb {
 
@@ -28,7 +29,7 @@ void FetchSmb::initialize() {
 }
 
 void FetchSmb::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  smb_connection_controller_service_ = 
SmbConnectionControllerService::getFromProperty(context, 
FetchSmb::ConnectionControllerService);
+  smb_connection_controller_service_ = 
utils::parseControllerService<SmbConnectionControllerService>(context, 
FetchSmb::ConnectionControllerService, getUUID());
   buffer_size_ = 
utils::configuration::getBufferSize(*context.getConfiguration());
 }
 
diff --git a/extensions/smb/ListSmb.cpp b/extensions/smb/ListSmb.cpp
index c82c053a3..ecfebe343 100644
--- a/extensions/smb/ListSmb.cpp
+++ b/extensions/smb/ListSmb.cpp
@@ -33,7 +33,7 @@ void ListSmb::initialize() {
 }
 
 void ListSmb::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  smb_connection_controller_service_ = 
SmbConnectionControllerService::getFromProperty(context, 
ListSmb::ConnectionControllerService);
+  smb_connection_controller_service_ = 
utils::parseControllerService<SmbConnectionControllerService>(context, 
ListSmb::ConnectionControllerService, getUUID());
 
   auto state_manager = context.getStateManager();
   if (state_manager == nullptr) {
diff --git a/extensions/smb/PutSmb.cpp b/extensions/smb/PutSmb.cpp
index 40b9db243..fadd10c9e 100644
--- a/extensions/smb/PutSmb.cpp
+++ b/extensions/smb/PutSmb.cpp
@@ -32,7 +32,7 @@ void PutSmb::initialize() {
 }
 
 void PutSmb::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  smb_connection_controller_service_ = 
SmbConnectionControllerService::getFromProperty(context, 
PutSmb::ConnectionControllerService);
+  smb_connection_controller_service_ = 
utils::parseControllerService<SmbConnectionControllerService>(context, 
PutSmb::ConnectionControllerService, getUUID());
   create_missing_dirs_ = utils::parseBoolProperty(context, 
PutSmb::CreateMissingDirectories);
   conflict_resolution_strategy_ = 
utils::parseEnumProperty<FileExistsResolutionStrategy>(context, 
ConflictResolution);
 }
diff --git a/extensions/smb/SmbConnectionControllerService.cpp 
b/extensions/smb/SmbConnectionControllerService.cpp
index d1302b32c..8a61a5a79 100644
--- a/extensions/smb/SmbConnectionControllerService.cpp
+++ b/extensions/smb/SmbConnectionControllerService.cpp
@@ -58,17 +58,6 @@ void SmbConnectionControllerService::notifyStop() {
     logger_->log_error("Error while disconnecting from SMB: {}", 
disconnection_result.error().message());
 }
 
-gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> 
SmbConnectionControllerService::getFromProperty(const core::ProcessContext& 
context, const core::PropertyReference& property) {
-  std::shared_ptr<SmbConnectionControllerService> 
smb_connection_controller_service;
-  if (auto connection_controller_name = context.getProperty(property)) {
-    smb_connection_controller_service = 
std::dynamic_pointer_cast<SmbConnectionControllerService>(context.getControllerService(*connection_controller_name,
 context.getProcessorInfo().getUUID()));
-  }
-  if (!smb_connection_controller_service) {
-    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Missing SMB Connection Controller Service");
-  }
-  return gsl::make_not_null(smb_connection_controller_service);
-}
-
 nonstd::expected<void, std::error_code> 
SmbConnectionControllerService::connect() {
   auto connection_result = WNetAddConnection2A(&net_resource_,
       credentials_ ? credentials_->password.c_str() : nullptr,
diff --git a/extensions/smb/SmbConnectionControllerService.h 
b/extensions/smb/SmbConnectionControllerService.h
index 8522e027d..274376d48 100644
--- a/extensions/smb/SmbConnectionControllerService.h
+++ b/extensions/smb/SmbConnectionControllerService.h
@@ -87,8 +87,6 @@ class SmbConnectionControllerService : public 
core::controller::ControllerServic
   virtual std::error_code validateConnection();
   virtual std::filesystem::path getPath() const { return server_path_; }
 
-  static gsl::not_null<std::shared_ptr<SmbConnectionControllerService>> 
getFromProperty(const core::ProcessContext& context, const 
core::PropertyReference& property);
-
  private:
   nonstd::expected<void, std::error_code> connect();
   nonstd::expected<void, std::error_code> disconnect();
diff --git a/extensions/splunk/PutSplunkHTTP.cpp 
b/extensions/splunk/PutSplunkHTTP.cpp
index 30280335f..4abe8ee7a 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -113,7 +113,7 @@ void setFlowFileAsPayload(core::ProcessSession& session,
 
 void PutSplunkHTTP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& session_factory) {
   SplunkHECProcessor::onSchedule(context, session_factory);
-  ssl_context_service_ = getSSLContextService(context);
+  ssl_context_service_ = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContext, getUUID());
   auto create_client = [this]() -> std::unique_ptr<minifi::http::HTTPClient> {
     auto client = std::make_unique<http::HTTPClient>();
     initializeClient(*client, 
getNetworkLocation().append(getEndpoint(*client)), ssl_context_service_);
diff --git a/extensions/splunk/QuerySplunkIndexingStatus.cpp 
b/extensions/splunk/QuerySplunkIndexingStatus.cpp
index 1f416486a..819a44422 100644
--- a/extensions/splunk/QuerySplunkIndexingStatus.cpp
+++ b/extensions/splunk/QuerySplunkIndexingStatus.cpp
@@ -141,12 +141,11 @@ void 
QuerySplunkIndexingStatus::onSchedule(core::ProcessContext& context, core::
   SplunkHECProcessor::onSchedule(context, session_factory);
   max_age_ = utils::parseDurationProperty(context, MaximumWaitingTime);
   batch_size_ = utils::parseU64Property(context, MaxQuerySize);
-  initializeClient(client_, getNetworkLocation().append(getEndpoint()), 
getSSLContextService(context));
+  auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContext, getUUID());
+  initializeClient(client_, getNetworkLocation().append(getEndpoint()), 
ssl_context_service);
 }
 
 void QuerySplunkIndexingStatus::onTrigger(core::ProcessContext&, 
core::ProcessSession& session) {
-  std::string ack_request;
-
   auto undetermined_flow_files = getUndeterminedFlowFiles(session, 
batch_size_);
   if (undetermined_flow_files.empty())
     return;
diff --git a/extensions/splunk/SplunkHECProcessor.cpp 
b/extensions/splunk/SplunkHECProcessor.cpp
index 58ba0827a..eae24f31b 100644
--- a/extensions/splunk/SplunkHECProcessor.cpp
+++ b/extensions/splunk/SplunkHECProcessor.cpp
@@ -40,12 +40,6 @@ std::string SplunkHECProcessor::getNetworkLocation() const {
   return hostname_ + ":" + port_;
 }
 
-std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
SplunkHECProcessor::getSSLContextService(core::ProcessContext& context) const {
-  if (const auto context_name = context.getProperty(SSLContext); context_name 
&& !IsNullOrEmpty(*context_name))
-    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name,
 getUUID()));
-  return nullptr;
-}
-
 void SplunkHECProcessor::initializeClient(http::HTTPClient& client, const 
std::string &url, 
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
ssl_context_service) const {
   client.initialize(http::HttpRequestMethod::POST, url, 
std::move(ssl_context_service));
   client.setRequestHeader("Authorization", token_);
diff --git a/extensions/splunk/SplunkHECProcessor.h 
b/extensions/splunk/SplunkHECProcessor.h
index 766f2ee3b..dbc58ebf5 100644
--- a/extensions/splunk/SplunkHECProcessor.h
+++ b/extensions/splunk/SplunkHECProcessor.h
@@ -77,7 +77,6 @@ class SplunkHECProcessor : public core::ProcessorImpl {
 
  protected:
   std::string getNetworkLocation() const;
-  std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
getSSLContextService(core::ProcessContext& context) const;
   void initializeClient(http::HTTPClient& client, const std::string &url, 
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
ssl_context_service) const;
 
   std::string token_;
diff --git a/extensions/sql/processors/SQLProcessor.cpp 
b/extensions/sql/processors/SQLProcessor.cpp
index 9bde15f7f..8c88c535d 100644
--- a/extensions/sql/processors/SQLProcessor.cpp
+++ b/extensions/sql/processors/SQLProcessor.cpp
@@ -24,21 +24,12 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void SQLProcessor::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  std::string controllerService = 
context.getProperty(DBControllerService).value_or("");
-
-  if (auto service = context.getControllerService(controllerService, 
getUUID())) {
-    db_service_ = 
std::dynamic_pointer_cast<sql::controllers::DatabaseService>(service);
-    if (!db_service_) {
-      throw minifi::Exception(PROCESSOR_EXCEPTION, "'" + controllerService + 
"' is not a DatabaseService");
-    }
-  } else {
-    throw minifi::Exception(PROCESSOR_EXCEPTION, "Could not find controller 
service '" + controllerService + "'");
-  }
-
+  db_service_ = 
utils::parseControllerService<sql::controllers::DatabaseService>(context, 
DBControllerService, getUUID());
   processOnSchedule(context);
 }
 
diff --git a/extensions/standard-processors/modbus/FetchModbusTcp.cpp 
b/extensions/standard-processors/modbus/FetchModbusTcp.cpp
index eacea365b..e6667ab49 100644
--- a/extensions/standard-processors/modbus/FetchModbusTcp.cpp
+++ b/extensions/standard-processors/modbus/FetchModbusTcp.cpp
@@ -58,18 +58,13 @@ void FetchModbusTcp::onSchedule(core::ProcessContext& 
context, core::ProcessSess
     connections_.emplace();
   }
 
-  ssl_context_.reset();
-  if (const auto controller_service_name = 
context.getProperty(SSLContextService); controller_service_name && 
!IsNullOrEmpty(*controller_service_name)) {
-    if (auto controller_service = 
context.getControllerService(*controller_service_name, getUUID())) {
-      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(controller_service))
 {
-        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
-      } else {
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *controller_service_name + 
" is not an SSL Context Service");
-      }
-    } else {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *controller_service_name);
+  ssl_context_ = [&]() -> std::optional<asio::ssl::context> {
+    auto service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContextService, getUUID());
+    if (service) {
+      return {utils::net::getSslContext(*service)};
     }
-  }
+    return std::nullopt;
+  }();
 }
 
 void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
@@ -208,7 +203,6 @@ auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerB
 
 auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
     const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
-  std::string result;
   if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_)) {  // NOLINT (clang tidy 
doesnt like coroutines)
     co_return nonstd::make_unexpected(connection_error);
   }
diff --git a/extensions/standard-processors/processors/GetTCP.cpp 
b/extensions/standard-processors/processors/GetTCP.cpp
index 2e51b3ace..221573572 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -71,22 +71,6 @@ char GetTCP::parseDelimiter(core::ProcessContext& context) {
   return delimiter;
 }
 
-std::optional<asio::ssl::context> 
GetTCP::parseSSLContext(core::ProcessContext& context) const {
-  std::optional<asio::ssl::context> ssl_context;
-  if (auto context_name = context.getProperty(SSLContextService)) {
-    if (auto controller_service = context.getControllerService(*context_name, 
getUUID())) {
-      if (auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name,
 getUUID()))) {
-        ssl_context = utils::net::getSslContext(*ssl_context_service);
-      } else {
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
-      }
-    } else {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
-    }
-  }
-  return ssl_context;
-}
-
 uint64_t GetTCP::parseMaxBatchSize(core::ProcessContext& context) {
   const auto max_batch_size = utils::parseU64Property(context, MaxBatchSize);
   if (max_batch_size == 0) {
@@ -98,7 +82,12 @@ uint64_t GetTCP::parseMaxBatchSize(core::ProcessContext& 
context) {
 void GetTCP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
   auto connections_to_make = parseEndpointList(context);
   auto delimiter = parseDelimiter(context);
-  auto ssl_context = parseSSLContext(context);
+  auto ssl_context = [&]() -> std::optional<asio::ssl::context> {
+    if (auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContextService, getUUID())) {
+      return {utils::net::getSslContext(*ssl_context_service)};
+    }
+    return std::nullopt;
+  }();
 
   std::optional<size_t> max_queue_size = 
utils::parseOptionalU64Property(context, MaxQueueSize);
   std::optional<size_t> max_message_size = 
utils::parseOptionalU64Property(context, MaxMessageSize);
diff --git a/extensions/standard-processors/processors/GetTCP.h 
b/extensions/standard-processors/processors/GetTCP.h
index 9c8b16363..dc4e9827f 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -137,7 +137,6 @@ class GetTCP : public core::ProcessorImpl {
 
   std::vector<utils::net::ConnectionId> 
parseEndpointList(core::ProcessContext& context);
   static char parseDelimiter(core::ProcessContext& context);
-  std::optional<asio::ssl::context> parseSSLContext(core::ProcessContext& 
context) const;
   static uint64_t parseMaxBatchSize(core::ProcessContext& context);
 
   class TcpClient {
diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp 
b/extensions/standard-processors/processors/InvokeHTTP.cpp
index 084065cd4..269479ede 100644
--- a/extensions/standard-processors/processors/InvokeHTTP.cpp
+++ b/extensions/standard-processors/processors/InvokeHTTP.cpp
@@ -159,22 +159,13 @@ void InvokeHTTP::setupMembersFromProperties(const 
core::ProcessContext& context)
 
   proxy_.host = context.getProperty(InvokeHTTP::ProxyHost).value_or("");
   proxy_.port = (context.getProperty(InvokeHTTP::ProxyPort) | 
utils::andThen(parsing::parseIntegral<int>)).value_or(0);
-  std::string port_str;
   proxy_.username = 
context.getProperty(InvokeHTTP::ProxyUsername).value_or("");
   proxy_.password = 
context.getProperty(InvokeHTTP::ProxyPassword).value_or("");
 
   follow_redirects_ = utils::parseBoolProperty(context, FollowRedirects);  // 
Shouldn't fail due to default value;
   content_type_ = utils::parseProperty(context, InvokeHTTP::ContentType);  // 
Shouldn't fail due to default value;
 
-  if (auto ssl_context_name = context.getProperty(SSLContext)) {
-    if (auto service = context.getControllerService(*ssl_context_name, 
getUUID())) {
-      ssl_context_service_ = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
-      if (!ssl_context_service_)
-        logger_->log_error("Controller service '{}' is not an 
SSLContextService", *ssl_context_name);
-    } else {
-      logger_->log_error("Couldn't find controller service with name '{}'", 
*ssl_context_name);
-    }
-  }
+  ssl_context_service_ = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContext, getUUID());
 }
 
 gsl::not_null<std::unique_ptr<http::HTTPClient>> 
InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const {
diff --git a/extensions/standard-processors/processors/PutTCP.cpp 
b/extensions/standard-processors/processors/PutTCP.cpp
index f5a113238..a6c55b0d7 100644
--- a/extensions/standard-processors/processors/PutTCP.cpp
+++ b/extensions/standard-processors/processors/PutTCP.cpp
@@ -70,17 +70,8 @@ void PutTCP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFacto
   else
     connections_.emplace();
 
-  ssl_context_.reset();
-  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
-    if (auto controller_service = context.getControllerService(*context_name, 
getUUID())) {
-      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(context.getControllerService(*context_name,
 getUUID()))) {
-        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
-      } else {
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
-      }
-    } else {
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
-    }
+  if (auto ssl_context_service = 
utils::parseOptionalControllerService<minifi::controllers::SSLContextServiceInterface>(context,
 SSLContextService, getUUID())) {
+    ssl_context_ = {utils::net::getSslContext(*ssl_context_service)};
   }
 
   const auto delimiter_str = 
context.getProperty(OutgoingMessageDelimiter).value_or(std::string{});
diff --git a/extensions/standard-processors/processors/SplitRecord.cpp 
b/extensions/standard-processors/processors/SplitRecord.cpp
index 56099608c..52016653b 100644
--- a/extensions/standard-processors/processors/SplitRecord.cpp
+++ b/extensions/standard-processors/processors/SplitRecord.cpp
@@ -21,34 +21,15 @@
 #include "utils/GeneralUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
-namespace {
-template<typename RecordSetIO>
-std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, 
const core::PropertyReference& property, const utils::Identifier& 
processor_uuid) {
-  std::string service_name = context.getProperty(property).value_or("");
-  if (!IsNullOrEmpty(service_name)) {
-    auto record_set_io = 
std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name,
 processor_uuid));
-    if (!record_set_io)
-      return nullptr;
-    return record_set_io;
-  }
-  return nullptr;
-}
-}  // namespace
 
 void SplitRecord::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, 
RecordReader, getUUID());
-  if (!record_set_reader_) {
-    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader 
property is missing or invalid");
-  }
-  record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, 
RecordWriter, getUUID());
-  if (!record_set_writer_) {
-    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer 
property is missing or invalid");
-  }
+  record_set_reader_ = 
utils::parseControllerService<core::RecordSetReader>(context, RecordReader, 
getUUID());
+  record_set_writer_ = 
utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, 
getUUID());
 }
 
 nonstd::expected<std::size_t, std::string> 
SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const 
core::FlowFile& original_flow_file) {
   return context.getProperty(RecordsPerSplit, &original_flow_file)
-      | utils::andThen([](const auto records_per_split_str) {
+      | utils::andThen([](const auto& records_per_split_str) {
             return 
parsing::parseIntegralMinMax<std::size_t>(records_per_split_str, 1, 
std::numeric_limits<std::size_t>::max());
           })
       | utils::transformError([](std::error_code) -> std::string { return 
std::string{"Records Per Split should be set to a number larger than 0"}; });
diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 2e61155c1..f5ae3c837 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -44,10 +44,10 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-const char *TailFile::CURRENT_STR = "CURRENT.";
-const char *TailFile::POSITION_STR = "POSITION.";
-
 namespace {
+inline constexpr std::string_view CURRENT_STR = "CURRENT.";
+inline constexpr std::string_view POSITION_STR = "POSITION.";
+
 template<typename Container, typename Key>
 bool containsKey(const Container &container, const Key &key) {
   return container.find(key) != container.end();
@@ -269,7 +269,10 @@ void TailFile::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFac
     tail_mode_ = Mode::MULTIPLE;
     pattern_regex_ = utils::Regex(file_name_str);
 
-    parseAttributeProviderServiceProperty(context);
+    if (auto service = 
utils::parseOptionalControllerService<minifi::controllers::AttributeProviderService>(context,
 AttributeProviderService, getUUID())) {
+      // we drop ownership of the service here -- in the long term, 
getControllerService/parseControllerService should return a non-owning pointer 
or optional reference
+      attribute_provider_service_ = service.get();
+    }
 
     if (auto base_dir = context.getProperty(BaseDirectory); !base_dir) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base 
directory is required for multiple tail mode.");
@@ -309,24 +312,6 @@ void TailFile::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFac
   if (batch_size_ == 0) { batch_size_.reset(); }
 }
 
-void TailFile::parseAttributeProviderServiceProperty(const 
core::ProcessContext& context) {
-  const auto attribute_provider_service_name = 
context.getProperty(AttributeProviderService);
-  if (!attribute_provider_service_name || 
attribute_provider_service_name->empty()) {
-    return;
-  }
-
-  std::shared_ptr<core::controller::ControllerService> controller_service = 
context.getControllerService(*attribute_provider_service_name, getUUID());
-  if (!controller_service) {
-    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Controller service '", 
*attribute_provider_service_name, "' not found")};
-  }
-
-  // we drop ownership of the service here -- in the long term, 
getControllerService() should return a non-owning pointer or optional reference
-  attribute_provider_service_ = 
dynamic_cast<minifi::controllers::AttributeProviderService*>(controller_service.get());
-  if (!attribute_provider_service_) {
-    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
utils::string::join_pack("Controller service '", 
*attribute_provider_service_name, "' is not an AttributeProviderService")};
-  }
-}
-
 void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, 
TailState> &state) const {
   char *line = buf;
 
@@ -379,8 +364,8 @@ void TailFile::parseStateFileLine(char *buf, 
std::map<std::filesystem::path, Tai
     logger_->log_debug("Received position {}", position);
     state.begin()->second.position_ = gsl::narrow<uint64_t>(position);
   }
-  if (key.find(CURRENT_STR) == 0) {
-    const auto file = key.substr(strlen(CURRENT_STR));
+  if (key.starts_with(CURRENT_STR)) {
+    const auto file = key.substr(CURRENT_STR.size());
     std::filesystem::path file_path = value;
     if (file_path.has_filename() && file_path.has_parent_path()) {
       state[file].path_ = file_path.parent_path();
@@ -390,8 +375,8 @@ void TailFile::parseStateFileLine(char *buf, 
std::map<std::filesystem::path, Tai
     }
   }
 
-  if (key.find(POSITION_STR) == 0) {
-    const auto file = key.substr(strlen(POSITION_STR));
+  if (key.starts_with(POSITION_STR)) {
+    const auto file = key.substr(POSITION_STR.size());
     state[file].position_ = std::stoull(value);
   }
 }
diff --git a/extensions/standard-processors/processors/TailFile.h 
b/extensions/standard-processors/processors/TailFile.h
index 0095770da..006404428 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -256,7 +256,6 @@ class TailFile : public core::ProcessorImpl {
     TimePoint mtime_;
   };
 
-  void parseAttributeProviderServiceProperty(const core::ProcessContext& 
context);
   void parseStateFileLine(char *buf, std::map<std::filesystem::path, 
TailState> &state) const;
   void processAllRotatedFiles(core::ProcessSession& session, TailState &state);
   void processRotatedFiles(core::ProcessSession& session, TailState &state, 
std::vector<TailState> &rotated_file_states);
@@ -284,8 +283,6 @@ class TailFile : public core::ProcessorImpl {
   static void updateStateAttributes(TailState &state, uint64_t size, uint64_t 
checksum);
   bool isOldFileInitiallyRead(const TailState &state) const;
 
-  static const char *CURRENT_STR;
-  static const char *POSITION_STR;
   static constexpr int BUFFER_SIZE = 512;
 
   std::optional<char> delimiter_;  // Delimiter for the data incoming from the 
tailed file.
diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp 
b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
index 3cc01e1ff..0fc6ab3e8 100644
--- a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
+++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include <string>
+#include <utility>
 
 #include "unit/TestBase.h"
 #include "integration/HTTPIntegrationBase.h"
@@ -24,6 +25,7 @@
 #include "core/Processor.h"
 #include "core/controller/ControllerService.h"
 #include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -85,13 +87,7 @@ class DummmyControllerUserProcessor : public 
minifi::core::ProcessorImpl {
   }
 
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
/*session_factory*/) override {
-    if (auto controller_service = 
context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) {
-      if 
(!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service,
 getUUID()))) {
-        throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid 
controller service");
-      }
-    } else {
-      throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing 
controller service");
-    }
+    std::ignore = 
minifi::utils::parseControllerService<DummyController>(context, 
DummyControllerService, getUUID());
     logger_->log_debug("DummyControllerUserProcessor::onSchedule successful");
   }
 
@@ -143,7 +139,8 @@ class ControllerUpdateHandler: public HeartbeatHandler {
       case TestState::VERIFY_INITIAL_METRICS: {
         sendEmptyHeartbeatResponse(conn);
         REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, 
"Could not enable DummyController"));
-        REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, 
"(DummmyControllerUserProcessor): Process Schedule Operation: Invalid 
controller service"));
+        REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s,
+            "(DummmyControllerUserProcessor): Process Schedule Operation: 
Controller service 'Dummy Controller Service' = 'DummyController' not found"));
         test_state_ = TestState::SEND_NEW_CONFIG;
         break;
       }
diff --git a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp 
b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp
index 9e03584f0..24f775a63 100644
--- a/libminifi/test/unit/ProcessorConfigUtilsTests.cpp
+++ b/libminifi/test/unit/ProcessorConfigUtilsTests.cpp
@@ -17,11 +17,15 @@
 
 #include "unit/TestBase.h"
 #include "unit/Catch.h"
+#include "core/controller/ControllerService.h"
+#include "core/controller/ControllerServiceNode.h"
+#include "core/controller/ControllerServiceProvider.h"
 #include "core/PropertyDefinition.h"
 #include "core/ProcessorImpl.h"
 #include "core/PropertyDefinitionBuilder.h"
 #include "utils/ProcessorConfigUtils.h"
 #include "utils/Enum.h"
+#include "utils/Id.h"
 #include "unit/TestUtils.h"
 
 namespace org::apache::nifi::minifi::core {
@@ -48,6 +52,7 @@ enum class TestEnum {
   A,
   B
 };
+}  // namespace
 
 TEST_CASE("Parse enum property") {
   static constexpr auto prop = 
PropertyDefinitionBuilder<magic_enum::enum_count<TestEnum>()>::createProperty("prop")
@@ -84,5 +89,99 @@ TEST_CASE("Parse enum property") {
   }
 }
 
+namespace {
+class TestControllerService : public controller::ControllerServiceImpl {
+ public:
+  using ControllerServiceImpl::ControllerServiceImpl;
+  bool supportsDynamicProperties() const override { return false;  }
+  void yield() override {}
+  bool isRunning() const override { return false; }
+  bool isWorkAvailable() override { return false; }
+};
+
+const auto test_controller_service = []() {
+  auto service = 
std::make_shared<TestControllerService>("test-controller-service", 
utils::IdGenerator::getIdGenerator()->generate());
+  service->initialize();
+  return service;
+}();
+
+class WrongTestControllerService : public TestControllerService {};
+
+class TestControllerServiceProvider : public 
controller::ControllerServiceProviderImpl {
+ public:
+  using ControllerServiceProviderImpl::ControllerServiceProviderImpl;
+  std::shared_ptr<controller::ControllerServiceNode> 
createControllerService(const std::string&, const std::string&) override { 
return nullptr; }
+  void clearControllerServices() override {}
+  void enableAllControllerServices() override {}
+  void disableAllControllerServices() override {}
+
+  std::shared_ptr<controller::ControllerService> getControllerService(const 
std::string& name, const utils::Identifier&) const override {
+    if (name == "TestControllerService") { return test_controller_service; }
+    return nullptr;
+  }
+};
+
+TestControllerServiceProvider 
test_controller_service_provider("test-provider");
 }  // namespace
+
+TEST_CASE("Parse controller service property") {
+  static constexpr auto property = 
PropertyDefinitionBuilder<>::createProperty("Controller Service")
+      .withAllowedTypes<TestControllerService>()
+      .build();
+  auto processor = 
minifi::test::utils::make_processor<TestProcessor>("test-processor");
+  processor->getImpl<TestProcessor>().on_initialize_ = [&] (auto& self) {
+    
self.setSupportedProperties(std::to_array<core::PropertyReference>({property}));
+  };
+  processor->initialize();
+  auto configuration = minifi::Configure::create();
+  ProcessContextImpl context(*processor, &test_controller_service_provider, 
nullptr, nullptr, configuration, nullptr);
+
+  SECTION("Required controller service property") {
+    SECTION("... is valid") {
+      REQUIRE(processor->setProperty(property.name, "TestControllerService"));
+      const auto value = 
utils::parseControllerService<TestControllerService>(context, property, 
processor->getUUID());
+      CHECK(value == test_controller_service);
+    }
+    SECTION("... is missing") {
+      
CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, 
property, processor->getUUID()));
+    }
+    SECTION("... is blank") {
+      REQUIRE(processor->setProperty(property.name, ""));
+      
CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, 
property, processor->getUUID()));
+    }
+    SECTION("... is invalid") {
+      REQUIRE(processor->setProperty(property.name, 
"NonExistentControllerService"));
+      
CHECK_THROWS(utils::parseControllerService<TestControllerService>(context, 
property, processor->getUUID()));
+    }
+    SECTION("... is not the correct class") {
+      REQUIRE(processor->setProperty(property.name, "TestControllerService"));
+      
CHECK_THROWS(utils::parseControllerService<WrongTestControllerService>(context, 
property, processor->getUUID()));
+    }
+  }
+
+  SECTION("Optional controller service property") {
+    SECTION("... is valid") {
+      REQUIRE(processor->setProperty(property.name, "TestControllerService"));
+      const auto value = 
utils::parseOptionalControllerService<TestControllerService>(context, property, 
processor->getUUID());;
+      CHECK(value == test_controller_service);
+    }
+    SECTION("... is missing") {
+      const auto value = 
utils::parseOptionalControllerService<TestControllerService>(context, property, 
processor->getUUID());;
+      CHECK(value == nullptr);
+    }
+    SECTION("... is blank") {
+      REQUIRE(processor->setProperty(property.name, ""));
+      const auto value = 
utils::parseOptionalControllerService<TestControllerService>(context, property, 
processor->getUUID());;
+      CHECK(value == nullptr);
+    }
+    SECTION("... is invalid") {
+      REQUIRE(processor->setProperty(property.name, 
"NonExistentControllerService"));
+      
CHECK_THROWS(utils::parseOptionalControllerService<TestControllerService>(context,
 property, processor->getUUID()));
+    }
+    SECTION("... is not the correct class") {
+      REQUIRE(processor->setProperty(property.name, "TestControllerService"));
+      
CHECK_THROWS(utils::parseOptionalControllerService<WrongTestControllerService>(context,
 property, processor->getUUID()));
+    }
+  }
+}
 }  // namespace org::apache::nifi::minifi::core

Reply via email to