This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7818783cb901849c97404dfcc4189930abdaee46
Author: Martin Zink <[email protected]>
AuthorDate: Tue Sep 12 16:01:05 2023 +0200

    MINIFICPP-1076 Yielding processor should still respect scheduling period
    
    Closes #1653
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 libminifi/src/TimerDrivenSchedulingAgent.cpp |   6 +-
 libminifi/src/utils/TestUtils.cpp            |   2 +
 libminifi/test/unit/SchedulingAgentTests.cpp | 231 +++++++++++++++------------
 3 files changed, 135 insertions(+), 104 deletions(-)

diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index cbe5c2306..2cde8d3c8 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -30,10 +30,10 @@ utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* proce
   if (this->running_ && processor->isRunning()) {
     auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
-    if (processor->isYield())
-      return 
utils::TaskRescheduleInfo::RetryAfter(processor->getYieldExpirationTime());
 
-    return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + 
processor->getSchedulingPeriod());
+    auto next_scheduled_run = trigger_start_time + 
processor->getSchedulingPeriod();
+    auto yield_expiration_time = processor->getYieldExpirationTime();
+    return utils::TaskRescheduleInfo::RetryAfter(std::max(next_scheduled_run, 
yield_expiration_time));
   }
   return utils::TaskRescheduleInfo::Done();
 }
diff --git a/libminifi/src/utils/TestUtils.cpp b/libminifi/src/utils/TestUtils.cpp
index 8d46e8efd..6d3021be7 100644
--- a/libminifi/src/utils/TestUtils.cpp
+++ b/libminifi/src/utils/TestUtils.cpp
@@ -19,6 +19,8 @@
 
 namespace org::apache::nifi::minifi::utils {
 #ifdef WIN32
+// If minifi is not installed through the MSI installer, then TZDATA might be missing
+// date::set_install can point to the TZDATA location, but it has to be called from each library/executable that wants to use timezones
 void dateSetInstall(const std::string& install) {
   date::set_install(install);
 }
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp
index b3ffed29d..4a66bfc63 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -26,6 +26,8 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::testing {
 
+using minifi::core::controller::StandardControllerServiceProvider;
+
 class CountOnTriggersProcessor : public minifi::core::Processor {
  public:
   using minifi::core::Processor::Processor;
@@ -36,133 +38,160 @@ class CountOnTriggersProcessor : public minifi::core::Processor {
   static constexpr bool IsSingleThreaded = false;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  void onTrigger(core::ProcessContext*, core::ProcessSession*) override {
+  void onTrigger(core::ProcessContext* context, core::ProcessSession*) 
override {
     if (on_trigger_duration_ > 0ms)
       std::this_thread::sleep_for(on_trigger_duration_);
     ++number_of_triggers;
+    if (should_yield_)
+      context->yield();
   }
 
   size_t getNumberOfTriggers() const { return number_of_triggers; }
   void setOnTriggerDuration(std::chrono::steady_clock::duration 
on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; }
+  void setShouldYield(bool should_yield) { should_yield_ = should_yield; }
 
  private:
+  bool should_yield_ = false;
   std::chrono::steady_clock::duration on_trigger_duration_ = 0ms;
   std::atomic<size_t> number_of_triggers = 0;
 };
 
+class SchedulingAgentTestFixture {
+ public:
+  SchedulingAgentTestFixture() {
+    count_proc_->incrementActiveTasks();
+    count_proc_->setScheduledState(core::RUNNING);
 
-TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestThreadedRepository>();
-  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
-
-  TestController testController;
-  auto test_plan = testController.createPlan();
-  auto controller_services_ = 
std::make_shared<minifi::core::controller::ControllerServiceMap>();
-  auto configuration = std::make_shared<minifi::Configure>();
-  auto controller_services_provider_ = 
std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_,
 configuration);
-  utils::ThreadPool thread_pool;
-  auto count_proc = std::make_shared<CountOnTriggersProcessor>("count_proc");
-  count_proc->incrementActiveTasks();
-  count_proc->setScheduledState(core::RUNNING);
-  auto node = std::make_shared<core::ProcessorNode>(count_proc.get());
-  auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
-  std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
-  count_proc->setSchedulingPeriod(125ms);
 #ifdef WIN32
-  utils::dateSetInstall(TZ_DATA_DIR);
+    utils::dateSetInstall(TZ_DATA_DIR);
+    date::set_install(TZ_DATA_DIR);
 #endif
+  }
 
-  SECTION("Timer Driven") {
-    auto timer_driven_agent = 
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
-    timer_driven_agent->start();
-    auto first_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!first_task_reschedule_info.isFinished());
-    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 125ms);
-    CHECK(count_proc->getNumberOfTriggers() == 1);
+ protected:
+  std::shared_ptr<core::Repository> test_repo_ = 
std::make_shared<TestThreadedRepository>();
+  std::shared_ptr<core::ContentRepository> content_repo_ = 
std::make_shared<core::repository::VolatileContentRepository>();
+  std::shared_ptr<minifi::FlowController> controller_ = 
std::make_shared<TestFlowController>(test_repo_, test_repo_, content_repo_);
+
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> test_plan = test_controller_.createPlan();
+  std::shared_ptr<minifi::core::controller::ControllerServiceMap> 
controller_services_ = 
std::make_shared<minifi::core::controller::ControllerServiceMap>();
+  std::shared_ptr<minifi::Configure> configuration_ = 
std::make_shared<minifi::Configure>();
+  std::shared_ptr<StandardControllerServiceProvider> 
controller_services_provider_ = 
std::make_shared<StandardControllerServiceProvider>(controller_services_, 
configuration_);
+  utils::ThreadPool thread_pool_;
+
+  std::shared_ptr<CountOnTriggersProcessor> count_proc_ = 
std::make_shared<CountOnTriggersProcessor>("count_proc");
+  std::shared_ptr<core::ProcessorNode> node_ = 
std::make_shared<core::ProcessorNode>(count_proc_.get());
+  std::shared_ptr<core::ProcessContext> context_ = 
std::make_shared<core::ProcessContext>(node_, nullptr, test_repo_, test_repo_, 
content_repo_);
+  std::shared_ptr<core::ProcessSessionFactory> factory_ = 
std::make_shared<core::ProcessSessionFactory>(context_);
+};
 
-    count_proc->setOnTriggerDuration(50ms);
-    auto second_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
 
-    CHECK(!second_task_reschedule_info.isFinished());
-    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 75ms);
-    CHECK(count_proc->getNumberOfTriggers() == 2);
-
-    count_proc->setOnTriggerDuration(150ms);
-    auto third_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!third_task_reschedule_info.isFinished());
-    CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
-    CHECK(count_proc->getNumberOfTriggers() == 3);
-  }
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "TimerDrivenSchedulingAgent") {
+  count_proc_->setSchedulingPeriod(125ms);
+  auto timer_driven_agent = 
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  timer_driven_agent->start();
+  auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!first_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 125ms);
+  CHECK(count_proc_->getNumberOfTriggers() == 1);
 
-  SECTION("Event Driven") {
-    auto event_driven_agent = 
std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
-    event_driven_agent->start();
-    auto first_task_reschedule_info = 
event_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!first_task_reschedule_info.isFinished());
-    CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
-    auto count_num_after_one_schedule = count_proc->getNumberOfTriggers();
-    CHECK(count_num_after_one_schedule > 100);
+  count_proc_->setOnTriggerDuration(50ms);
+  auto second_task_reschedule_info = 
timer_driven_agent->run(count_proc_.get(), context_, factory_);
 
-    auto second_task_reschedule_info = 
event_driven_agent->run(count_proc.get(), context, factory);
-    CHECK(!second_task_reschedule_info.isFinished());
-    CHECK(second_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
-    auto count_num_after_two_schedule = count_proc->getNumberOfTriggers();
-    CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
-  }
+  CHECK(!second_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 75ms);
+  CHECK(count_proc_->getNumberOfTriggers() == 2);
 
-  SECTION("Cron Driven every year") {
-#ifdef WIN32
-    date::set_install(TZ_DATA_DIR);
-#endif
-    count_proc->setCronPeriod("0 0 0 1 1 ?");
-    auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
-    cron_driven_agent->start();
-    auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(!first_task_reschedule_info.isFinished());
-    if (first_task_reschedule_info.getNextExecutionTime() > 
std::chrono::steady_clock::now() + 1min) {  // To avoid possibly failing around 
dec 31 23:59:59
-      auto wait_time_till_next_execution_time = 
std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime()
 - std::chrono::steady_clock::now());
-
-      auto current_time = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
-      auto current_year_month_day = 
date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
-      auto new_years_day = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
-
-      auto time_until_new_years_day = new_years_day.get_local_time() - 
current_time.get_local_time();
-
-      CHECK(std::chrono::round<std::chrono::minutes>(time_until_new_years_day 
- wait_time_till_next_execution_time) == 0min);
-      CHECK(count_proc->getNumberOfTriggers() == 0);
-
-      auto second_task_reschedule_info = 
cron_driven_agent->run(count_proc.get(), context, factory);
-      CHECK(!second_task_reschedule_info.isFinished());
-      
CHECK(std::chrono::round<std::chrono::minutes>(first_task_reschedule_info.getNextExecutionTime()
 - second_task_reschedule_info.getNextExecutionTime()) == 0min);
-      CHECK(count_proc->getNumberOfTriggers() == 0);
-    }
-  }
+  count_proc_->setOnTriggerDuration(150ms);
+  auto third_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!third_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
+  CHECK(count_proc_->getNumberOfTriggers() == 3);
+}
 
-  SECTION("Cron Driven every sec") {
-    count_proc->setCronPeriod("* * * * * *");
-    auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
-    cron_driven_agent->start();
-    auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(!first_task_reschedule_info.isFinished());
-    CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
-    CHECK(count_proc->getNumberOfTriggers() == 0);
-
-    
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
-    auto second_task_reschedule_info = 
cron_driven_agent->run(count_proc.get(), context, factory);
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "EventDrivenSchedulingAgent") {
+  auto event_driven_agent = 
std::make_shared<EventDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  event_driven_agent->start();
+  auto first_task_reschedule_info = event_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!first_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
+  auto count_num_after_one_schedule = count_proc_->getNumberOfTriggers();
+  CHECK(count_num_after_one_schedule > 100);
+
+  auto second_task_reschedule_info = 
event_driven_agent->run(count_proc_.get(), context_, factory_);
+  CHECK(!second_task_reschedule_info.isFinished());
+  CHECK(second_task_reschedule_info.getNextExecutionTime() < 
std::chrono::steady_clock::now());
+  auto count_num_after_two_schedule = count_proc_->getNumberOfTriggers();
+  CHECK(count_num_after_two_schedule > count_num_after_one_schedule+100);
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every year") {
+  count_proc_->setCronPeriod("0 0 0 1 1 ?");
+  auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  cron_driven_agent->start();
+  auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!first_task_reschedule_info.isFinished());
+  if (first_task_reschedule_info.getNextExecutionTime() > 
std::chrono::steady_clock::now() + 1min) {  // To avoid possibly failing around 
dec 31 23:59:59
+    auto wait_time_till_next_execution_time = 
std::chrono::round<std::chrono::seconds>(first_task_reschedule_info.getNextExecutionTime()
 - std::chrono::steady_clock::now());
+
+    auto current_time = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
+    auto current_year_month_day = 
date::year_month_day(date::floor<date::days>(current_time.get_local_time()));
+    auto new_years_day = 
date::make_zoned<std::chrono::seconds>(date::current_zone(), 
date::local_days{date::year{current_year_month_day.year()+date::years(1)}/date::January/1});
+
+    auto time_until_new_years_day = new_years_day.get_local_time() - 
current_time.get_local_time();
+
+    CHECK(std::chrono::abs(time_until_new_years_day - 
wait_time_till_next_execution_time) < 1min);
+    CHECK(count_proc_->getNumberOfTriggers() == 0);
+
+    auto second_task_reschedule_info = 
cron_driven_agent->run(count_proc_.get(), context_, factory_);
     CHECK(!second_task_reschedule_info.isFinished());
-    CHECK(second_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
-    CHECK(count_proc->getNumberOfTriggers() == 1);
+    CHECK(std::chrono::abs(first_task_reschedule_info.getNextExecutionTime() - 
second_task_reschedule_info.getNextExecutionTime()) < 1min);
+
+    CHECK(count_proc_->getNumberOfTriggers() == 0);
   }
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven every sec") {
+  count_proc_->setCronPeriod("* * * * * *");
+  auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  cron_driven_agent->start();
+  auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!first_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
+  CHECK(count_proc_->getNumberOfTriggers() == 0);
+
+  
std::this_thread::sleep_until(first_task_reschedule_info.getNextExecutionTime());
+  auto second_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!second_task_reschedule_info.isFinished());
+  CHECK(second_task_reschedule_info.getNextExecutionTime() <= 
std::chrono::steady_clock::now() + 1s);
+  CHECK(count_proc_->getNumberOfTriggers() == 1);
+}
 
-  SECTION("Cron Driven no future triggers") {
-    count_proc->setCronPeriod("* * * * * * 2012");
-    auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo, test_repo, content_repo, configuration, thread_pool);
-    cron_driven_agent->start();
-    auto first_task_reschedule_info = cron_driven_agent->run(count_proc.get(), 
context, factory);
-    CHECK(first_task_reschedule_info.isFinished());
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Cron Driven no future triggers") 
{
+  count_proc_->setCronPeriod("* * * * * * 2012");
+  auto cron_driven_agent = 
std::make_shared<CronDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  cron_driven_agent->start();
+  auto first_task_reschedule_info = cron_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(first_task_reschedule_info.isFinished());
+}
+
+TEST_CASE_METHOD(SchedulingAgentTestFixture, "Timer driven should respect both 
yield and run schedule") {
+  SECTION("Fast yield slow schedule") {
+    count_proc_->setSchedulingPeriod(1min);
+    count_proc_->setYieldPeriodMsec(10ms);
+  }
+  SECTION("Slow yield fast schedule") {
+    count_proc_->setSchedulingPeriod(10ms);
+    count_proc_->setYieldPeriodMsec(1min);
   }
+  count_proc_->setShouldYield(true);
+  auto timer_driven_agent = 
std::make_shared<TimerDrivenSchedulingAgent>(gsl::make_not_null(controller_services_provider_.get()),
 test_repo_, test_repo_, content_repo_, configuration_, thread_pool_);
+  timer_driven_agent->start();
+  auto first_task_reschedule_info = timer_driven_agent->run(count_proc_.get(), 
context_, factory_);
+  CHECK(!first_task_reschedule_info.isFinished());
+  CHECK(first_task_reschedule_info.getNextExecutionTime() > 
std::chrono::steady_clock::now() + 100ms);
+  CHECK(count_proc_->getNumberOfTriggers() == 1);
 }
+
 }  // namespace org::apache::nifi::minifi::testing

Reply via email to