This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch minifi-api-reduced
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6a55322642230ae44b792d0d5b390d7a6f09ed81
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Jun 19 15:57:53 2025 +0200

    Rebase fix
---
 extensions/aws/processors/AwsProcessor.cpp         |  5 ---
 extensions/aws/processors/AwsProcessor.h           |  6 +--
 extensions/aws/processors/PutKinesisStream.h       |  8 +---
 extensions/aws/processors/S3Processor.cpp          |  2 +-
 extensions/aws/processors/S3Processor.h            |  1 -
 extensions/aws/tests/PutKinesisStreamTests.cpp     | 39 ++++++++----------
 extensions/kafka/ConsumeKafka.cpp                  |  2 +-
 extensions/python/ExecutePythonProcessor.h         |  5 ++-
 extensions/python/PythonCreator.h                  |  4 ++
 libminifi/include/core/Processor.h                 | 10 +++++
 libminifi/src/core/ProcessContext.cpp              |  4 ++
 libminifi/src/core/Processor.cpp                   | 28 +++++++++++++
 .../integration/C2ControllerEnableFailureTest.cpp  |  9 +----
 libminifi/test/integration/C2MetricsTest.cpp       | 46 ++++++++++++----------
 libminifi/test/unit/BulletinStoreTests.cpp         |  3 +-
 .../include/minifi-cpp/core/ProcessContext.h       |  2 +
 minifi-api/include/minifi-cpp/core/ProcessorApi.h  |  1 +
 utils/include/core/ProcessContext.h                |  1 +
 utils/include/core/ProcessorImpl.h                 |  2 +
 19 files changed, 106 insertions(+), 72 deletions(-)

diff --git a/extensions/aws/processors/AwsProcessor.cpp 
b/extensions/aws/processors/AwsProcessor.cpp
index 4ff850ee0..cd442abb7 100644
--- a/extensions/aws/processors/AwsProcessor.cpp
+++ b/extensions/aws/processors/AwsProcessor.cpp
@@ -32,11 +32,6 @@
 
 namespace org::apache::nifi::minifi::aws::processors {
 
-AwsProcessor::AwsProcessor(std::string_view name, const 
minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
-  : core::ProcessorImpl(name, uuid),
-    logger_(std::move(logger)) {
-}
-
 std::optional<Aws::Auth::AWSCredentials> 
AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& 
context) const {
   if (const auto aws_credentials_service = 
minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context,
 AWSCredentialsProviderService, getUUID())) {
     return (*aws_credentials_service)->getAWSCredentials();
diff --git a/extensions/aws/processors/AwsProcessor.h 
b/extensions/aws/processors/AwsProcessor.h
index 9f0055e2e..542d0c75b 100644
--- a/extensions/aws/processors/AwsProcessor.h
+++ b/extensions/aws/processors/AwsProcessor.h
@@ -32,7 +32,7 @@
 #include "core/PropertyDefinition.h"
 #include "core/PropertyDefinitionBuilder.h"
 #include "minifi-cpp/core/PropertyValidator.h"
-#include "core/Processor.h"
+#include "core/ProcessorImpl.h"
 
 
 namespace org::apache::nifi::minifi::aws::processors {
@@ -169,8 +169,7 @@ class AwsProcessor : public core::ProcessorImpl {
       UseDefaultCredentials
   });
 
-
-  explicit AwsProcessor(std::string_view name, const 
minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger);
+  using ProcessorImpl::ProcessorImpl;
 
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
session_factory) override;
 
@@ -180,7 +179,6 @@ class AwsProcessor : public core::ProcessorImpl {
   aws::ProxyOptions getProxy(core::ProcessContext& context, const 
core::FlowFile* const flow_file);
   std::optional<CommonProperties> 
getCommonELSupportedProperties(core::ProcessContext& context, const 
core::FlowFile* flow_file);
 
-  std::shared_ptr<core::logging::Logger> logger_;
   std::optional<Aws::Client::ClientConfiguration> client_config_;
 };
 
diff --git a/extensions/aws/processors/PutKinesisStream.h 
b/extensions/aws/processors/PutKinesisStream.h
index c30483a20..73d308dcf 100644
--- a/extensions/aws/processors/PutKinesisStream.h
+++ b/extensions/aws/processors/PutKinesisStream.h
@@ -87,13 +87,7 @@ class PutKinesisStream : public AwsProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  explicit PutKinesisStream(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
-    : AwsProcessor(name, uuid, 
core::logging::LoggerFactory<PutKinesisStream>::getLogger(uuid)) {
-  }
-  PutKinesisStream(const PutKinesisStream&) = delete;
-  PutKinesisStream(PutKinesisStream&&) = delete;
-  PutKinesisStream& operator=(const PutKinesisStream&) = delete;
-  PutKinesisStream& operator=(PutKinesisStream&&) = delete;
+  using AwsProcessor::AwsProcessor;
   ~PutKinesisStream() override = default;
 
   void initialize() override;
diff --git a/extensions/aws/processors/S3Processor.cpp 
b/extensions/aws/processors/S3Processor.cpp
index e45c3fa1c..6bb6402f4 100644
--- a/extensions/aws/processors/S3Processor.cpp
+++ b/extensions/aws/processors/S3Processor.cpp
@@ -33,7 +33,7 @@
 namespace org::apache::nifi::minifi::aws::processors {
 
 S3Processor::S3Processor(core::ProcessorMetadata info, 
std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
-  : core::ProcessorImpl(info),
+  : AwsProcessor(info),
     s3_wrapper_(std::move(s3_request_sender)) {
 }
 
diff --git a/extensions/aws/processors/S3Processor.h 
b/extensions/aws/processors/S3Processor.h
index d3d0a485b..cdcee4c4c 100644
--- a/extensions/aws/processors/S3Processor.h
+++ b/extensions/aws/processors/S3Processor.h
@@ -30,7 +30,6 @@
 #include "AwsProcessor.h"
 #include "S3Wrapper.h"
 #include "aws/core/auth/AWSCredentialsProvider.h"
-#include "core/Processor.h"
 #include "core/Property.h"
 #include "core/PropertyDefinition.h"
 #include "core/PropertyDefinitionBuilder.h"
diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp 
b/extensions/aws/tests/PutKinesisStreamTests.cpp
index bbfda2b9c..b6532b5b3 100644
--- a/extensions/aws/tests/PutKinesisStreamTests.cpp
+++ b/extensions/aws/tests/PutKinesisStreamTests.cpp
@@ -24,6 +24,7 @@
 #include "unit/Catch.h"
 #include "unit/SingleProcessorTestController.h"
 #include "unit/TestBase.h"
+#include "unit/ProcessorUtils.h"
 
 namespace org::apache::nifi::minifi::aws::processors::test {
 
@@ -86,14 +87,7 @@ class PutKinesisStreamMocked final : public 
aws::processors::PutKinesisStream {
  public:
   static constexpr const char* Description = "PutKinesisStreamMocked";
 
-  explicit PutKinesisStreamMocked(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
-  : PutKinesisStream(name, uuid) {
-  }
-
-  PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete;
-  PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete;
-  PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete;
-  PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete;
+  using PutKinesisStream::PutKinesisStream;
 
   ~PutKinesisStreamMocked() override = default;
 
@@ -110,10 +104,10 @@ class PutKinesisStreamMocked final : public 
aws::processors::PutKinesisStream {
 REGISTER_RESOURCE(PutKinesisStreamMocked, Processor);
 
 TEST_CASE("PutKinesisStream record size mismatch path") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
-  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>();
   REQUIRE(put_kinesis_stream);
-  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::RecordSizeMismatch;
+  put_kinesis_stream.get().behaviour_ = 
MockKinesisClient::KinesisBehaviour::RecordSizeMismatch;
 
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -134,10 +128,10 @@ TEST_CASE("PutKinesisStream record size mismatch path") {
 }
 
 TEST_CASE("PutKinesisStream record size failure path") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
-  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>();
   REQUIRE(put_kinesis_stream);
-  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::Failure;
+  put_kinesis_stream.get().behaviour_ = 
MockKinesisClient::KinesisBehaviour::Failure;
 
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -160,10 +154,9 @@ TEST_CASE("PutKinesisStream record size failure path") {
 }
 
 TEST_CASE("PutKinesisStream partial failure path") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
-  auto put_kinesis_stream = 
dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor());
-  REQUIRE(put_kinesis_stream);
-  put_kinesis_stream->behaviour_ = 
MockKinesisClient::KinesisBehaviour::OddsFail;
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
+  auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>();
+  put_kinesis_stream.get().behaviour_ = 
MockKinesisClient::KinesisBehaviour::OddsFail;
 
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -187,7 +180,7 @@ TEST_CASE("PutKinesisStream partial failure path") {
 
 
 TEST_CASE("PutKinesisStream simple happy path") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
   auto put_kinesis_stream = controller.getProcessor();
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -210,7 +203,7 @@ TEST_CASE("PutKinesisStream simple happy path") {
 }
 
 TEST_CASE("PutKinesisStream smaller batch size than available ffs") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
   auto put_kinesis_stream = controller.getProcessor();
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -237,7 +230,7 @@ TEST_CASE("PutKinesisStream smaller batch size than 
available ffs") {
 }
 
 TEST_CASE("PutKinesisStream max batch data size fills up") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
   auto put_kinesis_stream = controller.getProcessor();
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -268,7 +261,7 @@ TEST_CASE("PutKinesisStream max batch data size fills up") {
 }
 
 TEST_CASE("PutKinesisStream max batch data size to different streams") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
   auto put_kinesis_stream = controller.getProcessor();
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
@@ -296,7 +289,7 @@ TEST_CASE("PutKinesisStream max batch data size to 
different streams") {
 }
 
 TEST_CASE("PutKinesisStream with too large message") {
-  minifi::test::SingleProcessorTestController 
controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream"));
+  minifi::test::SingleProcessorTestController 
controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream"));
   auto put_kinesis_stream = controller.getProcessor();
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::AccessKey, "access_key");
   controller.plan->setProperty(put_kinesis_stream, 
PutKinesisStream::SecretKey, "secret_key");
diff --git a/extensions/kafka/ConsumeKafka.cpp 
b/extensions/kafka/ConsumeKafka.cpp
index 76f317823..c8ef2f262 100644
--- a/extensions/kafka/ConsumeKafka.cpp
+++ b/extensions/kafka/ConsumeKafka.cpp
@@ -76,7 +76,7 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context, 
core::ProcessSessio
   configureNewConnection(context);
   if (commit_policy_ == 
consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
     setTriggerWhenEmpty(true);
-  } else if (hasIncomingConnections()) {
+  } else if (context.hasIncomingConnections()) {
     logger_->log_error("Incoming connections are not allowed with {}", 
magic_enum::enum_name(commit_policy_));
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Incoming 
connections are not allowed with {}", magic_enum::enum_name(commit_policy_)));
   }
diff --git a/extensions/python/ExecutePythonProcessor.h 
b/extensions/python/ExecutePythonProcessor.h
index c54f1840f..1e44b8cc3 100644
--- a/extensions/python/ExecutePythonProcessor.h
+++ b/extensions/python/ExecutePythonProcessor.h
@@ -44,7 +44,9 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
       : ProcessorImpl(info),
         processor_initialized_(false),
         python_dynamic_(false),
-        reload_on_script_change_(true) {}
+        reload_on_script_change_(true) {
+    python_logger_ = 
core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName(),
 info.uuid);
+  }
 
   EXTENSIONAPI static constexpr const char* Description = "Executes a script 
given the flow file and a process session. "
       "The script is responsible for handling the incoming flow file (transfer 
to SUCCESS or remove, e.g.) as well as "
@@ -157,6 +159,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl {
   bool reload_on_script_change_;
   std::optional<std::chrono::file_clock::time_point> last_script_write_time_;
   std::string script_file_path_;
+  std::shared_ptr<core::logging::Logger> python_logger_;
   std::unique_ptr<PythonScriptEngine> python_script_engine_;
   std::optional<std::string> python_class_name_;
   std::vector<std::filesystem::path> python_paths_;
diff --git a/extensions/python/PythonCreator.h 
b/extensions/python/PythonCreator.h
index 362f737db..961928085 100644
--- a/extensions/python/PythonCreator.h
+++ b/extensions/python/PythonCreator.h
@@ -68,6 +68,10 @@ class DummyLogger : public core::logging::Logger {
     return 0;
   }
 
+  void setLogCallback(const std::function<void(core::logging::LOG_LEVEL level, 
const std::string&)>& /*callback*/) override {
+    // pass
+  }
+
   ~DummyLogger() override = default;
 };
 
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index c9503dc18..9daec72c1 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -113,6 +113,13 @@ class Processor : public ConnectableImpl, public 
ConfigurableComponentImpl, publ
   [[nodiscard]] bool supportsDynamicRelationships() const override;
   state::response::SharedResponseNode getResponseNode() override;
   gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const;
+  std::string getProcessGroupName() const;
+  void setProcessGroupName(const std::string &name);
+  std::string getProcessGroupPath() const;
+  void setProcessGroupPath(const std::string &path);
+  logging::LOG_LEVEL getLogBulletinLevel() const;
+  void setLogBulletinLevel(logging::LOG_LEVEL level);
+  void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, 
const std::string& message)>& callback);
   void restore(const std::shared_ptr<FlowFile>& file) override;
 
   static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
@@ -143,6 +150,7 @@ class Processor : public ConnectableImpl, public 
ConfigurableComponentImpl, publ
   std::string cron_period_;
 
   std::shared_ptr<logging::Logger> logger_;
+  logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn;
 
  private:
   mutable std::mutex mutex_;
@@ -159,6 +167,8 @@ class Processor : public ConnectableImpl, public 
ConfigurableComponentImpl, publ
   std::unordered_map<Connection*, std::unordered_set<Processor*>> 
reachable_processors_;
 
   std::string process_group_uuid_;
+  std::string process_group_name_;
+  std::string process_group_path_;
 
  protected:
   std::unique_ptr<ProcessorApi> impl_;
diff --git a/libminifi/src/core/ProcessContext.cpp 
b/libminifi/src/core/ProcessContext.cpp
index 3e453d58c..985a23271 100644
--- a/libminifi/src/core/ProcessContext.cpp
+++ b/libminifi/src/core/ProcessContext.cpp
@@ -104,6 +104,10 @@ nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getRawPropert
   return getProcessor().getProperty(name);
 }
 
+nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getRawDynamicProperty(const std::string_view name) const {
+  return getProcessor().getDynamicProperty(name);
+}
+
 nonstd::expected<void, std::error_code> 
ProcessContextImpl::setDynamicProperty(std::string name, std::string value) {
   return getProcessor().setDynamicProperty(std::move(name), std::move(value));
 }
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 2ff4dc802..e0df1a394 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -469,6 +469,34 @@ void Processor::setProcessGroupUUIDStr(const std::string 
&uuid) {
   process_group_uuid_ = uuid;
 }
 
+std::string Processor::getProcessGroupName() const {
+  return process_group_name_;
+}
+
+void Processor::setProcessGroupName(const std::string &name) {
+  process_group_name_ = name;
+}
+
+std::string Processor::getProcessGroupPath() const {
+  return process_group_path_;
+}
+
+void Processor::setProcessGroupPath(const std::string &path) {
+  process_group_path_ = path;
+}
+
+logging::LOG_LEVEL Processor::getLogBulletinLevel() const {
+  return log_bulletin_level_;
+}
+
+void Processor::setLogBulletinLevel(logging::LOG_LEVEL level) {
+  log_bulletin_level_ = level;
+}
+
+void Processor::setLoggerCallback(const std::function<void(logging::LOG_LEVEL 
level, const std::string& message)>& callback) {
+  impl_->setLoggerCallback(callback);
+}
+
 std::chrono::steady_clock::time_point Processor::getYieldExpirationTime() 
const {
   return yield_expiration_;
 }
diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp 
b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
index 79267b20b..3cc01e1ff 100644
--- a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
+++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp
@@ -72,11 +72,9 @@ class DummyController : public 
core::controller::ControllerServiceImpl {
 REGISTER_RESOURCE(DummyController, ControllerService);
 
 class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl {
+ public:
   using minifi::core::ProcessorImpl::ProcessorImpl;
 
- public:
-  DummmyControllerUserProcessor(std::string_view name, const 
minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
-  explicit DummmyControllerUserProcessor(std::string_view name) : 
ProcessorImpl(name) {}
   static constexpr auto DummyControllerService = 
core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Service")
     .withDescription("Dummy Controller Service")
     .withAllowedTypes<DummyController>()
@@ -88,7 +86,7 @@ class DummmyControllerUserProcessor : public 
minifi::core::ProcessorImpl {
 
   void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
/*session_factory*/) override {
     if (auto controller_service = 
context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) {
-      if 
(!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service,
 uuid_))) {
+      if 
(!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service,
 getUUID()))) {
         throw 
minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid 
controller service");
       }
     } else {
@@ -105,9 +103,6 @@ class DummmyControllerUserProcessor : public 
minifi::core::ProcessorImpl {
   static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
   static constexpr bool IsSingleThreaded = false;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<DummmyControllerUserProcessor>::getLogger(uuid_);
 };
 
 REGISTER_RESOURCE(DummmyControllerUserProcessor, Processor);
diff --git a/libminifi/test/integration/C2MetricsTest.cpp 
b/libminifi/test/integration/C2MetricsTest.cpp
index 7451e0c3b..75519215b 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -58,6 +58,8 @@ class VerifyC2Metrics : public VerifyC2Base {
   const std::atomic_bool& metrics_updated_successfully_;
 };
 
+#define CHECK_EXPR(expr) (expr ? true : (std::cout << #expr << " failed" << 
std::endl, false))
+
 class MetricsHandler: public HeartbeatHandler {
  public:
   explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, 
std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& 
replacement_config_path)
@@ -105,13 +107,13 @@ class MetricsHandler: public HeartbeatHandler {
 
   void verifyMetrics(const rapidjson::Document& root) {
     auto initial_metrics_verified =
-      root.HasMember("metrics") &&
-      root["metrics"].HasMember("RuntimeMetrics") &&
-      root["metrics"].HasMember("LoadMetrics") &&
-      root["metrics"].HasMember("ProcessorMetrics") &&
-      verifyRuntimeMetrics(root["metrics"]["RuntimeMetrics"]) &&
-      verifyLoadMetrics(root["metrics"]["LoadMetrics"]) &&
-      verifyProcessorMetrics(root["metrics"]["ProcessorMetrics"]);
+      CHECK_EXPR(root.HasMember("metrics")) &&
+      CHECK_EXPR(root["metrics"].HasMember("RuntimeMetrics")) &&
+      CHECK_EXPR(root["metrics"].HasMember("LoadMetrics")) &&
+      CHECK_EXPR(root["metrics"].HasMember("ProcessorMetrics")) &&
+      CHECK_EXPR(verifyRuntimeMetrics(root["metrics"]["RuntimeMetrics"])) &&
+      CHECK_EXPR(verifyLoadMetrics(root["metrics"]["LoadMetrics"])) &&
+      CHECK_EXPR(verifyProcessorMetrics(root["metrics"]["ProcessorMetrics"]));
     if (initial_metrics_verified) {
       test_state_ = TestState::SEND_NEW_CONFIG;
     }
@@ -163,40 +165,42 @@ class MetricsHandler: public HeartbeatHandler {
 
   static bool verifyProcessorBulletins(const rapidjson::Value& 
runtime_metrics) {
     if (!runtime_metrics["flowInfo"].HasMember("processorBulletins")) {
+      std::cout << 
"!runtime_metrics[\"flowInfo\"].HasMember(\"processorBulletins\")" << std::endl;
       return false;
     }
     auto bulletins = 
runtime_metrics["flowInfo"]["processorBulletins"].GetArray();
     return std::any_of(bulletins.begin(), bulletins.end(), [](const auto& 
bulletin) {
       std::string message = bulletin["message"].GetString();
-      return bulletin["id"].GetInt() > 0 &&
-        bulletin["timestamp"].GetInt64() > 0 &&
-        bulletin["level"].GetString() == std::string("ERROR") &&
-        bulletin["category"].GetString() == std::string("Log Message") &&
-        message.find("Error connecting to") != std::string::npos &&
-        message.find(GETTCP_UUID) != std::string::npos &&
-        bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID) &&
-        bulletin["groupName"].GetString() == std::string("MiNiFi Flow") &&
-        bulletin["groupPath"].GetString() == std::string("MiNiFi Flow") &&
-        bulletin["sourceId"].GetString() == std::string(GETTCP_UUID) &&
-        bulletin["sourceName"].GetString() == std::string("GetTCP");
+      return CHECK_EXPR(bulletin["id"].GetInt() > 0) &&
+        CHECK_EXPR(bulletin["timestamp"].GetInt64() > 0) &&
+        CHECK_EXPR(bulletin["level"].GetString() == std::string("ERROR")) &&
+        CHECK_EXPR(bulletin["category"].GetString() == std::string("Log 
Message")) &&
+        CHECK_EXPR(message.find("Error connecting to") != std::string::npos) &&
+        CHECK_EXPR(message.find(GETTCP_UUID) != std::string::npos) &&
+        CHECK_EXPR(bulletin["groupId"].GetString() == 
std::string(PROCESS_GROUP_UUID)) &&
+        CHECK_EXPR(bulletin["groupName"].GetString() == std::string("MiNiFi 
Flow")) &&
+        CHECK_EXPR(bulletin["groupPath"].GetString() == std::string("MiNiFi 
Flow")) &&
+        CHECK_EXPR(bulletin["sourceId"].GetString() == 
std::string(GETTCP_UUID)) &&
+        CHECK_EXPR(bulletin["sourceName"].GetString() == 
std::string("GetTCP"));
     });
   }
 
   static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
-    return verifyCommonRuntimeMetricNodes(runtime_metrics, 
"2438e3c8-015a-1000-79ca-83af40ec1997") &&
+    return CHECK_EXPR(verifyCommonRuntimeMetricNodes(runtime_metrics, 
"2438e3c8-015a-1000-79ca-83af40ec1997")) &&
       [&]() {
         const auto processor_statuses = 
runtime_metrics["flowInfo"]["processorStatuses"].GetArray();
         if (processor_statuses.Size() != 2) {
+          std::cout << "processor_statuses.Size() != 2" << std::endl;
           return false;
         }
         return std::all_of(processor_statuses.begin(), 
processor_statuses.end(), [&](const auto& processor) {
           if (processor["id"].GetString() != std::string(GETTCP_UUID) && 
processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) {
             throw std::runtime_error(std::string("Unexpected processor id in 
processorStatuses: ") + processor["id"].GetString());
           }
-          return processorMetricsAreValid(processor);
+          return CHECK_EXPR(processorMetricsAreValid(processor));
         });
       }() &&
-      verifyProcessorBulletins(runtime_metrics);
+      CHECK_EXPR(verifyProcessorBulletins(runtime_metrics));
   }
 
   static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& 
runtime_metrics) {
diff --git a/libminifi/test/unit/BulletinStoreTests.cpp 
b/libminifi/test/unit/BulletinStoreTests.cpp
index deb811725..155b441bc 100644
--- a/libminifi/test/unit/BulletinStoreTests.cpp
+++ b/libminifi/test/unit/BulletinStoreTests.cpp
@@ -22,6 +22,7 @@
 #include "core/BulletinStore.h"
 #include "properties/Configure.h"
 #include "unit/DummyProcessor.h"
+#include "unit/ProcessorUtils.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -35,7 +36,7 @@ class BulletinStoreTestAccessor {
 };
 
 std::unique_ptr<core::Processor> createDummyProcessor() {
-  auto processor = std::make_unique<DummyProcessor>("DummyProcessor", 
minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value());
+  auto processor = 
test::utils::make_processor<DummyProcessor>("DummyProcessor", 
minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value());
   processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
   processor->setProcessGroupName("sub_group");
   processor->setProcessGroupPath("root / sub_group");
diff --git a/minifi-api/include/minifi-cpp/core/ProcessContext.h 
b/minifi-api/include/minifi-cpp/core/ProcessContext.h
index f4f963e1d..fdc04a874 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessContext.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessContext.h
@@ -74,6 +74,8 @@ class ProcessContext : public virtual core::VariableRegistry, 
public virtual uti
   virtual nonstd::expected<std::string, std::error_code> 
getDynamicProperty(std::string_view name, const FlowFile* flow_file = nullptr) 
const = 0;
   virtual nonstd::expected<void, std::error_code> 
setDynamicProperty(std::string name, std::string value) = 0;
 
+  virtual nonstd::expected<std::string, std::error_code> 
getRawDynamicProperty(std::string_view name) const = 0;
+
   virtual std::vector<std::string> getDynamicPropertyKeys() const = 0;
   virtual std::map<std::string, std::string> getDynamicProperties(const 
FlowFile* flow_file = nullptr) const = 0;
 
diff --git a/minifi-api/include/minifi-cpp/core/ProcessorApi.h 
b/minifi-api/include/minifi-cpp/core/ProcessorApi.h
index e9cfda446..0b12a4646 100644
--- a/minifi-api/include/minifi-cpp/core/ProcessorApi.h
+++ b/minifi-api/include/minifi-cpp/core/ProcessorApi.h
@@ -68,6 +68,7 @@ class ProcessorApi {
   virtual void notifyStop() = 0;
   virtual annotation::Input getInputRequirement() const = 0;
   virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const 
= 0;
+  virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL 
level, const std::string& message)>& callback) = 0;
 };
 
 }  // namespace core
diff --git a/utils/include/core/ProcessContext.h 
b/utils/include/core/ProcessContext.h
index df73d4cc5..e3b4c1d8d 100644
--- a/utils/include/core/ProcessContext.h
+++ b/utils/include/core/ProcessContext.h
@@ -71,6 +71,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, 
public virtual Pro
   nonstd::expected<std::string, std::error_code> 
getDynamicProperty(std::string_view name, const FlowFile*) const override;
   nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, 
std::string value) override;
   nonstd::expected<std::string, std::error_code> 
getRawProperty(std::string_view name) const override;
+  nonstd::expected<std::string, std::error_code> 
getRawDynamicProperty(std::string_view name) const override;
   [[nodiscard]] nonstd::expected<std::vector<std::string>, std::error_code> 
getAllPropertyValues(std::string_view name) const override;
   bool hasIncomingConnections() const override;
 
diff --git a/utils/include/core/ProcessorImpl.h 
b/utils/include/core/ProcessorImpl.h
index 23ba2f516..6d51b70ff 100644
--- a/utils/include/core/ProcessorImpl.h
+++ b/utils/include/core/ProcessorImpl.h
@@ -130,6 +130,8 @@ class ProcessorImpl : public virtual ProcessorApi {
 
   void restore(const std::shared_ptr<FlowFile>& file) override;
 
+  void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, 
const std::string& message)>& /*callback*/) override;
+
   std::string getName() const;
   utils::Identifier getUUID() const;
   utils::SmallString<36> getUUIDStr() const;

Reply via email to