szaszm commented on code in PR #1815:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1815#discussion_r1670587482
##########
libminifi/src/EventDrivenSchedulingAgent.cpp:
##########
@@ -35,20 +35,66 @@ void EventDrivenSchedulingAgent::schedule(core::Processor*
processor) {
ThreadedSchedulingAgent::schedule(processor);
}
-utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor*
processor, const std::shared_ptr<core::ProcessContext> &processContext,
- const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- if (this->running_) {
- auto start_time = std::chrono::steady_clock::now();
- // trigger processor until it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
- while (processor->isRunning() && (std::chrono::steady_clock::now() -
start_time < time_slice_)) {
- this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
- return
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor*
processor,
+ const std::shared_ptr<core::ProcessContext>& process_context,
+ const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ if (!this->running_) {
+ return utils::TaskRescheduleInfo::Done();
+ }
+ if (processorYields(processor)) {
+ return
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+ }
+
+ const auto start_time = std::chrono::steady_clock::now();
+ // trigger processor until it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
Review Comment:
```suggestion
// trigger processor while it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
```
##########
libminifi/src/EventDrivenSchedulingAgent.cpp:
##########
@@ -35,20 +35,64 @@ void EventDrivenSchedulingAgent::schedule(core::Processor*
processor) {
ThreadedSchedulingAgent::schedule(processor);
}
-utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor*
processor, const std::shared_ptr<core::ProcessContext> &processContext,
- const
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
- if (this->running_) {
- auto start_time = std::chrono::steady_clock::now();
- // trigger processor until it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
- while (processor->isRunning() && (std::chrono::steady_clock::now() -
start_time < time_slice_)) {
- this->onTrigger(processor, processContext, sessionFactory);
- if (processor->isYield()) {
- return
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor*
processor,
+ const std::shared_ptr<core::ProcessContext>& process_context,
+ const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ if (!this->running_) {
+ return utils::TaskRescheduleInfo::Done();
+ }
+ if (processorYields(processor)) {
+ return
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
+ }
+
+ const auto start_time = std::chrono::steady_clock::now();
+ // trigger processor until it has work to do, but no more than the
configured nifi.flow.engine.event.driven.time.slice
+
+ const auto process_session = session_factory->createSession();
+ process_session->setMetrics(processor->getMetrics());
+ bool needs_commit = true;
+
+ while (processor->isRunning() && (std::chrono::steady_clock::now() -
start_time < time_slice_)) {
+ const auto trigger_result = this->trigger(processor, process_context,
process_session);
+ if (!trigger_result) {
+ try {
+ std::rethrow_exception(trigger_result.error());
+ } catch (const std::exception& exception) {
+ logger_->log_warn("Caught \"{}\" ({}) during Processor::onTrigger of
processor: {} ({})",
+ exception.what(), typeid(exception).name(),
processor->getUUIDStr(), processor->getName());
+ needs_commit = false;
+ break;
+ } catch (...) {
+ logger_->log_warn("Caught unknown exception during
Processor::onTrigger of processor: {} ({})", processor->getUUIDStr(),
processor->getName());
+ needs_commit = false;
+ break;
}
}
- return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue
work as soon as a thread is available
+ if (!*trigger_result) {
+ logger_->log_trace("Processor {} ({}) yielded", processor->getUUIDStr(),
processor->getName());
+ break;
+ }
}
- return utils::TaskRescheduleInfo::Done();
+ if (needs_commit) {
+ try {
+ process_session->commit();
+ } catch (const std::exception& exception) {
+ logger_->log_warn("Caught \"{}\" ({}) during ProcessSession::commit
after triggering processor: {} ({})",
+ exception.what(), typeid(exception).name(), processor->getUUIDStr(),
processor->getName());
+ process_session->rollbackNoThrow();
Review Comment:
We shouldn't just throw away the exception here; at least logging something
would be nice. If a rollback fails, it's probably a critical error, so
`terminate` might be the appropriate way to handle it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org