szaszm commented on code in PR #1815:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1815#discussion_r1670587482


##########
libminifi/src/EventDrivenSchedulingAgent.cpp:
##########
@@ -35,20 +35,66 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* 
processor) {
   ThreadedSchedulingAgent::schedule(processor);
 }
 
-utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* 
processor, const std::shared_ptr<core::ProcessContext> &processContext,
-                                         const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  if (this->running_) {
-    auto start_time = std::chrono::steady_clock::now();
-    // trigger processor until it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice
-    while (processor->isRunning() && (std::chrono::steady_clock::now() - 
start_time < time_slice_)) {
-      this->onTrigger(processor, processContext, sessionFactory);
-      if (processor->isYield()) {
-        return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* 
processor,
+    const std::shared_ptr<core::ProcessContext>& process_context,
+    const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  if (!this->running_) {
+    return utils::TaskRescheduleInfo::Done();
+  }
+  if (processorYields(processor)) {
+    return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+  }
+
+  const auto start_time = std::chrono::steady_clock::now();
+  // trigger processor until it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice

Review Comment:
   ```suggestion
     // trigger processor while it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice
   ```



##########
libminifi/src/EventDrivenSchedulingAgent.cpp:
##########
@@ -35,20 +35,64 @@ void EventDrivenSchedulingAgent::schedule(core::Processor* 
processor) {
   ThreadedSchedulingAgent::schedule(processor);
 }
 
-utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* 
processor, const std::shared_ptr<core::ProcessContext> &processContext,
-                                         const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
-  if (this->running_) {
-    auto start_time = std::chrono::steady_clock::now();
-    // trigger processor until it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice
-    while (processor->isRunning() && (std::chrono::steady_clock::now() - 
start_time < time_slice_)) {
-      this->onTrigger(processor, processContext, sessionFactory);
-      if (processor->isYield()) {
-        return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* 
processor,
+    const std::shared_ptr<core::ProcessContext>& process_context,
+    const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  if (!this->running_) {
+    return utils::TaskRescheduleInfo::Done();
+  }
+  if (processorYields(processor)) {
+    return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+  }
+
+  const auto start_time = std::chrono::steady_clock::now();
+  // trigger processor until it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice
+
+  const auto process_session = session_factory->createSession();
+  process_session->setMetrics(processor->getMetrics());
+  bool needs_commit = true;
+
+  while (processor->isRunning() && (std::chrono::steady_clock::now() - 
start_time < time_slice_)) {
+    const auto trigger_result = this->trigger(processor, process_context, 
process_session);
+    if (!trigger_result) {
+      try {
+        std::rethrow_exception(trigger_result.error());
+      } catch (const std::exception& exception) {
+        logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of 
processor: {} ({})",
+            exception.what(), typeid(exception).name(), 
processor->getUUIDStr(), processor->getName());
+        needs_commit = false;
+        break;
+      } catch (...) {
+        logger_->log_warn("Caught unknown exception during 
Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(), 
processor->getName());
+        needs_commit = false;
+        break;
       }
     }
-    return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue 
work as soon as a thread is available
+    if (!*trigger_result) {
+      logger_->log_trace("Processor {} ({}) yielded", processor->getUUIDStr(), 
processor->getName());
+      break;
+    }
   }
-  return utils::TaskRescheduleInfo::Done();
+  if (needs_commit) {
+    try {
+      process_session->commit();
+    } catch (const std::exception& exception) {
+      logger_->log_warn("Caught \"{}\" ({}) during ProcessSession::commit 
after triggering processor: {} ({})",
+      exception.what(), typeid(exception).name(), processor->getUUIDStr(), 
processor->getName());
+      process_session->rollbackNoThrow();

Review Comment:
   We shouldn't just throw away the exception here, at least logging something 
would be nice. If a rollback fails, it's probably a critical error, so 
`terminate` might be an appropriate handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to