This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api-reduced in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6a55322642230ae44b792d0d5b390d7a6f09ed81 Author: Adam Debreceni <[email protected]> AuthorDate: Thu Jun 19 15:57:53 2025 +0200 Rebase fix --- extensions/aws/processors/AwsProcessor.cpp | 5 --- extensions/aws/processors/AwsProcessor.h | 6 +-- extensions/aws/processors/PutKinesisStream.h | 8 +--- extensions/aws/processors/S3Processor.cpp | 2 +- extensions/aws/processors/S3Processor.h | 1 - extensions/aws/tests/PutKinesisStreamTests.cpp | 39 ++++++++---------- extensions/kafka/ConsumeKafka.cpp | 2 +- extensions/python/ExecutePythonProcessor.h | 5 ++- extensions/python/PythonCreator.h | 4 ++ libminifi/include/core/Processor.h | 10 +++++ libminifi/src/core/ProcessContext.cpp | 4 ++ libminifi/src/core/Processor.cpp | 28 +++++++++++++ .../integration/C2ControllerEnableFailureTest.cpp | 9 +---- libminifi/test/integration/C2MetricsTest.cpp | 46 ++++++++++++---------- libminifi/test/unit/BulletinStoreTests.cpp | 3 +- .../include/minifi-cpp/core/ProcessContext.h | 2 + minifi-api/include/minifi-cpp/core/ProcessorApi.h | 1 + utils/include/core/ProcessContext.h | 1 + utils/include/core/ProcessorImpl.h | 2 + 19 files changed, 106 insertions(+), 72 deletions(-) diff --git a/extensions/aws/processors/AwsProcessor.cpp b/extensions/aws/processors/AwsProcessor.cpp index 4ff850ee0..cd442abb7 100644 --- a/extensions/aws/processors/AwsProcessor.cpp +++ b/extensions/aws/processors/AwsProcessor.cpp @@ -32,11 +32,6 @@ namespace org::apache::nifi::minifi::aws::processors { -AwsProcessor::AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) - : core::ProcessorImpl(name, uuid), - logger_(std::move(logger)) { -} - std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const { if (const auto aws_credentials_service = minifi::utils::parseOptionalControllerService<controllers::AWSCredentialsService>(context, AWSCredentialsProviderService, getUUID())) { return (*aws_credentials_service)->getAWSCredentials(); diff --git a/extensions/aws/processors/AwsProcessor.h b/extensions/aws/processors/AwsProcessor.h index 9f0055e2e..542d0c75b 100644 --- a/extensions/aws/processors/AwsProcessor.h +++ b/extensions/aws/processors/AwsProcessor.h @@ -32,7 +32,7 @@ #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" -#include "core/Processor.h" +#include "core/ProcessorImpl.h" namespace org::apache::nifi::minifi::aws::processors { @@ -169,8 +169,7 @@ class AwsProcessor : public core::ProcessorImpl { UseDefaultCredentials }); - - explicit AwsProcessor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger); + using ProcessorImpl::ProcessorImpl; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; @@ -180,7 +179,6 @@ class AwsProcessor : public core::ProcessorImpl { aws::ProxyOptions getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file); std::optional<CommonProperties> getCommonELSupportedProperties(core::ProcessContext& context, const core::FlowFile* flow_file); - std::shared_ptr<core::logging::Logger> logger_; std::optional<Aws::Client::ClientConfiguration> client_config_; }; diff --git a/extensions/aws/processors/PutKinesisStream.h b/extensions/aws/processors/PutKinesisStream.h index c30483a20..73d308dcf 100644 --- a/extensions/aws/processors/PutKinesisStream.h +++ b/extensions/aws/processors/PutKinesisStream.h @@ -87,13 +87,7 @@ class PutKinesisStream : public AwsProcessor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - explicit PutKinesisStream(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) - : AwsProcessor(name, uuid, core::logging::LoggerFactory<PutKinesisStream>::getLogger(uuid)) { - } - PutKinesisStream(const PutKinesisStream&) = delete; - PutKinesisStream(PutKinesisStream&&) = delete; - PutKinesisStream& operator=(const PutKinesisStream&) = delete; - PutKinesisStream& operator=(PutKinesisStream&&) = delete; + using AwsProcessor::AwsProcessor; ~PutKinesisStream() override = default; void initialize() override; diff --git a/extensions/aws/processors/S3Processor.cpp b/extensions/aws/processors/S3Processor.cpp index e45c3fa1c..6bb6402f4 100644 --- a/extensions/aws/processors/S3Processor.cpp +++ b/extensions/aws/processors/S3Processor.cpp @@ -33,7 +33,7 @@ namespace org::apache::nifi::minifi::aws::processors { S3Processor::S3Processor(core::ProcessorMetadata info, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender) - : core::ProcessorImpl(info), + : AwsProcessor(info), s3_wrapper_(std::move(s3_request_sender)) { } diff --git a/extensions/aws/processors/S3Processor.h b/extensions/aws/processors/S3Processor.h index d3d0a485b..cdcee4c4c 100644 --- a/extensions/aws/processors/S3Processor.h +++ b/extensions/aws/processors/S3Processor.h @@ -30,7 +30,6 @@ #include "AwsProcessor.h" #include "S3Wrapper.h" #include "aws/core/auth/AWSCredentialsProvider.h" -#include "core/Processor.h" #include "core/Property.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" diff --git a/extensions/aws/tests/PutKinesisStreamTests.cpp b/extensions/aws/tests/PutKinesisStreamTests.cpp index bbfda2b9c..b6532b5b3 100644 --- a/extensions/aws/tests/PutKinesisStreamTests.cpp +++ b/extensions/aws/tests/PutKinesisStreamTests.cpp @@ -24,6 +24,7 @@ #include "unit/Catch.h" #include "unit/SingleProcessorTestController.h" #include "unit/TestBase.h" +#include "unit/ProcessorUtils.h" namespace org::apache::nifi::minifi::aws::processors::test { @@ -86,14 +87,7 @@ class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream { public: static constexpr const char* Description = "PutKinesisStreamMocked"; - explicit PutKinesisStreamMocked(const std::string& name, const minifi::utils::Identifier& uuid = minifi::utils::Identifier()) - : PutKinesisStream(name, uuid) { - } - - PutKinesisStreamMocked(const PutKinesisStreamMocked&) = delete; - PutKinesisStreamMocked(PutKinesisStreamMocked&&) = delete; - PutKinesisStreamMocked& operator=(const PutKinesisStreamMocked&) = delete; - PutKinesisStreamMocked& operator=(PutKinesisStreamMocked&&) = delete; + using PutKinesisStream::PutKinesisStream; ~PutKinesisStreamMocked() override = default; @@ -110,10 +104,10 @@ class PutKinesisStreamMocked final : public aws::processors::PutKinesisStream { REGISTER_RESOURCE(PutKinesisStreamMocked, Processor); TEST_CASE("PutKinesisStream record size mismatch path") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); - auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>(); REQUIRE(put_kinesis_stream); - put_kinesis_stream->behaviour_ = MockKinesisClient::KinesisBehaviour::RecordSizeMismatch; + put_kinesis_stream.get().behaviour_ = MockKinesisClient::KinesisBehaviour::RecordSizeMismatch; controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -134,10 +128,10 @@ TEST_CASE("PutKinesisStream record size mismatch path") { } TEST_CASE("PutKinesisStream record size failure path") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); - auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>(); REQUIRE(put_kinesis_stream); - put_kinesis_stream->behaviour_ = MockKinesisClient::KinesisBehaviour::Failure; + put_kinesis_stream.get().behaviour_ = MockKinesisClient::KinesisBehaviour::Failure; controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -160,10 +154,9 @@ TEST_CASE("PutKinesisStream record size failure path") { } TEST_CASE("PutKinesisStream partial failure path") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); - auto put_kinesis_stream = dynamic_cast<PutKinesisStreamMocked*>(controller.getProcessor()); - REQUIRE(put_kinesis_stream); - put_kinesis_stream->behaviour_ = MockKinesisClient::KinesisBehaviour::OddsFail; + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); + auto put_kinesis_stream = controller.getProcessor<PutKinesisStreamMocked>(); + put_kinesis_stream.get().behaviour_ = MockKinesisClient::KinesisBehaviour::OddsFail; controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -187,7 +180,7 @@ TEST_CASE("PutKinesisStream partial failure path") { TEST_CASE("PutKinesisStream simple happy path") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); auto put_kinesis_stream = controller.getProcessor(); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -210,7 +203,7 @@ TEST_CASE("PutKinesisStream simple happy path") { } TEST_CASE("PutKinesisStream smaller batch size than available ffs") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); auto put_kinesis_stream = controller.getProcessor(); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -237,7 +230,7 @@ TEST_CASE("PutKinesisStream smaller batch size than available ffs") { } TEST_CASE("PutKinesisStream max batch data size fills up") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); auto put_kinesis_stream = controller.getProcessor(); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -268,7 +261,7 @@ TEST_CASE("PutKinesisStream max batch data size fills up") { } TEST_CASE("PutKinesisStream max batch data size to different streams") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); auto put_kinesis_stream = controller.getProcessor(); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); @@ -296,7 +289,7 @@ TEST_CASE("PutKinesisStream max batch data size to different streams") { } TEST_CASE("PutKinesisStream with too large message") { - minifi::test::SingleProcessorTestController controller(std::make_unique<PutKinesisStreamMocked>("PutKinesisStream")); + minifi::test::SingleProcessorTestController controller(minifi::test::utils::make_processor<PutKinesisStreamMocked>("PutKinesisStream")); auto put_kinesis_stream = controller.getProcessor(); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::AccessKey, "access_key"); controller.plan->setProperty(put_kinesis_stream, PutKinesisStream::SecretKey, "secret_key"); diff --git a/extensions/kafka/ConsumeKafka.cpp b/extensions/kafka/ConsumeKafka.cpp index 76f317823..c8ef2f262 100644 --- a/extensions/kafka/ConsumeKafka.cpp +++ b/extensions/kafka/ConsumeKafka.cpp @@ -76,7 +76,7 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio configureNewConnection(context); if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) { setTriggerWhenEmpty(true); - } else if (hasIncomingConnections()) { + } else if (context.hasIncomingConnections()) { logger_->log_error("Incoming connections are not allowed with {}", magic_enum::enum_name(commit_policy_)); throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Incoming connections are not allowed with {}", magic_enum::enum_name(commit_policy_))); } diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index c54f1840f..1e44b8cc3 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -44,7 +44,9 @@ class ExecutePythonProcessor : public core::ProcessorImpl { : ProcessorImpl(info), processor_initialized_(false), python_dynamic_(false), - reload_on_script_change_(true) {} + reload_on_script_change_(true) { + python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName(), info.uuid); + } EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. " "The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as " @@ -157,6 +159,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl { bool reload_on_script_change_; std::optional<std::chrono::file_clock::time_point> last_script_write_time_; std::string script_file_path_; + std::shared_ptr<core::logging::Logger> python_logger_; std::unique_ptr<PythonScriptEngine> python_script_engine_; std::optional<std::string> python_class_name_; std::vector<std::filesystem::path> python_paths_; diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h index 362f737db..961928085 100644 --- a/extensions/python/PythonCreator.h +++ b/extensions/python/PythonCreator.h @@ -68,6 +68,10 @@ class DummyLogger : public core::logging::Logger { return 0; } + void setLogCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string&)>& /*callback*/) override { + // pass + } + ~DummyLogger() override = default; }; diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index c9503dc18..9daec72c1 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -113,6 +113,13 @@ class Processor : public ConnectableImpl, public ConfigurableComponentImpl, publ [[nodiscard]] bool supportsDynamicRelationships() const override; state::response::SharedResponseNode getResponseNode() override; gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const; + std::string getProcessGroupName() const; + void setProcessGroupName(const std::string &name); + std::string getProcessGroupPath() const; + void setProcessGroupPath(const std::string &path); + logging::LOG_LEVEL getLogBulletinLevel() const; + void setLogBulletinLevel(logging::LOG_LEVEL level); + void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback); void restore(const std::shared_ptr<FlowFile>& file) override; static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; @@ -143,6 +150,7 @@ class Processor : public ConnectableImpl, public ConfigurableComponentImpl, publ std::string cron_period_; std::shared_ptr<logging::Logger> logger_; + logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn; private: mutable std::mutex mutex_; @@ -159,6 +167,8 @@ class Processor : public ConnectableImpl, public ConfigurableComponentImpl, publ std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; std::string process_group_uuid_; + std::string process_group_name_; + std::string process_group_path_; protected: std::unique_ptr<ProcessorApi> impl_; diff --git a/libminifi/src/core/ProcessContext.cpp b/libminifi/src/core/ProcessContext.cpp index 3e453d58c..985a23271 100644 --- a/libminifi/src/core/ProcessContext.cpp +++ b/libminifi/src/core/ProcessContext.cpp @@ -104,6 +104,10 @@ nonstd::expected<std::string, std::error_code> ProcessContextImpl::getRawPropert return getProcessor().getProperty(name); } +nonstd::expected<std::string, std::error_code> ProcessContextImpl::getRawDynamicProperty(const std::string_view name) const { + return getProcessor().getDynamicProperty(name); +} + nonstd::expected<void, std::error_code> ProcessContextImpl::setDynamicProperty(std::string name, std::string value) { return getProcessor().setDynamicProperty(std::move(name), std::move(value)); } diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 2ff4dc802..e0df1a394 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -469,6 +469,34 @@ void Processor::setProcessGroupUUIDStr(const std::string &uuid) { process_group_uuid_ = uuid; } +std::string Processor::getProcessGroupName() const { + return process_group_name_; +} + +void Processor::setProcessGroupName(const std::string &name) { + process_group_name_ = name; +} + +std::string Processor::getProcessGroupPath() const { + return process_group_path_; +} + +void Processor::setProcessGroupPath(const std::string &path) { + process_group_path_ = path; +} + +logging::LOG_LEVEL Processor::getLogBulletinLevel() const { + return log_bulletin_level_; +} + +void Processor::setLogBulletinLevel(logging::LOG_LEVEL level) { + log_bulletin_level_ = level; +} + +void Processor::setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) { + impl_->setLoggerCallback(callback); +} + std::chrono::steady_clock::time_point Processor::getYieldExpirationTime() const { return yield_expiration_; } diff --git a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp index 79267b20b..3cc01e1ff 100644 --- a/libminifi/test/integration/C2ControllerEnableFailureTest.cpp +++ b/libminifi/test/integration/C2ControllerEnableFailureTest.cpp @@ -72,11 +72,9 @@ class DummyController : public core::controller::ControllerServiceImpl { REGISTER_RESOURCE(DummyController, ControllerService); class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl { + public: using minifi::core::ProcessorImpl::ProcessorImpl; - public: - DummmyControllerUserProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {} - explicit DummmyControllerUserProcessor(std::string_view name) : ProcessorImpl(name) {} static constexpr auto DummyControllerService = core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Service") .withDescription("Dummy Controller Service") .withAllowedTypes<DummyController>() @@ -88,7 +86,7 @@ class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl { void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) override { if (auto controller_service = context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) { - if (!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service, uuid_))) { + if (!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service, getUUID()))) { throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service"); } } else { @@ -105,9 +103,6 @@ class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl { static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - private: - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DummmyControllerUserProcessor>::getLogger(uuid_); }; REGISTER_RESOURCE(DummmyControllerUserProcessor, Processor); diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 7451e0c3b..75519215b 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -58,6 +58,8 @@ class VerifyC2Metrics : public VerifyC2Base { const std::atomic_bool& metrics_updated_successfully_; }; +#define CHECK_EXPR(expr) (expr ? true : (std::cout << #expr << " failed" << std::endl, false)) + class MetricsHandler: public HeartbeatHandler { public: explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& replacement_config_path) @@ -105,13 +107,13 @@ class MetricsHandler: public HeartbeatHandler { void verifyMetrics(const rapidjson::Document& root) { auto initial_metrics_verified = - root.HasMember("metrics") && - root["metrics"].HasMember("RuntimeMetrics") && - root["metrics"].HasMember("LoadMetrics") && - root["metrics"].HasMember("ProcessorMetrics") && - verifyRuntimeMetrics(root["metrics"]["RuntimeMetrics"]) && - verifyLoadMetrics(root["metrics"]["LoadMetrics"]) && - verifyProcessorMetrics(root["metrics"]["ProcessorMetrics"]); + CHECK_EXPR(root.HasMember("metrics")) && + CHECK_EXPR(root["metrics"].HasMember("RuntimeMetrics")) && + CHECK_EXPR(root["metrics"].HasMember("LoadMetrics")) && + CHECK_EXPR(root["metrics"].HasMember("ProcessorMetrics")) && + CHECK_EXPR(verifyRuntimeMetrics(root["metrics"]["RuntimeMetrics"])) && + CHECK_EXPR(verifyLoadMetrics(root["metrics"]["LoadMetrics"])) && + CHECK_EXPR(verifyProcessorMetrics(root["metrics"]["ProcessorMetrics"])); if (initial_metrics_verified) { test_state_ = TestState::SEND_NEW_CONFIG; } @@ -163,40 +165,42 @@ class MetricsHandler: public HeartbeatHandler { static bool verifyProcessorBulletins(const rapidjson::Value& runtime_metrics) { if (!runtime_metrics["flowInfo"].HasMember("processorBulletins")) { + std::cout << "!runtime_metrics[\"flowInfo\"].HasMember(\"processorBulletins\")" << std::endl; return false; } auto bulletins = runtime_metrics["flowInfo"]["processorBulletins"].GetArray(); return std::any_of(bulletins.begin(), bulletins.end(), [](const auto& bulletin) { std::string message = bulletin["message"].GetString(); - return bulletin["id"].GetInt() > 0 && - bulletin["timestamp"].GetInt64() > 0 && - bulletin["level"].GetString() == std::string("ERROR") && - bulletin["category"].GetString() == std::string("Log Message") && - message.find("Error connecting to") != std::string::npos && - message.find(GETTCP_UUID) != std::string::npos && - bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID) && - bulletin["groupName"].GetString() == std::string("MiNiFi Flow") && - bulletin["groupPath"].GetString() == std::string("MiNiFi Flow") && - bulletin["sourceId"].GetString() == std::string(GETTCP_UUID) && - bulletin["sourceName"].GetString() == std::string("GetTCP"); + return CHECK_EXPR(bulletin["id"].GetInt() > 0) && + CHECK_EXPR(bulletin["timestamp"].GetInt64() > 0) && + CHECK_EXPR(bulletin["level"].GetString() == std::string("ERROR")) && + CHECK_EXPR(bulletin["category"].GetString() == std::string("Log Message")) && + CHECK_EXPR(message.find("Error connecting to") != std::string::npos) && + CHECK_EXPR(message.find(GETTCP_UUID) != std::string::npos) && + CHECK_EXPR(bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID)) && + CHECK_EXPR(bulletin["groupName"].GetString() == std::string("MiNiFi Flow")) && + CHECK_EXPR(bulletin["groupPath"].GetString() == std::string("MiNiFi Flow")) && + CHECK_EXPR(bulletin["sourceId"].GetString() == std::string(GETTCP_UUID)) && + CHECK_EXPR(bulletin["sourceName"].GetString() == std::string("GetTCP")); }); } static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { - return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && + return CHECK_EXPR(verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997")) && [&]() { const auto processor_statuses = runtime_metrics["flowInfo"]["processorStatuses"].GetArray(); if (processor_statuses.Size() != 2) { + std::cout << "processor_statuses.Size() != 2" << std::endl; return false; } return std::all_of(processor_statuses.begin(), processor_statuses.end(), [&](const auto& processor) { if (processor["id"].GetString() != std::string(GETTCP_UUID) && processor["id"].GetString() != std::string(LOGATTRIBUTE1_UUID)) { throw std::runtime_error(std::string("Unexpected processor id in processorStatuses: ") + processor["id"].GetString()); } - return processorMetricsAreValid(processor); + return CHECK_EXPR(processorMetricsAreValid(processor)); }); }() && - verifyProcessorBulletins(runtime_metrics); + CHECK_EXPR(verifyProcessorBulletins(runtime_metrics)); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { diff --git a/libminifi/test/unit/BulletinStoreTests.cpp b/libminifi/test/unit/BulletinStoreTests.cpp index deb811725..155b441bc 100644 --- a/libminifi/test/unit/BulletinStoreTests.cpp +++ b/libminifi/test/unit/BulletinStoreTests.cpp @@ -22,6 +22,7 @@ #include "core/BulletinStore.h" #include "properties/Configure.h" #include "unit/DummyProcessor.h" +#include "unit/ProcessorUtils.h" using namespace std::literals::chrono_literals; @@ -35,7 +36,7 @@ class BulletinStoreTestAccessor { }; std::unique_ptr<core::Processor> createDummyProcessor() { - auto processor = std::make_unique<DummyProcessor>("DummyProcessor", minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value()); + auto processor = test::utils::make_processor<DummyProcessor>("DummyProcessor", minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value()); processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2"); processor->setProcessGroupName("sub_group"); processor->setProcessGroupPath("root / sub_group"); diff --git a/minifi-api/include/minifi-cpp/core/ProcessContext.h b/minifi-api/include/minifi-cpp/core/ProcessContext.h index f4f963e1d..fdc04a874 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessContext.h +++ b/minifi-api/include/minifi-cpp/core/ProcessContext.h @@ -74,6 +74,8 @@ class ProcessContext : public virtual core::VariableRegistry, public virtual uti virtual nonstd::expected<std::string, std::error_code> getDynamicProperty(std::string_view name, const FlowFile* flow_file = nullptr) const = 0; virtual nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, std::string value) = 0; + virtual nonstd::expected<std::string, std::error_code> getRawDynamicProperty(std::string_view name) const = 0; + virtual std::vector<std::string> getDynamicPropertyKeys() const = 0; virtual std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file = nullptr) const = 0; diff --git a/minifi-api/include/minifi-cpp/core/ProcessorApi.h b/minifi-api/include/minifi-cpp/core/ProcessorApi.h index e9cfda446..0b12a4646 100644 --- a/minifi-api/include/minifi-cpp/core/ProcessorApi.h +++ b/minifi-api/include/minifi-cpp/core/ProcessorApi.h @@ -68,6 +68,7 @@ class ProcessorApi { virtual void notifyStop() = 0; virtual annotation::Input getInputRequirement() const = 0; virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0; + virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) = 0; }; } // namespace core diff --git a/utils/include/core/ProcessContext.h b/utils/include/core/ProcessContext.h index df73d4cc5..e3b4c1d8d 100644 --- a/utils/include/core/ProcessContext.h +++ b/utils/include/core/ProcessContext.h @@ -71,6 +71,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro nonstd::expected<std::string, std::error_code> getDynamicProperty(std::string_view name, const FlowFile*) const override; nonstd::expected<void, std::error_code> setDynamicProperty(std::string name, std::string value) override; nonstd::expected<std::string, std::error_code> getRawProperty(std::string_view name) const override; + nonstd::expected<std::string, std::error_code> getRawDynamicProperty(std::string_view name) const override; [[nodiscard]] nonstd::expected<std::vector<std::string>, std::error_code> getAllPropertyValues(std::string_view name) const override; bool hasIncomingConnections() const override; diff --git a/utils/include/core/ProcessorImpl.h b/utils/include/core/ProcessorImpl.h index 23ba2f516..6d51b70ff 100644 --- a/utils/include/core/ProcessorImpl.h +++ b/utils/include/core/ProcessorImpl.h @@ -130,6 +130,8 @@ class ProcessorImpl : public virtual ProcessorApi { void restore(const std::shared_ptr<FlowFile>& file) override; + void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& /*callback*/) override; + std::string getName() const; utils::Identifier getUUID() const; utils::SmallString<36> getUUIDStr() const;
