This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit fe14bed424697d3cbfba184e4d18743ea67413f8 Author: Martin Zink <[email protected]> AuthorDate: Tue Jul 16 16:01:15 2024 +0000 MINIFICPP-2398 Only commit once per time_slice in EventDrivenSchedulingAgent Closes #1815 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/features/kafka.feature | 6 +++ extensions/rocksdb-repos/tests/SwapTests.cpp | 3 +- libminifi/include/EventDrivenSchedulingAgent.h | 6 +-- libminifi/include/SchedulingAgent.h | 7 ++- libminifi/include/ThreadedSchedulingAgent.h | 3 +- libminifi/include/core/ProcessSession.h | 2 + libminifi/include/core/Processor.h | 11 +++-- libminifi/include/core/ProcessorMetrics.h | 2 +- libminifi/src/CronDrivenSchedulingAgent.cpp | 15 +++--- libminifi/src/EventDrivenSchedulingAgent.cpp | 66 +++++++++++++++++++++----- libminifi/src/SchedulingAgent.cpp | 61 +++++++++++++++++++++--- libminifi/src/TimerDrivenSchedulingAgent.cpp | 8 ++-- libminifi/src/core/ProcessSession.cpp | 15 +++++- libminifi/src/core/Processor.cpp | 47 +++++++++--------- libminifi/test/libtest/unit/TestBase.cpp | 2 +- 15 files changed, 186 insertions(+), 68 deletions(-) diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature index 7fcae658f..02aca2cc0 100644 --- a/docker/test/integration/features/kafka.feature +++ b/docker/test/integration/features/kafka.feature @@ -46,6 +46,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And the "success" relationship of the GetFile processor is connected to the UpdateAttribute And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka @@ -93,6 +94,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And a PutFile processor with the "Directory" property set to "/tmp/output" And the "success" relationship of the GetFile processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka @@ -120,6 +122,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And a PutFile processor with the "Directory" property set to "/tmp/output" And the "success" relationship of the GetFile processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka @@ -150,6 +153,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And a PutFile processor with the "Directory" property set to "/tmp/output" And the "success" relationship of the GetFile processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka @@ -178,6 +182,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And an ssl context service is set up for PublishKafka And the "success" relationship of the GetFile processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka @@ -203,6 +208,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka And an ssl context service is set up for PublishKafka And the "success" relationship of the GetFile processor is connected to the PublishKafka And the "success" relationship of the PublishKafka processor is connected to the PutFile + And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka And a kafka broker is set up in correspondence with the PublishKafka diff --git a/extensions/rocksdb-repos/tests/SwapTests.cpp b/extensions/rocksdb-repos/tests/SwapTests.cpp index 62822e8d7..5e04bf5a6 100644 --- a/extensions/rocksdb-repos/tests/SwapTests.cpp +++ b/extensions/rocksdb-repos/tests/SwapTests.cpp @@ -33,7 +33,6 @@ namespace org::apache::nifi::minifi::test { class OutputProcessor : public core::Processor { public: using core::Processor::Processor; - using core::Processor::onTrigger; static constexpr const char* Description = "Processor used for testing cycles"; static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; @@ -112,7 +111,7 @@ TEST_CASE("Connection will on-demand swap flow files") { auto session_factory = std::make_shared<core::ProcessSessionFactory>(context); for (size_t i = 0; i < 200; ++i) { - processor->onTrigger(context, session_factory); + processor->triggerAndCommit(context, session_factory); } REQUIRE(connection->getQueueSize() == processor->flow_files_.size()); diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 352212179..2a42a6933 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -52,11 +52,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { void schedule(core::Processor* processor) override; - utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, - const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; + utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &process_context, + const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override; private: - std::chrono::milliseconds time_slice_; + std::chrono::milliseconds time_slice_{}; }; } // namespace org::apache::nifi::minifi diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 398cd52d2..99ebd62c8 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -86,9 +86,14 @@ class SchedulingAgent { logger_->log_trace("Destroying scheduling agent"); } - nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor, + bool processorYields(core::Processor* processor) const; + + nonstd::expected<void, std::exception_ptr> triggerAndCommit(core::Processor* processor, const std::shared_ptr<core::ProcessContext>& process_context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory); + nonstd::expected<bool, std::exception_ptr> trigger(core::Processor* processor, + const std::shared_ptr<core::ProcessContext>& process_context, + const std::shared_ptr<core::ProcessSession>& process_session); void start() { running_ = true; diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 14a703948..a831a3377 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -54,9 +54,10 @@ class ThreadedSchedulingAgent : public SchedulingAgent { void stop() override; - private: + protected: std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger(); + private: std::set<utils::Identifier> processors_running_; // Set just for easy usage }; diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 87da7d974..912f0bef2 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -67,6 +67,8 @@ class ProcessSession : public ReferenceContainer { void commit(); // Roll Back the session void rollback(); + + nonstd::expected<void, std::exception_ptr> rollbackNoThrow() noexcept; // Get Provenance Report std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() { return provenance_report_; diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index b18eb5f25..299b183ff 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -153,12 +153,12 @@ class Processor : public Connectable, public ConfigurableComponent, public state } void incrementActiveTasks() { - active_tasks_++; + ++active_tasks_; } void decrementActiveTask() { if (active_tasks_ > 0) - active_tasks_--; + --active_tasks_; } void clearActiveTask() { @@ -187,7 +187,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state void initialize() override { } - virtual void onTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory); + virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory); + void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session); virtual void onTriggerSharedPtr(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) { onTrigger(*context, *session); @@ -219,6 +220,10 @@ class Processor : public Connectable, public ConfigurableComponent, public state return metrics_; } + auto getMetrics() const { + return metrics_; + } + static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h index 63e1f7879..de5c3d157 100644 --- a/libminifi/include/core/ProcessorMetrics.h +++ b/libminifi/include/core/ProcessorMetrics.h @@ -78,7 +78,7 @@ class ProcessorMetrics : public state::response::ResponseNode { }; [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const; - static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10; + static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10; std::mutex transferred_relationships_mutex_; std::unordered_map<std::string, size_t> transferred_relationships_; diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp index 905c309ed..2cc5a37d8 100644 --- a/libminifi/src/CronDrivenSchedulingAgent.cpp +++ b/libminifi/src/CronDrivenSchedulingAgent.cpp @@ -19,7 +19,6 @@ */ #include "CronDrivenSchedulingAgent.h" #include <chrono> -#include <memory> #include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSessionFactory.h" @@ -36,30 +35,28 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces using std::chrono::system_clock; if (this->running_ && processor->isRunning()) { - auto uuid = processor->getUUID(); - auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now())); + const auto uuid = processor->getUUID(); + const auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now())); std::lock_guard<std::mutex> lock(mutex_); schedules_.emplace(uuid, utils::Cron(processor->getCronPeriod())); last_exec_.emplace(uuid, current_time.get_local_time()); - auto last_trigger = last_exec_[uuid]; - auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger); + const auto last_trigger = last_exec_[uuid]; + const auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger); if (!next_to_last_trigger) return utils::TaskRescheduleInfo::Done(); if (*next_to_last_trigger > current_time.get_local_time()) return utils::TaskRescheduleInfo::RetryIn(*next_to_last_trigger-current_time.get_local_time()); - auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory); - - if (on_trigger_result) + if (this->triggerAndCommit(processor, processContext, sessionFactory)) last_exec_[uuid] = current_time.get_local_time(); if (processor->isYield()) return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); - if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time())) + if (const auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time())) return utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time()); } return utils::TaskRescheduleInfo::Done(); diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 38073fa12..7b4aa509b 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -35,20 +35,64 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) { ThreadedSchedulingAgent::schedule(processor); } -utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, - const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { - if (this->running_) { - auto start_time = std::chrono::steady_clock::now(); - // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice - while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { - this->onTrigger(processor, processContext, sessionFactory); - if (processor->isYield()) { - return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); +utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, + const std::shared_ptr<core::ProcessContext>& process_context, + const std::shared_ptr<core::ProcessSessionFactory>& session_factory) { + if (!this->running_) { + return utils::TaskRescheduleInfo::Done(); + } + if (processorYields(processor)) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); + } + + const auto start_time = std::chrono::steady_clock::now(); + // trigger processor while it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice + + const auto process_session = session_factory->createSession(); + process_session->setMetrics(processor->getMetrics()); + bool needs_commit = true; + + while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { + const auto trigger_result = this->trigger(processor, process_context, process_session); + if (!trigger_result) { + try { + std::rethrow_exception(trigger_result.error()); + } catch (const std::exception& exception) { + logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", + exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); + needs_commit = false; + break; + } catch (...) { + logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName()); + needs_commit = false; + break; } } - return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available + if (!*trigger_result) { + logger_->log_trace("Processor {} ({}) yielded", processor->getUUIDStr(), processor->getName()); + break; + } } - return utils::TaskRescheduleInfo::Done(); + if (needs_commit) { + try { + process_session->commit(); + } catch (const std::exception& exception) { + logger_->log_warn("Caught \"{}\" ({}) during ProcessSession::commit after triggering processor: {} ({})", + exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName()); + process_session->rollbackNoThrow(); + } catch (...) { + logger_->log_warn("Caught unknown exception during ProcessSession::commit after triggering processor: {} ({})", processor->getUUIDStr(), processor->getName()); + process_session->rollbackNoThrow(); + } + } else { + process_session->rollbackNoThrow(); + } + + if (processor->isYield()) { + return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); + } + + return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available } } // namespace org::apache::nifi::minifi diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 0ec7e6986..f42ca057f 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -36,13 +36,10 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) { namespace org::apache::nifi::minifi { -nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor, - const std::shared_ptr<core::ProcessContext> &process_context, - const std::shared_ptr<core::ProcessSessionFactory> &session_factory) { - gsl_Expects(processor); +bool SchedulingAgent::processorYields(core::Processor* processor) const { if (processor->isYield()) { logger_->log_debug("Not running {} since it must yield", processor->getName()); - return {}; + return true; } // No need to yield, reset yield expiration to 0 @@ -52,11 +49,22 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc if (!hasWorkToDo(processor)) { processor->yield(bored_yield_duration); - return {}; + return true; } if (processor->isThrottledByBackpressure()) { logger_->log_debug("backpressure applied because too much outgoing for {} {}", processor->getUUIDStr(), processor->getName()); processor->yield(bored_yield_duration); + return true; + } + + return false; +} + +nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(core::Processor* processor, + const std::shared_ptr<core::ProcessContext>& process_context, + const std::shared_ptr<core::ProcessSessionFactory>& session_factory) { + gsl_Expects(processor); + if (processorYields(processor)) { return {}; } @@ -74,8 +82,9 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc processor->incrementActiveTasks(); auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); }); + try { - processor->onTrigger(process_context, session_factory); + processor->triggerAndCommit(process_context, session_factory); } catch (const std::exception& exception) { logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}", processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what()); @@ -90,6 +99,44 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc return {}; } +nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Processor* processor, + const std::shared_ptr<core::ProcessContext>& process_context, + const std::shared_ptr<core::ProcessSession>& process_session) { + gsl_Expects(processor); + if (processorYields(processor)) { + return false; + } + + auto schedule_it = scheduled_processors_.end(); + + { + std::lock_guard<std::mutex> lock(watchdog_mtx_); + schedule_it = scheduled_processors_.emplace(processor).first; + } + + const auto guard = gsl::finally([this, &schedule_it](){ + std::lock_guard<std::mutex> lock(watchdog_mtx_); + scheduled_processors_.erase(schedule_it); + }); + + processor->incrementActiveTasks(); + auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); }); + try { + processor->trigger(process_context, process_session); + } catch (const std::exception& exception) { + logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}", + processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what()); + processor->yield(admin_yield_duration_); + return nonstd::make_unexpected(std::current_exception()); + } catch (...) { + logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}", + processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName()); + processor->yield(admin_yield_duration_); + return nonstd::make_unexpected(std::current_exception()); + } + return true; +} + void SchedulingAgent::watchDogFunc() { std::lock_guard<std::mutex> lock(watchdog_mtx_); auto now = std::chrono::steady_clock::now(); diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 2cde8d3c8..bbc0a8324 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -28,11 +28,11 @@ namespace org::apache::nifi::minifi { utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { if (this->running_ && processor->isRunning()) { - auto trigger_start_time = std::chrono::steady_clock::now(); - this->onTrigger(processor, processContext, sessionFactory); + const auto trigger_start_time = std::chrono::steady_clock::now(); + this->triggerAndCommit(processor, processContext, sessionFactory); - auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod(); - auto yield_expiration_time = processor->getYieldExpirationTime(); + const auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod(); + const auto yield_expiration_time = processor->getYieldExpirationTime(); return utils::TaskRescheduleInfo::RetryAfter(std::max(next_scheduled_run, yield_expiration_time)); } return utils::TaskRescheduleInfo::Done(); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 5a2460ec8..786872600 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -78,7 +78,8 @@ ProcessSession::ProcessSession(std::shared_ptr<ProcessContext> processContext) ProcessSession::~ProcessSession() { if (stateManager_ && stateManager_->isTransactionInProgress()) { - logger_->log_error("Session has ended without decision on state (commit or rollback)."); + logger_->log_critical("Session has ended without decision on state (commit or rollback)."); + std::terminate(); } removeReferences(); } @@ -803,6 +804,7 @@ ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr< } void ProcessSession::commit() { + const auto commit_start_time = std::chrono::steady_clock::now(); try { std::unordered_map<std::string, TransferMetrics> transfers; auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) { @@ -916,6 +918,8 @@ void ProcessSession::commit() { // persistent the provenance report this->provenance_report_->commit(); logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessorNode()->getName()); + if (metrics_) + metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - commit_start_time)); } catch (const std::exception& exception) { logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what()); throw; @@ -981,6 +985,15 @@ void ProcessSession::rollback() { } } +nonstd::expected<void, std::exception_ptr> ProcessSession::rollbackNoThrow() noexcept { + try { + rollback(); + return {}; + } catch(...) { + return nonstd::make_unexpected(std::current_exception()); + } +} + void ProcessSession::persistFlowFilesBeforeTransfer( std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap, const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) { diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index feabc33d9..79d8e43f2 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -126,7 +126,7 @@ bool Processor::addConnection(Connectable* conn) { if (uuid_ == destUUID) { // Connection is destination to the current processor - if (incoming_connections_.find(connection) == incoming_connections_.end()) { + if (!incoming_connections_.contains(connection)) { incoming_connections_.insert(connection); connection->setDestination(this); logger_->log_debug("Add connection {} into Processor {} incoming connection", connection->getName(), name_); @@ -142,7 +142,7 @@ bool Processor::addConnection(Connectable* conn) { if (it != outgoing_connections_.end()) { // We already has connection for this relationship std::set<Connectable*> existedConnection = it->second; - if (existedConnection.find(connection) == existedConnection.end()) { + if (!existedConnection.contains(connection)) { // We do not have the same connection for this relationship yet existedConnection.insert(connection); connection->setSource(this); @@ -167,44 +167,43 @@ bool Processor::addConnection(Connectable* conn) { bool Processor::flowFilesOutGoingFull() const { std::lock_guard<std::mutex> lock(mutex_); - for (const auto& connection_pair : outgoing_connections_) { - // We already has connection for this relationship - std::set<Connectable*> existedConnection = connection_pair.second; - const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const Connectable* conn) { - auto connection = dynamic_cast<const Connection*>(conn); + for (const auto& [_name, existed_connection] : outgoing_connections_) { + if (ranges::any_of(existed_connection, [](const Connectable* conn) { + const auto connection = dynamic_cast<const Connection*>(conn); return connection && connection->backpressureThresholdReached(); - }); - if (has_full_connection) { return true; } + })) { + return true; + } } return false; } -void Processor::onTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { - ++metrics_->iterations; - auto session = session_factory->createSession(); - session->setMetrics(metrics_); - +void Processor::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) { + const auto process_session = session_factory->createSession(); + process_session->setMetrics(metrics_); try { - // Call the virtual trigger function - auto start = std::chrono::steady_clock::now(); - onTriggerSharedPtr(context, session); - metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); - start = std::chrono::steady_clock::now(); - session->commit(); - metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); + trigger(context, process_session); + process_session->commit(); } catch (const std::exception& exception) { logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})", exception.what(), typeid(exception).name(), getUUIDStr(), getName()); - session->rollback(); + process_session->rollback(); throw; } catch (...) { logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", getUUIDStr(), getName()); - session->rollback(); + process_session->rollback(); throw; } } +void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) { + ++metrics_->iterations; + const auto start = std::chrono::steady_clock::now(); + onTriggerSharedPtr(context, process_session); + metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); +} + bool Processor::isWorkAvailable() { // We have work if any incoming connection has work std::lock_guard<std::mutex> lock(mutex_); @@ -280,7 +279,7 @@ bool Processor::partOfCycle(Connection* conn) { if (it == source->reachable_processors_.end()) { return false; } - return it->second.find(source) != it->second.end(); + return it->second.contains(source); } bool Processor::isThrottledByBackpressure() const { diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index 2a6402d8a..0cf9efa5e 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -535,7 +535,7 @@ bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& ve process_sessions_.push_back(current_session); }); logger_->log_info("Running {}", processor->getName()); - processor->onTrigger(context, session_factory); + processor->triggerAndCommit(context, session_factory); } return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();
