This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9f2040abc6d66602e82a2c1c0ac708ddfe9f6570 Author: Martin Zink <[email protected]> AuthorDate: Mon Jan 30 11:13:51 2023 +0100 MINIFICPP-2008 Differentiate successful onTriggers from throwing onTriggers in ScheduleAgents Signed-off-by: Adam Debreceni <[email protected]> This closes #1474 --- libminifi/include/CronDrivenSchedulingAgent.h | 2 -- libminifi/include/EventDrivenSchedulingAgent.h | 25 +++-------------------- libminifi/include/SchedulingAgent.h | 18 ++++++----------- libminifi/include/ThreadedSchedulingAgent.h | 26 +++--------------------- libminifi/include/TimerDrivenSchedulingAgent.h | 27 +++---------------------- libminifi/src/CronDrivenSchedulingAgent.cpp | 17 +++++++--------- libminifi/src/EventDrivenSchedulingAgent.cpp | 16 +++------------ libminifi/src/SchedulingAgent.cpp | 28 ++++++++++++++++---------- libminifi/src/TimerDrivenSchedulingAgent.cpp | 23 +++++---------------- 9 files changed, 47 insertions(+), 135 deletions(-) diff --git a/libminifi/include/CronDrivenSchedulingAgent.h b/libminifi/include/CronDrivenSchedulingAgent.h index 3595f302f..88ee42e6b 100644 --- a/libminifi/include/CronDrivenSchedulingAgent.h +++ b/libminifi/include/CronDrivenSchedulingAgent.h @@ -45,8 +45,6 @@ class CronDrivenSchedulingAgent : public ThreadedSchedulingAgent { : ThreadedSchedulingAgent(controller_service_provider, std::move(repo), std::move(flow_repo), std::move(content_repo), std::move(configuration), thread_pool) { } - CronDrivenSchedulingAgent(const CronDrivenSchedulingAgent& parent) = delete; - CronDrivenSchedulingAgent& operator=(const CronDrivenSchedulingAgent& parent) = delete; ~CronDrivenSchedulingAgent() override = default; utils::TaskRescheduleInfo run(core::Processor *processor, diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index e9439a925..d017da38c 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -17,8 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_ -#define LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_ +#pragma once #include <memory> #include <string> @@ -31,18 +30,10 @@ #include "core/ProcessSessionFactory.h" #include "ThreadedSchedulingAgent.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { -// EventDrivenSchedulingAgent Class class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: - // Constructor - /*! - * Create a new event driven scheduling agent. - */ EventDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) @@ -56,21 +47,11 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { void schedule(core::Processor* processor) override; - // Run function for the thread utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - EventDrivenSchedulingAgent(const EventDrivenSchedulingAgent &parent); - EventDrivenSchedulingAgent &operator=(const EventDrivenSchedulingAgent &parent); - std::chrono::milliseconds time_slice_; }; -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org -#endif // LIBMINIFI_INCLUDE_EVENTDRIVENSCHEDULINGAGENT_H_ +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 24992c928..968d03860 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -30,6 +30,7 @@ #include <algorithm> #include <thread> #include "utils/CallBackTimer.h" +#include "utils/expected.h" #include "utils/Monitors.h" #include "utils/TimeUtil.h" #include "utils/ThreadPool.h" @@ -49,7 +50,6 @@ namespace org::apache::nifi::minifi { -// SchedulingAgent Class class SchedulingAgent { public: // Constructor @@ -87,14 +87,15 @@ class SchedulingAgent { logger_->log_trace("Destroying scheduling agent"); } - // onTrigger, return whether the yield is need - bool onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory); - // start + nonstd::expected<void, std::exception_ptr> onTrigger(core::Processor* processor, + const std::shared_ptr<core::ProcessContext>& process_context, + const std::shared_ptr<core::ProcessSessionFactory>& session_factory); + void start() { running_ = true; thread_pool_.start(); } - // stop + virtual void stop() { running_ = false; } @@ -110,13 +111,9 @@ class SchedulingAgent { SchedulingAgent &operator=(const SchedulingAgent &parent) = delete; protected: - // Mutex for protection std::mutex mutex_; - // Whether it is running std::atomic<bool> running_; - // AdministrativeYieldDuration std::chrono::milliseconds admin_yield_duration_; - // BoredYieldDuration std::chrono::milliseconds bored_yield_duration_; std::shared_ptr<Configure> configure_; @@ -126,9 +123,7 @@ class SchedulingAgent { std::shared_ptr<core::Repository> flow_repo_; std::shared_ptr<core::ContentRepository> content_repo_; - // thread pool for components. utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool_; - // controller service provider reference gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider_; private: @@ -148,7 +143,6 @@ class SchedulingAgent { } }; - // Logger std::shared_ptr<core::logging::Logger> logger_; mutable std::mutex watchdog_mtx_; // used to protect the set below std::set<SchedulingInfo> scheduled_processors_; // set was chosen to avoid iterator invalidation diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index c53249096..e51279154 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -17,8 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_ -#define LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_ +#pragma once #include <memory> #include <set> @@ -31,10 +30,7 @@ #include "core/ProcessContext.h" #include "SchedulingAgent.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { /** * An abstract scheduling agent which creates and manages a pool of threads for @@ -42,42 +38,26 @@ namespace minifi { */ class ThreadedSchedulingAgent : public SchedulingAgent { public: - // Constructor - /*! - * Create a new threaded scheduling agent. - */ ThreadedSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { } - // Destructor ~ThreadedSchedulingAgent() override = default; - // Run function for the thread virtual utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) = 0; public: - // schedule, overwritten by different DrivenTimerDrivenSchedulingAgent void schedule(core::Processor* processor) override; - // unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent void unschedule(core::Processor* processor) override; void stop() override; private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ThreadedSchedulingAgent(const ThreadedSchedulingAgent &parent); - ThreadedSchedulingAgent &operator=(const ThreadedSchedulingAgent &parent); std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger(); std::set<utils::Identifier> processors_running_; // Set just for easy usage }; -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org -#endif // LIBMINIFI_INCLUDE_THREADEDSCHEDULINGAGENT_H_ +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index b4322a1f3..9e74fec2d 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -17,8 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_ -#define LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_ +#pragma once #include <memory> @@ -28,40 +27,20 @@ #include "core/Repository.h" #include "ThreadedSchedulingAgent.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -// TimerDrivenSchedulingAgent Class +namespace org::apache::nifi::minifi { class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { public: - // Constructor - /*! - * Create a new processor - */ TimerDrivenSchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configure, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure, thread_pool) { } - /** - * Run function that accepts the processor, context and session factory. - */ utils::TaskRescheduleInfo run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override; private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); - TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent); - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TimerDrivenSchedulingAgent>::getLogger(); }; -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org -#endif // LIBMINIFI_INCLUDE_TIMERDRIVENSCHEDULINGAGENT_H_ +} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp index 3ad94f1ea..0e12bd3ae 100644 --- a/libminifi/src/CronDrivenSchedulingAgent.cpp +++ b/libminifi/src/CronDrivenSchedulingAgent.cpp @@ -52,19 +52,16 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces if (*next_to_last_trigger > current_time.get_local_time()) return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time())); - last_exec_[uuid] = current_time.get_local_time(); - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); + auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory); - if (processor->isYield()) { + if (on_trigger_result) + last_exec_[uuid] = current_time.get_local_time(); + + if (processor->isYield()) return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); - } else if (shouldYield && this->bored_yield_duration_ > 0ms) { - return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_); - } - auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()); - if (!next_trigger) - return utils::TaskRescheduleInfo::Done(); - return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time())); + if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time())) + return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time())); } return utils::TaskRescheduleInfo::Done(); } diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 10edd6405..3a23ba839 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -26,10 +26,7 @@ using namespace std::literals::chrono_literals; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { void EventDrivenSchedulingAgent::schedule(core::Processor* processor) { if (!processor->hasIncomingConnections()) { @@ -44,13 +41,9 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce auto start_time = std::chrono::steady_clock::now(); // trigger processor until it has work to do, but no more than half a sec while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); + this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { - // Honor the yield return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime())); - } else if (shouldYield) { - // No work to do or need to apply back pressure - return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_ > 0ms ? this->bored_yield_duration_ : 10ms); // No work left to do, stand by } } return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available @@ -58,7 +51,4 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce return utils::TaskRescheduleInfo::Done(); } -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 9dfca5c58..2f8ccdb6b 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -25,6 +25,8 @@ #include "core/Processor.h" #include "utils/gsl.h" +using namespace std::literals::chrono_literals; + namespace { bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) { // Whether it has work to do @@ -34,25 +36,28 @@ bool hasWorkToDo(org::apache::nifi::minifi::core::Processor* processor) { namespace org::apache::nifi::minifi { -bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, - const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { +nonstd::expected<void, std::exception_ptr> SchedulingAgent::onTrigger(core::Processor* processor, + const std::shared_ptr<core::ProcessContext> &process_context, + const std::shared_ptr<core::ProcessSessionFactory> &session_factory) { gsl_Expects(processor); if (processor->isYield()) { logger_->log_debug("Not running %s since it must yield", processor->getName()); - return false; + return {}; } // No need to yield, reset yield expiration to 0 processor->clearYield(); + auto bored_yield_duration = bored_yield_duration_ > 0ms ? bored_yield_duration_ : 10ms; + if (!hasWorkToDo(processor)) { - // No work to do, yield - return true; + processor->yield(bored_yield_duration); + return {}; } if (processor->isThrottledByBackpressure()) { logger_->log_debug("backpressure applied because too much outgoing for %s %s", processor->getUUIDStr(), processor->getName()); - // need to apply backpressure - return true; + processor->yield(bored_yield_duration); + return {}; } auto schedule_it = scheduled_processors_.end(); @@ -68,20 +73,21 @@ bool SchedulingAgent::onTrigger(core::Processor* processor, const std::shared_pt }); processor->incrementActiveTasks(); + auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); }); try { - processor->onTrigger(processContext, sessionFactory); + processor->onTrigger(process_context, session_factory); } catch (const std::exception& exception) { logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s, what: %s", processor->getName(), processor->getUUIDStr(), typeid(exception).name(), exception.what()); processor->yield(admin_yield_duration_); + return nonstd::make_unexpected(std::current_exception()); } catch (...) { logger_->log_warn("Caught Exception during SchedulingAgent::onTrigger of processor %s (uuid: %s), type: %s", processor->getName(), processor->getUUIDStr(), getCurrentExceptionTypeName()); processor->yield(admin_yield_duration_); + return nonstd::make_unexpected(std::current_exception()); } - processor->decrementActiveTask(); - - return false; + return {}; } void SchedulingAgent::watchDogFunc() { diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 7864a7945..2b4072170 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -19,35 +19,22 @@ */ #include "TimerDrivenSchedulingAgent.h" #include <chrono> -#include <thread> #include <memory> -#include <iostream> -#include "core/Property.h" using namespace std::literals::chrono_literals; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { if (this->running_ && processor->isRunning()) { - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); - if (processor->isYield()) { - // Honor the yield + this->onTrigger(processor, processContext, sessionFactory); + if (processor->isYield()) return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); - } else if (shouldYield && this->bored_yield_duration_ > 0ms) { - // No work to do or need to apply back pressure - return utils::TaskRescheduleInfo::RetryIn(this->bored_yield_duration_); - } + return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano())); } return utils::TaskRescheduleInfo::Done(); } -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ +} // namespace org::apache::nifi::minifi
