This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7818783cb901849c97404dfcc4189930abdaee46 Author: Martin Zink <[email protected]> AuthorDate: Tue Sep 12 16:01:05 2023 +0200 MINIFICPP-1076 Yielding processor should still respect scheduling period Closes #1653 Signed-off-by: Marton Szasz <[email protected]> --- libminifi/src/TimerDrivenSchedulingAgent.cpp | 6 +- libminifi/src/utils/TestUtils.cpp | 2 + libminifi/test/unit/SchedulingAgentTests.cpp | 231 +++++++++++++++------------ 3 files changed, 135 insertions(+), 104 deletions(-) diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index cbe5c2306..2cde8d3c8 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -30,10 +30,10 @@ utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* proce if (this->running_ && processor->isRunning()) { auto trigger_start_time = std::chrono::steady_clock::now(); this->onTrigger(processor, processContext, sessionFactory); - if (processor->isYield()) - return utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime()); - return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + processor->getSchedulingPeriod()); + auto next_scheduled_run = trigger_start_time + processor->getSchedulingPeriod(); + auto yield_expiration_time = processor->getYieldExpirationTime(); + return utils::TaskRescheduleInfo::RetryAfter(std::max(next_scheduled_run, yield_expiration_time)); } return utils::TaskRescheduleInfo::Done(); } diff --git a/libminifi/src/utils/TestUtils.cpp b/libminifi/src/utils/TestUtils.cpp index 8d46e8efd..6d3021be7 100644 --- a/libminifi/src/utils/TestUtils.cpp +++ b/libminifi/src/utils/TestUtils.cpp @@ -19,6 +19,8 @@ namespace org::apache::nifi::minifi::utils { #ifdef WIN32 +// If minifi is not installed through the MSI installer, then TZDATA might be missing +// date::set_install can point to the TZDATA location, but it has to be called from each library/executable that wants to use timezones void dateSetInstall(const std::string& install) { date::set_install(install); } diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp index b3ffed29d..4a66bfc63 100644 --- a/libminifi/test/unit/SchedulingAgentTests.cpp +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -26,6 +26,8 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::testing { +using minifi::core::controller::StandardControllerServiceProvider; + class CountOnTriggersProcessor : public minifi::core::Processor { public: using minifi::core::Processor::Processor; @@ -36,133 +38,160 @@ class CountOnTriggersProcessor : public minifi::core::Processor { static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - void onTrigger(core::ProcessContext*, core::ProcessSession*) override { + void onTrigger(core::ProcessContext* context, core::ProcessSession*) override { if (on_trigger_duration_ > 0ms) std::this_thread::sleep_for(on_trigger_duration_); ++number_of_triggers; + if (should_yield_) + context->yield(); } size_t getNumberOfTriggers() const { return number_of_triggers; } void setOnTriggerDuration(std::chrono::steady_clock::duration on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; } + void setShouldYield(bool should_yield) { should_yield_ = should_yield; } private: + bool should_yield_ = false; std::chrono::steady_clock::duration on_trigger_duration_ = 0ms; std::atomic<size_t> number_of_triggers = 0; }; +class SchedulingAgentTestFixture { + public: + SchedulingAgentTestFixture() { + count_proc_->incrementActiveTasks(); + count_proc_->setScheduledState(core::RUNNING); -TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestThreadedRepository>(); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = - std::make_shared<TestFlowController>(test_repo, test_repo, content_repo); - - TestController testController; - auto test_plan = testController.createPlan(); - auto controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>(); - auto configuration = std::make_shared<minifi::Configure>(); - auto controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, configuration); - utils::ThreadPool thread_pool; - auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc"); - count_proc->incrementActiveTasks(); - count_proc->setScheduledState(core::RUNNING); - auto node = std::make_shared<core::ProcessorNode>(count_proc.get()); - auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo); - std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - count_proc->setSchedulingPeriod(125ms); #ifdef WIN32 - utils::dateSetInstall(TZ_DATA_DIR); + utils::dateSetInstall(TZ_DATA_DIR); + date::set_install(TZ_DATA_DIR); #endif + } - SECTION("Timer Driven") { - auto timer_driven_agent = std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); - timer_driven_agent->start(); - auto first_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.isFinished()); - CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 125ms); - CHECK(count_proc->getNumberOfTriggers() == 1); + protected: + std::shared_ptr<core::Repository> test_repo_ = std::make_shared<TestThreadedRepository>(); + std::shared_ptr<core::ContentRepository> content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); + std::shared_ptr<minifi::FlowController> controller_ = std::make_shared<TestFlowController>(test_repo_, test_repo_, content_repo_); + + TestController test_controller_; + std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan(); + std::shared_ptr<minifi::core::controller::ControllerServiceMap> controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>(); + std::shared_ptr<minifi::Configure> configuration_ = std::make_shared<minifi::Configure>(); + std::shared_ptr<StandardControllerServiceProvider> controller_services_provider_ = std::make_shared<StandardControllerServiceProvider>(controller_services_, configuration_); + utils::ThreadPool thread_pool_; + + std::shared_ptr<CountOnTriggersProcessor> count_proc_ = std::make_shared<CountOnTriggersProcessor>("count_proc"); + std::shared_ptr<core::ProcessorNode> node_ = std::make_shared<core::ProcessorNode>(count_proc_.get()); + std::shared_ptr<core::ProcessContext> context_ = std::make_shared<core::ProcessContext>(node_, nullptr, test_repo_, test_repo_, content_repo_); + std::shared_ptr<core::ProcessSessionFactory> factory_ = std::make_shared<core::ProcessSessionFactory>(context_); +}; - count_proc->setOnTriggerDuration(50ms); - auto second_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.isFinished()); - CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 75ms); - CHECK(count_proc->getNumberOfTriggers() == 2); - - count_proc->setOnTriggerDuration(150ms); - auto third_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); - CHECK(!third_task_reschedule_info.isFinished()); - CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); - CHECK(count_proc->getNumberOfTriggers() == 3); - } +TEST_CASE_METHOD(SchedulingAgentTestFixture, "TimerDrivenSchedulingAgent") { + count_proc_->setSchedulingPeriod(125ms); + auto timer_driven_agent = std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + timer_driven_agent->start(); + auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 125ms); + CHECK(count_proc_->getNumberOfTriggers() == 1); - SECTION("Event Driven") { - auto event_driven_agent = std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); - event_driven_agent->start(); - auto first_task_reschedule_info = event_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.isFinished()); - CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); - auto count_num_after_one_schedule = count_proc->getNumberOfTriggers(); - CHECK(count_num_after_one_schedule > 100); + count_proc_->setOnTriggerDuration(50ms); + auto second_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_); - auto second_task_reschedule_info = event_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.isFinished()); - CHECK(second_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); - auto count_num_after_two_schedule = count_proc->getNumberOfTriggers(); - CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100); - } + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 75ms); + CHECK(count_proc_->getNumberOfTriggers() == 2); - SECTION("Cron Driven every year") { -#ifdef WIN32 - date::set_install(TZ_DATA_DIR); -#endif - count_proc->setCronPeriod("0 0 0 1 1 ?"); - auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); - cron_driven_agent->start(); - auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.isFinished()); - if (first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around dec 31 23:59:59 - auto wait_time_till_next_execution_time = std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime() - std::chrono::steady_clock::now()); - - auto current_time = date::make_zoned<std::chrono::seconds>(date::current_zone(), std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now())); - auto current_year_month_day = date::year_month_day(date::floor<date::days>(current_time.get_local_time())); - auto new_years_day = date::make_zoned<std::chrono::seconds>(date::current_zone(), date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1}); - - auto time_until_new_years_day = new_years_day.get_local_time() - current_time.get_local_time(); - - CHECK(std::chrono::round<std::chrono::minutes>(time_until_new_years_day - wait_time_till_next_execution_time) == 0min); - CHECK(count_proc->getNumberOfTriggers() == 0); - - auto second_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!second_task_reschedule_info.isFinished()); - CHECK(std::chrono::round<std::chrono::minutes>(first_task_reschedule_info.getNextExecutionTime() - second_task_reschedule_info.getNextExecutionTime()) == 0min); - CHECK(count_proc->getNumberOfTriggers() == 0); - } - } + count_proc_->setOnTriggerDuration(150ms); + auto third_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!third_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); + CHECK(count_proc_->getNumberOfTriggers() == 3); +} - SECTION("Cron Driven every sec") { - count_proc->setCronPeriod("* * * * * *"); - auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); - cron_driven_agent->start(); - auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(!first_task_reschedule_info.isFinished()); - CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); - CHECK(count_proc->getNumberOfTriggers() == 0); - - std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime()); - auto second_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); +TEST_CASE_METHOD(SchedulingAgentTestFixture, "EventDrivenSchedulingAgent") { + auto event_driven_agent = std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + event_driven_agent->start(); + auto first_task_reschedule_info = event_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); + auto count_num_after_one_schedule = count_proc_->getNumberOfTriggers(); + CHECK(count_num_after_one_schedule > 100); + + auto second_task_reschedule_info = event_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(second_task_reschedule_info.getNextExecutionTime() < std::chrono::steady_clock::now()); + auto count_num_after_two_schedule = count_proc_->getNumberOfTriggers(); + CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100); +} + +TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every year") { + count_proc_->setCronPeriod("0 0 0 1 1 ?"); + auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + cron_driven_agent->start(); + auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!first_task_reschedule_info.isFinished()); + if (first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around dec 31 23:59:59 + auto wait_time_till_next_execution_time = std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime() - std::chrono::steady_clock::now()); + + auto current_time = date::make_zoned<std::chrono::seconds>(date::current_zone(), std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now())); + auto current_year_month_day = date::year_month_day(date::floor<date::days>(current_time.get_local_time())); + auto new_years_day = date::make_zoned<std::chrono::seconds>(date::current_zone(), date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1}); + + auto time_until_new_years_day = new_years_day.get_local_time() - current_time.get_local_time(); + + CHECK(std::chrono::abs(time_until_new_years_day - wait_time_till_next_execution_time) < 1min); + CHECK(count_proc_->getNumberOfTriggers() == 0); + + auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_); CHECK(!second_task_reschedule_info.isFinished()); - CHECK(second_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); - CHECK(count_proc->getNumberOfTriggers() == 1); + CHECK(std::chrono::abs(first_task_reschedule_info.getNextExecutionTime() - second_task_reschedule_info.getNextExecutionTime()) < 1min); + + CHECK(count_proc_->getNumberOfTriggers() == 0); } +} + +TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every sec") { + count_proc_->setCronPeriod("* * * * * *"); + auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + cron_driven_agent->start(); + auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); + CHECK(count_proc_->getNumberOfTriggers() == 0); + + std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime()); + auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!second_task_reschedule_info.isFinished()); + CHECK(second_task_reschedule_info.getNextExecutionTime() <= std::chrono::steady_clock::now() + 1s); + CHECK(count_proc_->getNumberOfTriggers() == 1); +} - SECTION("Cron Driven no future triggers") { - count_proc->setCronPeriod("* * * * * * 2012"); - auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo, test_repo, content_repo, configuration, thread_pool); - cron_driven_agent->start(); - auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), context, factory); - CHECK(first_task_reschedule_info.isFinished()); +TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven no future triggers") { + count_proc_->setCronPeriod("* * * * * * 2012"); + auto cron_driven_agent = std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + cron_driven_agent->start(); + auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(first_task_reschedule_info.isFinished()); +} + +TEST_CASE_METHOD(SchedulingAgentTestFixture, "Timer driven should respect both yield and run schedule") { + SECTION("Fast yield slow schedule") { + count_proc_->setSchedulingPeriod(1min); + count_proc_->setYieldPeriodMsec(10ms); + } + SECTION("Slow yield fast schedule") { + count_proc_->setSchedulingPeriod(10ms); + count_proc_->setYieldPeriodMsec(1min); } + count_proc_->setShouldYield(true); + auto timer_driven_agent = std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()), test_repo_, test_repo_, content_repo_, configuration_, thread_pool_); + timer_driven_agent->start(); + auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), context_, factory_); + CHECK(!first_task_reschedule_info.isFinished()); + CHECK(first_task_reschedule_info.getNextExecutionTime() > std::chrono::steady_clock::now() + 100ms); + CHECK(count_proc_->getNumberOfTriggers() == 1); } + } // namespace org::apache::nifi::minifi::testing
