This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api-reduced in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit b208d843c13729346572d32b210dedc0e3ba5d7e Author: Adam Debreceni <[email protected]> AuthorDate: Mon Apr 28 10:28:42 2025 +0200 Rebase fix --- extensions/systemd/ConsumeJournald.cpp | 10 ++++------ extensions/systemd/ConsumeJournald.h | 3 +-- extensions/systemd/tests/ConsumeJournaldTest.cpp | 4 ++-- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/extensions/systemd/ConsumeJournald.cpp b/extensions/systemd/ConsumeJournald.cpp index fa212bad7..ca0cfa276 100644 --- a/extensions/systemd/ConsumeJournald.cpp +++ b/extensions/systemd/ConsumeJournald.cpp @@ -33,10 +33,8 @@ namespace org::apache::nifi::minifi::extensions::systemd { namespace chr = std::chrono; -ConsumeJournald::ConsumeJournald(const std::string_view name, const utils::Identifier &id, std::unique_ptr<libwrapper::LibWrapper>&& libwrapper) - : core::ProcessorImpl{name, id}, libwrapper_{std::move(libwrapper)} { - logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger(uuid_); -} +ConsumeJournald::ConsumeJournald(core::ProcessorMetadata info, std::unique_ptr<libwrapper::LibWrapper>&& libwrapper) + : core::ProcessorImpl{info}, libwrapper_{std::move(libwrapper)} {} void ConsumeJournald::initialize() { setSupportedProperties(Properties); @@ -95,13 +93,13 @@ void ConsumeJournald::onSchedule(core::ProcessContext& context, core::ProcessSes running_ = true; } -void ConsumeJournald::onTrigger(core::ProcessContext&, core::ProcessSession& session) { +void ConsumeJournald::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { gsl_Expects(state_manager_); if (!running_.load(std::memory_order_acquire)) return; auto cursor_and_messages = getCursorAndMessageBatch().get(); auto messages = std::move(cursor_and_messages.second); if (messages.empty()) { - yield(); + context.yield(); return; } diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h index 16a3f2e10..07cd49dc4 100644 --- a/extensions/systemd/ConsumeJournald.h +++ b/extensions/systemd/ConsumeJournald.h @@ -35,7 +35,6 @@ #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/RelationshipDefinition.h" -#include "core/logging/LoggerFactory.h" #include "libwrapper/LibWrapper.h" #include "utils/Deleters.h" #include "utils/gsl.h" @@ -114,7 +113,7 @@ class ConsumeJournald final : public core::ProcessorImpl { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - explicit ConsumeJournald(std::string_view name, const utils::Identifier& id = {}, std::unique_ptr<libwrapper::LibWrapper>&& = libwrapper::createLibWrapper()); + explicit ConsumeJournald(core::ProcessorMetadata info, std::unique_ptr<libwrapper::LibWrapper>&& = libwrapper::createLibWrapper()); ConsumeJournald(const ConsumeJournald&) = delete; ConsumeJournald(ConsumeJournald&&) = delete; ConsumeJournald& operator=(const ConsumeJournald&) = delete; diff --git a/extensions/systemd/tests/ConsumeJournaldTest.cpp b/extensions/systemd/tests/ConsumeJournaldTest.cpp index e6526b5fa..24c95f16f 100644 --- a/extensions/systemd/tests/ConsumeJournaldTest.cpp +++ b/extensions/systemd/tests/ConsumeJournaldTest.cpp @@ -160,11 +160,11 @@ TEST_CASE("ConsumeJournald", "[consumejournald]") { {"systemd", "Starting Rule-based Manager for Device Events and Files...", 1}, }}); auto* const libwrapper_observer = libwrapper.get(); - const auto consume_journald = plan->addProcessor(std::make_unique<ConsumeJournald>("ConsumeJournald", utils::Identifier{}, + const TypedProcessorWrapper<ConsumeJournald> consume_journald = plan->addProcessor(minifi::test::utils::make_custom_processor<ConsumeJournald>("ConsumeJournald", utils::Identifier{}, std::move(libwrapper)), "ConsumeJournald"); REQUIRE(consume_journald->setProperty(ConsumeJournald::TimestampFormat.name, "ISO8601")); const auto get_cursor_position = [&consume_journald]() -> std::string { - return ConsumeJournaldTestAccessor::get_state_manager_(dynamic_cast<ConsumeJournald&>(*consume_journald))->get()->at("cursor"); + return ConsumeJournaldTestAccessor::get_state_manager_(consume_journald.get())->get()->at("cursor"); }; SECTION("defaults") {
