fgerlits commented on code in PR #1653:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1653#discussion_r1319996402
##########
libminifi/test/unit/SchedulingAgentTests.cpp:
##########
@@ -36,133 +38,167 @@ class CountOnTriggersProcessor : public
minifi::core::Processor {
static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- void onTrigger(core::ProcessContext*, core::ProcessSession*) override {
+ void onTrigger(core::ProcessContext* context, core::ProcessSession*)
override {
if (on_trigger_duration_ > 0ms)
std::this_thread::sleep_for(on_trigger_duration_);
++number_of_triggers;
+ if (should_yield_)
+ context->yield();
}
size_t getNumberOfTriggers() const { return number_of_triggers; }
void setOnTriggerDuration(std::chrono::steady_clock::duration
on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; }
+ void setShouldYield(bool should_yield) { should_yield_ = should_yield; }
private:
+ bool should_yield_ = false;
std::chrono::steady_clock::duration on_trigger_duration_ = 0ms;
std::atomic<size_t> number_of_triggers = 0;
};
+class SchedulingAgentTestFixture {
+ public:
+ SchedulingAgentTestFixture() {
+ count_proc_->incrementActiveTasks();
+ count_proc_->setScheduledState(core::RUNNING);
-TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
-
- TestController testController;
- auto test_plan = testController.createPlan();
- auto controller_services_ =
std::make_shared<minifi::core::controller::ControllerServiceMap>();
- auto configuration = std::make_shared<minifi::Configure>();
- auto controller_services_provider_ =
std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_,
configuration);
- utils::ThreadPool thread_pool;
- auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
- count_proc->incrementActiveTasks();
- count_proc->setScheduledState(core::RUNNING);
- auto node = std::make_shared<core::ProcessorNode>(count_proc.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
- std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
- count_proc->setSchedulingPeriod(125ms);
#ifdef WIN32
- utils::dateSetInstall(TZ_DATA_DIR);
+ utils::dateSetInstall(TZ_DATA_DIR);
+ date::set_install(TZ_DATA_DIR);
Review Comment:
Why do we need both? `utils::dateSetInstall()` seems to just call
`date::set_install()`. In fact, why do we have `utils::dateSetInstall()` at
all?
##########
libminifi/test/unit/SchedulingAgentTests.cpp:
##########
@@ -36,133 +38,167 @@ class CountOnTriggersProcessor : public
minifi::core::Processor {
static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- void onTrigger(core::ProcessContext*, core::ProcessSession*) override {
+ void onTrigger(core::ProcessContext* context, core::ProcessSession*)
override {
if (on_trigger_duration_ > 0ms)
std::this_thread::sleep_for(on_trigger_duration_);
++number_of_triggers;
+ if (should_yield_)
+ context->yield();
}
size_t getNumberOfTriggers() const { return number_of_triggers; }
void setOnTriggerDuration(std::chrono::steady_clock::duration
on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; }
+ void setShouldYield(bool should_yield) { should_yield_ = should_yield; }
private:
+ bool should_yield_ = false;
std::chrono::steady_clock::duration on_trigger_duration_ = 0ms;
std::atomic<size_t> number_of_triggers = 0;
};
+class SchedulingAgentTestFixture {
+ public:
+ SchedulingAgentTestFixture() {
+ count_proc_->incrementActiveTasks();
+ count_proc_->setScheduledState(core::RUNNING);
-TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
-
- TestController testController;
- auto test_plan = testController.createPlan();
- auto controller_services_ =
std::make_shared<minifi::core::controller::ControllerServiceMap>();
- auto configuration = std::make_shared<minifi::Configure>();
- auto controller_services_provider_ =
std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_,
configuration);
- utils::ThreadPool thread_pool;
- auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
- count_proc->incrementActiveTasks();
- count_proc->setScheduledState(core::RUNNING);
- auto node = std::make_shared<core::ProcessorNode>(count_proc.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
- std::shared_ptr<core::ProcessSessionFactory> factory =
std::make_shared<core::ProcessSessionFactory>(context);
- count_proc->setSchedulingPeriod(125ms);
#ifdef WIN32
- utils::dateSetInstall(TZ_DATA_DIR);
+ utils::dateSetInstall(TZ_DATA_DIR);
+ date::set_install(TZ_DATA_DIR);
#endif
+ }
- SECTION("Timer Driven") {
- auto timer_driven_agent =
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo, test_repo, content_repo, configuration, thread_pool);
- timer_driven_agent->start();
- auto first_task_reschedule_info =
timer_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!first_task_reschedule_info.isFinished());
- CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 125ms);
- CHECK(count_proc->getNumberOfTriggers() == 1);
+ protected:
+ std::shared_ptr<core::Repository> test_repo_ =
std::make_shared<TestThreadedRepository>();
+ std::shared_ptr<core::ContentRepository> content_repo_ =
std::make_shared<core::repository::VolatileContentRepository>();
+ std::shared_ptr<minifi::FlowController> controller_ =
std::make_shared<TestFlowController>(test_repo_, test_repo_, content_repo_);
+
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan();
+ std::shared_ptr<minifi::core::controller::ControllerServiceMap>
controller_services_ =
std::make_shared<minifi::core::controller::ControllerServiceMap>();
+ std::shared_ptr<minifi::Configure> configuration_ =
std::make_shared<minifi::Configure>();
+ std::shared_ptr<StandardControllerServiceProvider>
controller_services_provider_ =
std::make_shared<StandardControllerServiceProvider>(controller_services_,
configuration_);
+ utils::ThreadPool thread_pool_;
+
+ std::shared_ptr<CountOnTriggersProcessor> count_proc_ =
std::make_shared<CountOnTriggersProcessor>("count_proc");
+ std::shared_ptr<core::ProcessorNode> node_ =
std::make_shared<core::ProcessorNode>(count_proc_.get());
+ std::shared_ptr<core::ProcessContext> context_ =
std::make_shared<core::ProcessContext>(node_, nullptr, test_repo_, test_repo_,
content_repo_);
+ std::shared_ptr<core::ProcessSessionFactory> factory_ =
std::make_shared<core::ProcessSessionFactory>(context_);
+};
- count_proc->setOnTriggerDuration(50ms);
- auto second_task_reschedule_info =
timer_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!second_task_reschedule_info.isFinished());
- CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 75ms);
- CHECK(count_proc->getNumberOfTriggers() == 2);
-
- count_proc->setOnTriggerDuration(150ms);
- auto third_task_reschedule_info =
timer_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!third_task_reschedule_info.isFinished());
- CHECK(first_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
- CHECK(count_proc->getNumberOfTriggers() == 3);
- }
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "TimerDrivenSchedulingAgent") {
+ count_proc_->setSchedulingPeriod(125ms);
+ auto timer_driven_agent =
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ timer_driven_agent->start();
+ auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 125ms);
+ CHECK(count_proc_->getNumberOfTriggers() == 1);
- SECTION("Event Driven") {
- auto event_driven_agent =
std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo, test_repo, content_repo, configuration, thread_pool);
- event_driven_agent->start();
- auto first_task_reschedule_info =
event_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!first_task_reschedule_info.isFinished());
- CHECK(first_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
- auto count_num_after_one_schedule = count_proc->getNumberOfTriggers();
- CHECK(count_num_after_one_schedule > 100);
+ count_proc_->setOnTriggerDuration(50ms);
+ auto second_task_reschedule_info =
timer_driven_agent->run(count_proc_.get(), context_, factory_);
- auto second_task_reschedule_info =
event_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!second_task_reschedule_info.isFinished());
- CHECK(second_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
- auto count_num_after_two_schedule = count_proc->getNumberOfTriggers();
- CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
- }
+ CHECK(!second_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 75ms);
+ CHECK(count_proc_->getNumberOfTriggers() == 2);
- SECTION("Cron Driven every year") {
-#ifdef WIN32
- date::set_install(TZ_DATA_DIR);
-#endif
- count_proc->setCronPeriod("0 0 0 1 1 ?");
- auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo, test_repo, content_repo, configuration, thread_pool);
- cron_driven_agent->start();
- auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(),
context, factory);
- CHECK(!first_task_reschedule_info.isFinished());
- if (first_task_reschedule_info.getNextExecutionTime() >
std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around
dec 31 23:59:59
- auto wait_time_till_next_execution_time =
std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime()
- std::chrono::steady_clock::now());
-
- auto current_time =
date::make_zoned<std::chrono::seconds>(date::current_zone(),
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
- auto current_year_month_day =
date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
- auto new_years_day =
date::make_zoned<std::chrono::seconds>(date::current_zone(),
date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
-
- auto time_until_new_years_day = new_years_day.get_local_time() -
current_time.get_local_time();
-
- CHECK(std::chrono::round<std::chrono::minutes>(time_until_new_years_day
- wait_time_till_next_execution_time) == 0min);
- CHECK(count_proc->getNumberOfTriggers() == 0);
-
- auto second_task_reschedule_info =
cron_driven_agent->run(count_proc.get(), context, factory);
- CHECK(!second_task_reschedule_info.isFinished());
-
CHECK(std::chrono::round<std::chrono::minutes>(first_task_reschedule_info.getNextExecutionTime()
- second_task_reschedule_info.getNextExecutionTime()) == 0min);
- CHECK(count_proc->getNumberOfTriggers() == 0);
- }
- }
+ count_proc_->setOnTriggerDuration(150ms);
+ auto third_task_reschedule_info = timer_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!third_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
+ CHECK(count_proc_->getNumberOfTriggers() == 3);
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "EventDrivenSchedulingAgent") {
+ auto event_driven_agent =
std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ event_driven_agent->start();
+ auto first_task_reschedule_info = event_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
+ auto count_num_after_one_schedule = count_proc_->getNumberOfTriggers();
+ CHECK(count_num_after_one_schedule > 100);
+
+ auto second_task_reschedule_info =
event_driven_agent->run(count_proc_.get(), context_, factory_);
+ CHECK(!second_task_reschedule_info.isFinished());
+ CHECK(second_task_reschedule_info.getNextExecutionTime() <
std::chrono::steady_clock::now());
+ auto count_num_after_two_schedule = count_proc_->getNumberOfTriggers();
+ CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
+}
- SECTION("Cron Driven every sec") {
- count_proc->setCronPeriod("* * * * * *");
- auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo, test_repo, content_repo, configuration, thread_pool);
- cron_driven_agent->start();
- auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(),
context, factory);
- CHECK(!first_task_reschedule_info.isFinished());
- CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 1s);
- CHECK(count_proc->getNumberOfTriggers() == 0);
-
-
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
- auto second_task_reschedule_info =
cron_driven_agent->run(count_proc.get(), context, factory);
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every year") {
+ count_proc_->setCronPeriod("0 0 0 1 1 ?");
+ auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ cron_driven_agent->start();
+ auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ if (first_task_reschedule_info.getNextExecutionTime() >
std::chrono::steady_clock::now() + 1min) { // To avoid possibly failing around
dec 31 23:59:59
+ auto wait_time_till_next_execution_time =
std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime()
- std::chrono::steady_clock::now());
+
+ auto current_time =
date::make_zoned<std::chrono::seconds>(date::current_zone(),
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
+ auto current_year_month_day =
date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
+ auto new_years_day =
date::make_zoned<std::chrono::seconds>(date::current_zone(),
date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
+
+ auto time_until_new_years_day = new_years_day.get_local_time() -
current_time.get_local_time();
+
+ CHECK(std::chrono::abs(time_until_new_years_day -
wait_time_till_next_execution_time) < 1min);
+ CHECK(count_proc_->getNumberOfTriggers() == 0);
+
+ auto second_task_reschedule_info =
cron_driven_agent->run(count_proc_.get(), context_, factory_);
CHECK(!second_task_reschedule_info.isFinished());
- CHECK(second_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 1s);
- CHECK(count_proc->getNumberOfTriggers() == 1);
- }
+ CHECK(std::chrono::abs(first_task_reschedule_info.getNextExecutionTime() -
second_task_reschedule_info.getNextExecutionTime()) < 1min);
- SECTION("Cron Driven no future triggers") {
- count_proc->setCronPeriod("* * * * * * 2012");
- auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo, test_repo, content_repo, configuration, thread_pool);
- cron_driven_agent->start();
- auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(),
context, factory);
- CHECK(first_task_reschedule_info.isFinished());
+ CHECK(count_proc_->getNumberOfTriggers() == 0);
}
}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every sec") {
+ count_proc_->setCronPeriod("* * * * * *");
+ auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ cron_driven_agent->start();
+ auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 1s);
+ CHECK(count_proc_->getNumberOfTriggers() == 0);
+
+
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
+ auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!second_task_reschedule_info.isFinished());
+ CHECK(second_task_reschedule_info.getNextExecutionTime() <=
std::chrono::steady_clock::now() + 1s);
+ CHECK(count_proc_->getNumberOfTriggers() == 1);
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven no future triggers")
{
+ count_proc_->setCronPeriod("* * * * * * 2012");
+ auto cron_driven_agent =
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ cron_driven_agent->start();
+ auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(first_task_reschedule_info.isFinished());
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Timer driven should wait for
scheduled run even if it yielded") {
+ count_proc_->setSchedulingPeriod(1min);
+ count_proc_->setYieldPeriodMsec(10ms);
+ count_proc_->setShouldYield(true);
+ auto timer_driven_agent =
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ timer_driven_agent->start();
+ auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() >
std::chrono::steady_clock::now() + 100ms);
+ CHECK(count_proc_->getNumberOfTriggers() == 1);
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Timer driven should wait for
yield if processor yielded") {
+ count_proc_->setSchedulingPeriod(10ms);
+ count_proc_->setYieldPeriodMsec(1min);
+ count_proc_->setShouldYield(true);
+ auto timer_driven_agent =
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+ timer_driven_agent->start();
+ auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(),
context_, factory_);
+ CHECK(!first_task_reschedule_info.isFinished());
+ CHECK(first_task_reschedule_info.getNextExecutionTime() >
std::chrono::steady_clock::now() + 100ms);
+ CHECK(count_proc_->getNumberOfTriggers() == 1);
+}
Review Comment:
I don't insist if you think it's clearer this way, but these two tests are
identical except for the first two lines, so they could be merged into a single
test with two SECTIONs around those pairs of lines. To me, that would be more
readable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]