This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit fe14bed424697d3cbfba184e4d18743ea67413f8
Author: Martin Zink <[email protected]>
AuthorDate: Tue Jul 16 16:01:15 2024 +0000

    MINIFICPP-2398 Only commit once per time_slice in EventDrivenSchedulingAgent
    
    Closes #1815
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 docker/test/integration/features/kafka.feature |  6 +++
 extensions/rocksdb-repos/tests/SwapTests.cpp   |  3 +-
 libminifi/include/EventDrivenSchedulingAgent.h |  6 +--
 libminifi/include/SchedulingAgent.h            |  7 ++-
 libminifi/include/ThreadedSchedulingAgent.h    |  3 +-
 libminifi/include/core/ProcessSession.h        |  2 +
 libminifi/include/core/Processor.h             | 11 +++--
 libminifi/include/core/ProcessorMetrics.h      |  2 +-
 libminifi/src/CronDrivenSchedulingAgent.cpp    | 15 +++---
 libminifi/src/EventDrivenSchedulingAgent.cpp   | 66 +++++++++++++++++++++-----
 libminifi/src/SchedulingAgent.cpp              | 61 +++++++++++++++++++++---
 libminifi/src/TimerDrivenSchedulingAgent.cpp   |  8 ++--
 libminifi/src/core/ProcessSession.cpp          | 15 +++++-
 libminifi/src/core/Processor.cpp               | 47 +++++++++---------
 libminifi/test/libtest/unit/TestBase.cpp       |  2 +-
 15 files changed, 186 insertions(+), 68 deletions(-)

diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index 7fcae658f..02aca2cc0 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -46,6 +46,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And the "success" relationship of the GetFile processor is connected to the UpdateAttribute
     And the "success" relationship of the UpdateAttribute processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
@@ -93,6 +94,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
@@ -120,6 +122,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
@@ -150,6 +153,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And a PutFile processor with the "Directory" property set to "/tmp/output"
     And the "success" relationship of the GetFile processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
@@ -178,6 +182,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And an ssl context service is set up for PublishKafka
     And the "success" relationship of the GetFile processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
@@ -203,6 +208,7 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And an ssl context service is set up for PublishKafka
     And the "success" relationship of the GetFile processor is connected to the PublishKafka
     And the "success" relationship of the PublishKafka processor is connected to the PutFile
+    And the "failure" relationship of the PublishKafka processor is connected to the PublishKafka
 
     And a kafka broker is set up in correspondence with the PublishKafka
 
diff --git a/extensions/rocksdb-repos/tests/SwapTests.cpp b/extensions/rocksdb-repos/tests/SwapTests.cpp
index 62822e8d7..5e04bf5a6 100644
--- a/extensions/rocksdb-repos/tests/SwapTests.cpp
+++ b/extensions/rocksdb-repos/tests/SwapTests.cpp
@@ -33,7 +33,6 @@ namespace org::apache::nifi::minifi::test {
 class OutputProcessor : public core::Processor {
  public:
   using core::Processor::Processor;
-  using core::Processor::onTrigger;
 
   static constexpr const char* Description = "Processor used for testing cycles";
   static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
@@ -112,7 +111,7 @@ TEST_CASE("Connection will on-demand swap flow files") {
   auto session_factory = std::make_shared<core::ProcessSessionFactory>(context);
 
   for (size_t i = 0; i < 200; ++i) {
-    processor->onTrigger(context, session_factory);
+    processor->triggerAndCommit(context, session_factory);
   }
 
   REQUIRE(connection->getQueueSize() == processor->flow_files_.size());
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h
index 352212179..2a42a6933 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -52,11 +52,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent {
 
   void schedule(core::Processor* processor) override;
 
-  utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
-      const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &process_context,
+      const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
 
  private:
-  std::chrono::milliseconds time_slice_;
+  std::chrono::milliseconds time_slice_{};
 };
 
 }  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 398cd52d2..99ebd62c8 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -86,9 +86,14 @@ class SchedulingAgent {
     logger_->log_trace("Destroying scheduling agent");
   }
 
-  nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor,
+  bool processorYields(core::Processor* processor) const;
+
+  nonstd::expected<void, std::exception_ptr> triggerAndCommit(core::Processor* processor,
       const std::shared_ptr<core::ProcessContext>& process_context,
       const std::shared_ptr<core::ProcessSessionFactory>& session_factory);
+  nonstd::expected<bool, std::exception_ptr> trigger(core::Processor* processor,
+      const std::shared_ptr<core::ProcessContext>& process_context,
+      const std::shared_ptr<core::ProcessSession>& process_session);
 
   void start() {
     running_ = true;
diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h
index 14a703948..a831a3377 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -54,9 +54,10 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
 
   void stop() override;
 
- private:
+ protected:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger();
 
+ private:
   std::set<utils::Identifier> processors_running_;  // Set just for easy usage
 };
 
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 87da7d974..912f0bef2 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -67,6 +67,8 @@ class ProcessSession : public ReferenceContainer {
   void commit();
   // Roll Back the session
   void rollback();
+
+  nonstd::expected<void, std::exception_ptr> rollbackNoThrow() noexcept;
   // Get Provenance Report
   std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() {
     return provenance_report_;
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index b18eb5f25..299b183ff 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -153,12 +153,12 @@ class Processor : public Connectable, public ConfigurableComponent, public state
   }
 
   void incrementActiveTasks() {
-    active_tasks_++;
+    ++active_tasks_;
   }
 
   void decrementActiveTask() {
     if (active_tasks_ > 0)
-      active_tasks_--;
+      --active_tasks_;
   }
 
   void clearActiveTask() {
@@ -187,7 +187,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
   void initialize() override {
   }
 
-  virtual void onTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory);
+  virtual void triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory);
+  void trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session);
 
   virtual void onTriggerSharedPtr(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& session) {
     onTrigger(*context, *session);
@@ -219,6 +220,10 @@ class Processor : public Connectable, public ConfigurableComponent, public state
     return metrics_;
   }
 
+  auto getMetrics() const {
+    return metrics_;
+  }
+
   static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
 
   static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h
index 63e1f7879..de5c3d157 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -78,7 +78,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
   };
 
   [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
-  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+  static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
 
   std::mutex transferred_relationships_mutex_;
   std::unordered_map<std::string, size_t> transferred_relationships_;
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 905c309ed..2cc5a37d8 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -19,7 +19,6 @@
  */
 #include "CronDrivenSchedulingAgent.h"
 #include <chrono>
-#include <memory>
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSessionFactory.h"
@@ -36,30 +35,28 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces
   using std::chrono::system_clock;
 
   if (this->running_ && processor->isRunning()) {
-    auto uuid = processor->getUUID();
-    auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now()));
+    const auto uuid = processor->getUUID();
+    const auto current_time = date::make_zoned<seconds>(date::current_zone(), time_point_cast<seconds>(system_clock::now()));
     std::lock_guard<std::mutex> lock(mutex_);
 
     schedules_.emplace(uuid, utils::Cron(processor->getCronPeriod()));
     last_exec_.emplace(uuid, current_time.get_local_time());
 
-    auto last_trigger = last_exec_[uuid];
-    auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger);
+    const auto last_trigger = last_exec_[uuid];
+    const auto next_to_last_trigger = schedules_.at(uuid).calculateNextTrigger(last_trigger);
     if (!next_to_last_trigger)
       return utils::TaskRescheduleInfo::Done();
 
     if (*next_to_last_trigger > current_time.get_local_time())
       return utils::TaskRescheduleInfo::RetryIn(*next_to_last_trigger-current_time.get_local_time());
 
-    auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory);
-
-    if (on_trigger_result)
+    if (this->triggerAndCommit(processor, processContext, sessionFactory))
       last_exec_[uuid] = current_time.get_local_time();
 
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
 
-    if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
+    if (const auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
       return utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time());
   }
   return utils::TaskRescheduleInfo::Done();
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 38073fa12..7b4aa509b 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -35,20 +35,64 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* processor) {
   ThreadedSchedulingAgent::schedule(processor);
 }
 
-utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
-                                         const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  if (this->running_) {
-    auto start_time = std::chrono::steady_clock::now();
-    // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
-    while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
-      this->onTrigger(processor, processContext, sessionFactory);
-      if (processor->isYield()) {
-        return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* processor,
+    const std::shared_ptr<core::ProcessContext>& process_context,
+    const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  if (!this->running_) {
+    return utils::TaskRescheduleInfo::Done();
+  }
+  if (processorYields(processor)) {
+    return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+  }
+
+  const auto start_time = std::chrono::steady_clock::now();
+  // trigger processor while it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice
+
+  const auto process_session = session_factory->createSession();
+  process_session->setMetrics(processor->getMetrics());
+  bool needs_commit = true;
+
+  while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) {
+    const auto trigger_result = this->trigger(processor, process_context, process_session);
+    if (!trigger_result) {
+      try {
+        std::rethrow_exception(trigger_result.error());
+      } catch (const std::exception& exception) {
+        logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})",
+            exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
+        needs_commit = false;
+        break;
+      } catch (...) {
+        logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), processor->getName());
+        needs_commit = false;
+        break;
       }
     }
-    return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue work as soon as a thread is available
+    if (!*trigger_result) {
+      logger_->log_trace("Processor {} ({}) yielded", processor->getUUIDStr(), processor->getName());
+      break;
+    }
   }
-  return utils::TaskRescheduleInfo::Done();
+  if (needs_commit) {
+    try {
+      process_session->commit();
+    } catch (const std::exception& exception) {
+      logger_->log_warn("Caught \"{}\" ({}) during ProcessSession::commit after triggering processor: {} ({})",
+      exception.what(), typeid(exception).name(), processor->getUUIDStr(), processor->getName());
+      process_session->rollbackNoThrow();
+    } catch (...) {
+      logger_->log_warn("Caught unknown exception during ProcessSession::commit after triggering processor: {} ({})", processor->getUUIDStr(), processor->getName());
+      process_session->rollbackNoThrow();
+    }
+  } else {
+    process_session->rollbackNoThrow();
+  }
+
+  if (processor->isYield()) {
+    return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+  }
+
+  return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue work as soon as a thread is available
 }
 
 }  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index 0ec7e6986..f42ca057f 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -36,13 +36,10 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) {
 
 namespace org::apache::nifi::minifi {
 
-nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor,
-    const std::shared_ptr<core::ProcessContext> &process_context,
-    const std::shared_ptr<core::ProcessSessionFactory> &session_factory) {
-  gsl_Expects(processor);
+bool SchedulingAgent::processorYields(core::Processor* processor) const {
   if (processor->isYield()) {
     logger_->log_debug("Not running {} since it must yield", processor->getName());
-    return {};
+    return true;
   }
 
   // No need to yield, reset yield expiration to 0
@@ -52,11 +49,22 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc
 
   if (!hasWorkToDo(processor)) {
     processor->yield(bored_yield_duration);
-    return {};
+    return true;
   }
   if (processor->isThrottledByBackpressure()) {
     logger_->log_debug("backpressure applied because too much outgoing for {} {}", processor->getUUIDStr(), processor->getName());
     processor->yield(bored_yield_duration);
+    return true;
+  }
+
+  return false;
+}
+
+nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(core::Processor* processor,
+    const std::shared_ptr<core::ProcessContext>& process_context,
+    const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  gsl_Expects(processor);
+  if (processorYields(processor)) {
     return {};
   }
 
@@ -74,8 +82,9 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc
 
   processor->incrementActiveTasks();
   auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
+
   try {
-    processor->onTrigger(process_context, session_factory);
+    processor->triggerAndCommit(process_context, session_factory);
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}",
         processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
@@ -90,6 +99,44 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Proc
   return {};
 }
 
+nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Processor* processor,
+    const std::shared_ptr<core::ProcessContext>& process_context,
+    const std::shared_ptr<core::ProcessSession>& process_session) {
+  gsl_Expects(processor);
+  if (processorYields(processor)) {
+    return false;
+  }
+
+  auto schedule_it = scheduled_processors_.end();
+
+  {
+    std::lock_guard<std::mutex> lock(watchdog_mtx_);
+    schedule_it = scheduled_processors_.emplace(processor).first;
+  }
+
+  const auto guard = gsl::finally([this, &schedule_it](){
+    std::lock_guard<std::mutex> lock(watchdog_mtx_);
+    scheduled_processors_.erase(schedule_it);
+  });
+
+  processor->incrementActiveTasks();
+  auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
+  try {
+    processor->trigger(process_context, process_session);
+  } catch (const std::exception& exception) {
+    logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}, what: {}",
+        processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what());
+    processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
+  } catch (...) {
+    logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor {} (uuid: {}), type: {}",
+        processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName());
+    processor->yield(admin_yield_duration_);
+    return nonstd::make_unexpected(std::current_exception());
+  }
+  return true;
+}
+
 void SchedulingAgent::watchDogFunc() {
   std::lock_guard<std::mutex> lock(watchdog_mtx_);
   auto now = std::chrono::steady_clock::now();
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 2cde8d3c8..bbc0a8324 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -28,11 +28,11 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
-    auto trigger_start_time = std::chrono::steady_clock::now();
-    this->onTrigger(processor, processContext, sessionFactory);
+    const auto trigger_start_time = std::chrono::steady_clock::now();
+    this->triggerAndCommit(processor, processContext, sessionFactory);
 
-    auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod();
-    auto yield_expiration_time = processor->getYieldExpirationTime();
+    const auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod();
+    const auto yield_expiration_time = processor->getYieldExpirationTime();
     return utils::TaskRescheduleInfo::RetryAfter(std::max(next_scheduled_run, yield_expiration_time));
   }
   return utils::TaskRescheduleInfo::Done();
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 5a2460ec8..786872600 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -78,7 +78,8 @@ ProcessSession::ProcessSession(std::shared_ptr<ProcessContext> processContext)
 
 ProcessSession::~ProcessSession() {
   if (stateManager_ && stateManager_->isTransactionInProgress()) {
-    logger_->log_error("Session has ended without decision on state (commit or rollback).");
+    logger_->log_critical("Session has ended without decision on state (commit or rollback).");
+    std::terminate();
   }
   removeReferences();
 }
@@ -803,6 +804,7 @@ ProcessSession::RouteResult ProcessSession::routeFlowFile(const std::shared_ptr<
 }
 
 void ProcessSession::commit() {
+  const auto commit_start_time = std::chrono::steady_clock::now();
   try {
     std::unordered_map<std::string, TransferMetrics> transfers;
       auto increaseTransferMetrics = [&](const FlowFile& record, const Relationship& relationship) {
@@ -916,6 +918,8 @@ void ProcessSession::commit() {
     // persistent the provenance report
     this->provenance_report_->commit();
     logger_->log_debug("ProcessSession committed for {}", process_context_->getProcessorNode()->getName());
+    if (metrics_)
+      metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - commit_start_time));
   } catch (const std::exception& exception) {
     logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
     throw;
@@ -981,6 +985,15 @@ void ProcessSession::rollback() {
   }
 }
 
+nonstd::expected<void, std::exception_ptr> ProcessSession::rollbackNoThrow() noexcept {
+  try {
+    rollback();
+    return {};
+  } catch(...) {
+    return nonstd::make_unexpected(std::current_exception());
+  }
+}
+
 void ProcessSession::persistFlowFilesBeforeTransfer(
     std::map<Connectable*, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
     const std::map<utils::Identifier, FlowFileUpdate>& modifiedFlowFiles) {
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index feabc33d9..79d8e43f2 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -126,7 +126,7 @@ bool Processor::addConnection(Connectable* conn) {
 
   if (uuid_ == destUUID) {
     // Connection is destination to the current processor
-    if (incoming_connections_.find(connection) == incoming_connections_.end()) {
+    if (!incoming_connections_.contains(connection)) {
       incoming_connections_.insert(connection);
       connection->setDestination(this);
       logger_->log_debug("Add connection {} into Processor {} incoming connection", connection->getName(), name_);
@@ -142,7 +142,7 @@ bool Processor::addConnection(Connectable* conn) {
       if (it != outgoing_connections_.end()) {
         // We already has connection for this relationship
         std::set<Connectable*> existedConnection = it->second;
-        if (existedConnection.find(connection) == existedConnection.end()) {
+        if (!existedConnection.contains(connection)) {
           // We do not have the same connection for this relationship yet
           existedConnection.insert(connection);
           connection->setSource(this);
@@ -167,44 +167,43 @@ bool Processor::addConnection(Connectable* conn) {
 bool Processor::flowFilesOutGoingFull() const {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  for (const auto& connection_pair : outgoing_connections_) {
-    // We already has connection for this relationship
-    std::set<Connectable*> existedConnection = connection_pair.second;
-    const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const Connectable* conn) {
-      auto connection = dynamic_cast<const Connection*>(conn);
+  for (const auto& [_name, existed_connection] : outgoing_connections_) {
+    if (ranges::any_of(existed_connection, [](const Connectable* conn) {
+      const auto connection = dynamic_cast<const Connection*>(conn);
       return connection && connection->backpressureThresholdReached();
-    });
-    if (has_full_connection) { return true; }
+    })) {
+      return true;
+    }
   }
 
   return false;
 }
 
-void Processor::onTrigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) {
-  ++metrics_->iterations;
-  auto session = session_factory->createSession();
-  session->setMetrics(metrics_);
-
+void Processor::triggerAndCommit(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSessionFactory>& session_factory) {
+  const auto process_session = session_factory->createSession();
+  process_session->setMetrics(metrics_);
   try {
-    // Call the virtual trigger function
-    auto start = std::chrono::steady_clock::now();
-    onTriggerSharedPtr(context, session);
-    metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
-    start = std::chrono::steady_clock::now();
-    session->commit();
-    metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+    trigger(context, process_session);
+    process_session->commit();
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of processor: {} ({})",
         exception.what(), typeid(exception).name(), getUUIDStr(), getName());
-    session->rollback();
+    process_session->rollback();
     throw;
   } catch (...) {
     logger_->log_warn("Caught unknown exception during Processor::onTrigger of processor: {} ({})", getUUIDStr(), getName());
-    session->rollback();
+    process_session->rollback();
     throw;
   }
 }
 
+void Processor::trigger(const std::shared_ptr<ProcessContext>& context, const std::shared_ptr<ProcessSession>& process_session) {
+  ++metrics_->iterations;
+  const auto start = std::chrono::steady_clock::now();
+  onTriggerSharedPtr(context, process_session);
+  metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start));
+}
+
 bool Processor::isWorkAvailable() {
   // We have work if any incoming connection has work
   std::lock_guard<std::mutex> lock(mutex_);
@@ -280,7 +279,7 @@ bool Processor::partOfCycle(Connection* conn) {
   if (it == source->reachable_processors_.end()) {
     return false;
   }
-  return it->second.find(source) != it->second.end();
+  return it->second.contains(source);
 }
 
 bool Processor::isThrottledByBackpressure() const {
diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp
index 2a6402d8a..0cf9efa5e 100644
--- a/libminifi/test/libtest/unit/TestBase.cpp
+++ b/libminifi/test/libtest/unit/TestBase.cpp
@@ -535,7 +535,7 @@ bool TestPlan::runProcessor(size_t target_location, const PreTriggerVerifier& ve
       process_sessions_.push_back(current_session);
     });
     logger_->log_info("Running {}", processor->getName());
-    processor->onTrigger(context, session_factory);
+    processor->triggerAndCommit(context, session_factory);
   }
 
   return gsl::narrow<size_t>(target_location + 1) < processor_queue_.size();

Reply via email to