This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit dcbabbad04abceacce2a246a905589ac1f0e8fd6
Author: Martin Zink <[email protected]>
AuthorDate: Thu Jun 15 19:35:05 2023 +0200

    MINIFICPP-2125 fix for waking up prematurely after processor yields
    
    Closes #1581
    Signed-off-by: Marton Szasz <[email protected]>
---
 extensions/standard-processors/processors/GetTCP.h |   2 +-
 .../tests/unit/FlowJsonTests.cpp                   |   7 +-
 .../tests/unit/ProcessorTests.cpp                  |   6 ++
 .../tests/unit/YamlConfigurationTests.cpp          |  10 +-
 libminifi/include/core/Processor.h                 |  37 ++++---
 libminifi/include/utils/Monitors.h                 | 108 +++++----------------
 libminifi/include/utils/ThreadPool.h               |   7 +-
 libminifi/src/CronDrivenSchedulingAgent.cpp        |   4 +-
 libminifi/src/EventDrivenSchedulingAgent.cpp       |   4 +-
 libminifi/src/TimerDrivenSchedulingAgent.cpp       |   3 +-
 libminifi/src/core/Processor.cpp                   |  31 +++---
 .../src/core/flow/StructuredConfiguration.cpp      |   4 +-
 libminifi/src/utils/ThreadPool.cpp                 |  27 +++---
 libminifi/test/unit/BackTraceTests.cpp             |  32 ++----
 libminifi/test/unit/SchedulingAgentTests.cpp       |  19 +++-
 libminifi/test/unit/ThreadPoolTests.cpp            |  54 ++++++-----
 16 files changed, 149 insertions(+), 206 deletions(-)

diff --git a/extensions/standard-processors/processors/GetTCP.h 
b/extensions/standard-processors/processors/GetTCP.h
index e8070afd2..7e0dd03fd 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -69,7 +69,7 @@ class SocketAfterExecute : public utils::AfterExecute<int> {
     return !running_;
   }
 
-  std::chrono::milliseconds wait_time() override {
+  std::chrono::steady_clock::duration wait_time() override {
     // wait 500ms
     return std::chrono::milliseconds(500);
   }
diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp 
b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
index 495965122..71c04bf21 100644
--- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
+++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp
@@ -21,12 +21,9 @@
 #include <chrono>
 #include "core/repository/VolatileContentRepository.h"
 #include "core/ProcessGroup.h"
-#include "core/RepositoryFactory.h"
 #include "core/yaml/YamlConfiguration.h"
 #include "TailFile.h"
-#include "TestBase.h"
 #include "Catch.h"
-#include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
 #include "ConfigurationTestController.h"
 #include "Funnel.h"
@@ -115,9 +112,9 @@ TEST_CASE("NiFi flow json format is correctly parsed") {
   REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001");
   REQUIRE(15 == proc->getMaxConcurrentTasks());
   REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == 
proc->getSchedulingStrategy());
-  REQUIRE(3s == proc->getSchedulingPeriodNano());
+  REQUIRE(3s == proc->getSchedulingPeriod());
   REQUIRE(12s == proc->getPenalizationPeriod());
-  REQUIRE(4s == proc->getYieldPeriodMsec());
+  REQUIRE(4s == proc->getYieldPeriod());
   REQUIRE(proc->isAutoTerminated({"one", ""}));
   REQUIRE(proc->isAutoTerminated({"two", ""}));
   REQUIRE_FALSE(proc->isAutoTerminated({"three", ""}));
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp 
b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index ccb322584..570128479 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -824,3 +824,9 @@ TEST_CASE("Test getProcessorType", "[getProcessorType]") {
   auto processor = plan->addProcessor("GenerateFlowFile", "myProc");
   REQUIRE(processor->getProcessorType() == "GenerateFlowFile");
 }
+
+TEST_CASE("IsYield and getYieldTime is consistent") {
+  auto processor = TestProcessorNoContent("test_processor");
+  processor.yield(1ms);
+  REQUIRE(processor.isYield() == (processor.getYieldTime() != 0ms));
+}
diff --git 
a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp 
b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index 8179f328a..f829729ed 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -26,7 +26,6 @@
 #include "TailFile.h"
 #include "TestBase.h"
 #include "Catch.h"
-#include "utils/TestUtils.h"
 #include "utils/StringUtils.h"
 #include "ConfigurationTestController.h"
 #include "utils/IntegrationTestUtils.h"
@@ -151,9 +150,9 @@ Provenance Reporting:
     
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
     REQUIRE(1 == 
rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
     REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
-    REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
+    REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod());
     REQUIRE(30s == 
rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
-    REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
+    REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod());
     REQUIRE(0s == 
rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
 
     std::map<std::string, minifi::Connection*> connectionMap;
@@ -462,9 +461,9 @@ NiFi Properties Overrides: {}
   REQUIRE(1 == 
rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
   REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
   REQUIRE(1 == 
rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
-  REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
+  REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod());
   REQUIRE(30s == 
rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
-  REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
+  REQUIRE(1s == 
rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod());
   REQUIRE(0s == 
rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
 
   std::map<std::string, minifi::Connection*> connectionMap;
@@ -816,7 +815,6 @@ TEST_CASE("Test UUID duplication checks", 
"[YamlConfiguration]") {
               class: SSLContextService
             )";
 
-      auto config_old = config_yaml;
       utils::StringUtils::replaceAll(config_yaml, 
std::string("00000000-0000-0000-0000-00000000000") + i, 
"99999999-9999-9999-9999-999999999999");
       REQUIRE_THROWS_WITH(yaml_config.getRootFromPayload(config_yaml), 
"General Operation: UUID 99999999-9999-9999-9999-999999999999 is duplicated in 
the flow configuration");
     }
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index 824b5eb03..8cc3b8379 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -71,8 +71,7 @@ class ProcessContext;
 class ProcessSession;
 class ProcessSessionFactory;
 
-// Minimum scheduling period in Nano Second
-constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000};
+constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30};
 
 #define BUILDING_DLL 1
 
@@ -102,12 +101,12 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
     return strategy_;
   }
 
-  void setSchedulingPeriodNano(std::chrono::nanoseconds period) {
-    scheduling_period_nano_ = std::max(MINIMUM_SCHEDULING_NANOS, period);
+  void setSchedulingPeriod(std::chrono::steady_clock::duration period) {
+    scheduling_period_ = 
std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), 
period);
   }
 
-  std::chrono::nanoseconds getSchedulingPeriodNano() const {
-    return scheduling_period_nano_;
+  std::chrono::steady_clock::duration getSchedulingPeriod() const {
+    return scheduling_period_;
   }
 
   void setCronPeriod(const std::string &period) {
@@ -118,20 +117,20 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
     return cron_period_;
   }
 
-  void setRunDurationNano(std::chrono::nanoseconds period) {
-    run_duration_nano_ = period;
+  void setRunDurationNano(std::chrono::steady_clock::duration period) {
+    run_duration_ = period;
   }
 
-  std::chrono::nanoseconds getRunDurationNano() const {
-    return (run_duration_nano_);
+  std::chrono::steady_clock::duration getRunDurationNano() const {
+    return (run_duration_);
   }
 
   void setYieldPeriodMsec(std::chrono::milliseconds period) {
-    yield_period_msec_ = period;
+    yield_period_ = period;
   }
 
-  std::chrono::milliseconds getYieldPeriodMsec() const {
-    return yield_period_msec_;
+  std::chrono::steady_clock::duration getYieldPeriod() const {
+    return yield_period_;
   }
 
   void setPenalizationPeriod(std::chrono::milliseconds period) {
@@ -171,13 +170,13 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
   void yield() override;
 
-  void yield(std::chrono::milliseconds delta_time);
+  void yield(std::chrono::steady_clock::duration delta_time);
 
   virtual bool isYield();
 
   void clearYield();
 
-  std::chrono::milliseconds getYieldTime() const;
+  std::chrono::steady_clock::duration getYieldTime() const;
   // Whether flow file queue full in any of the outgoing connection
   bool flowFilesOutGoingFull() const;
 
@@ -239,9 +238,9 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
   std::atomic<ScheduledState> state_;
 
-  std::atomic<std::chrono::nanoseconds> scheduling_period_nano_;
-  std::atomic<std::chrono::nanoseconds> run_duration_nano_;
-  std::atomic<std::chrono::milliseconds> yield_period_msec_;
+  std::atomic<std::chrono::steady_clock::duration> scheduling_period_;
+  std::atomic<std::chrono::steady_clock::duration> run_duration_;
+  std::atomic<std::chrono::steady_clock::duration> yield_period_;
 
   std::atomic<uint8_t> active_tasks_;
   std::atomic<bool> _triggerWhenEmpty;
@@ -251,7 +250,7 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
  private:
   mutable std::mutex mutex_;
-  std::atomic<std::chrono::time_point<std::chrono::system_clock>> 
yield_expiration_{};
+  std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{};
 
   static std::mutex& getGraphMutex() {
     static std::mutex mutex{};
diff --git a/libminifi/include/utils/Monitors.h 
b/libminifi/include/utils/Monitors.h
index 3b446561f..aed00ce2d 100644
--- a/libminifi/include/utils/Monitors.h
+++ b/libminifi/include/utils/Monitors.h
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_UTILS_MONITORS_H_
-#define LIBMINIFI_INCLUDE_UTILS_MONITORS_H_
+#pragma once
 
-#include <chrono>
+#include <algorithm>
 #include <atomic>
+#include <chrono>
 #if defined(WIN32)
 #include <future>  // This is required to work around a VS2017 bug, see the 
details below
 #endif
+#include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 /**
  * Worker task helper that determines
@@ -40,95 +37,51 @@ class AfterExecute {
   virtual ~AfterExecute() = default;
 
   AfterExecute() = default;
-  AfterExecute(AfterExecute&& /*other*/) = default;
+  AfterExecute(AfterExecute&& /*other*/)  noexcept = default;
   virtual bool isFinished(const T &result) = 0;
   virtual bool isCancelled(const T &result) = 0;
   /**
    * Time to wait before re-running this task if necessary
    * @return milliseconds since epoch after which we are eligible to re-run 
this task.
    */
-  virtual std::chrono::milliseconds wait_time() = 0;
+  virtual std::chrono::steady_clock::duration wait_time() = 0;
 };
 
 /**
  * Uses the wait time for a given worker to determine if it is eligible to run
  */
-class TimerAwareMonitor : public 
utils::AfterExecute<std::chrono::milliseconds> {
- public:
-  TimerAwareMonitor(std::atomic<bool> *run_monitor) // NOLINT
-      : current_wait_(std::chrono::milliseconds(0)),
-        run_monitor_(run_monitor) {
-  }
-  bool isFinished(const std::chrono::milliseconds &result) override {
-    current_wait_.store(result);
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  bool isCancelled(const std::chrono::milliseconds& /*result*/) override {
-    if (*run_monitor_) {
-      return false;
-    }
-    return true;
-  }
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run 
this task.
-   */
-  std::chrono::milliseconds wait_time() override {
-    return current_wait_.load();
-  }
-
- protected:
-  std::atomic<std::chrono::milliseconds> current_wait_;
-  std::atomic<bool> *run_monitor_;
-};
-
-class SingleRunMonitor : public utils::AfterExecute<bool>{
- public:
-  SingleRunMonitor(std::chrono::milliseconds retry_interval = 
std::chrono::milliseconds(100)) // NOLINT
-      : retry_interval_(retry_interval) {
-  }
-
-  bool isFinished(const bool &result) override {
-    return result;
-  }
-  bool isCancelled(const bool& /*result*/) override {
-    return false;
-  }
-  std::chrono::milliseconds wait_time() override {
-    return retry_interval_;
-  }
- protected:
-  const std::chrono::milliseconds retry_interval_;
-};
-
 
 struct TaskRescheduleInfo {
-  TaskRescheduleInfo(bool result, std::chrono::milliseconds wait_time)
-    : wait_time_(wait_time), finished_(result) {}
+  TaskRescheduleInfo(bool result, std::chrono::steady_clock::duration 
wait_time)
+    : wait_time_(wait_time), finished_(result) {
+    gsl_Expects(wait_time >= std::chrono::milliseconds(0));
+  }
 
-  std::chrono::milliseconds wait_time_;
+  std::chrono::steady_clock::duration wait_time_;
   bool finished_;
 
   static TaskRescheduleInfo Done() {
-    return TaskRescheduleInfo(true, std::chrono::milliseconds(0));
+    return {true, std::chrono::steady_clock::duration(0)};
   }
 
-  static TaskRescheduleInfo RetryIn(std::chrono::milliseconds interval) {
-    return TaskRescheduleInfo(false, interval);
+  static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration 
interval) {
+    return {false, interval};
+  }
+
+  static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point 
time_point) {
+    auto interval = std::max(time_point - std::chrono::steady_clock::now(), 
std::chrono::steady_clock::duration(0));
+    return {false, interval};
   }
 
   static TaskRescheduleInfo RetryImmediately() {
-    return TaskRescheduleInfo(false, std::chrono::milliseconds(0));
+    return {false, std::chrono::steady_clock::duration(0)};
   }
 
 #if defined(WIN32)
 // 
https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html
 // Because of this bug we need to have this object default constructible, 
which makes no sense otherwise. Hack.
  private:
-  TaskRescheduleInfo() : wait_time_(std::chrono::milliseconds(0)), 
finished_(true) {}
+  TaskRescheduleInfo() : wait_time_(std::chrono::steady_clock::duration(0)), 
finished_(true) {}
   friend class std::_Associated_state<TaskRescheduleInfo>;
 #endif
 };
@@ -147,22 +100,13 @@ class ComplexMonitor : public 
utils::AfterExecute<TaskRescheduleInfo> {
   bool isCancelled(const TaskRescheduleInfo& /*result*/) override {
     return false;
   }
-  /**
-   * Time to wait before re-running this task if necessary
-   * @return milliseconds since epoch after which we are eligible to re-run 
this task.
-   */
-  std::chrono::milliseconds wait_time() override {
+
+  std::chrono::steady_clock::duration wait_time() override {
     return current_wait_.load();
   }
 
  private:
-  std::atomic<std::chrono::milliseconds> current_wait_ 
{std::chrono::milliseconds(0)};
+  std::atomic<std::chrono::steady_clock::duration> current_wait_ 
{std::chrono::steady_clock::duration(0)};
 };
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_MONITORS_H_
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/ThreadPool.h 
b/libminifi/include/utils/ThreadPool.h
index 68c2663ec..dde867909 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -94,7 +94,7 @@ class Worker {
       promise->set_value(result);
       return false;
     }
-    next_exec_time_ = std::max(next_exec_time_ + 
run_determinant_->wait_time(), std::chrono::steady_clock::now());
+    next_exec_time_ = std::max(next_exec_time_, 
std::chrono::steady_clock::now() + run_determinant_->wait_time());
     return true;
   }
 
@@ -106,11 +106,6 @@ class Worker {
     return next_exec_time_;
   }
 
-  virtual std::chrono::milliseconds getWaitTime() const {
-    return run_determinant_->wait_time();
-  }
-
-
   std::shared_ptr<std::promise<T>> getPromise() const;
 
   const TaskId &getIdentifier() const {
diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp 
b/libminifi/src/CronDrivenSchedulingAgent.cpp
index 0e12bd3ae..fdc885747 100644
--- a/libminifi/src/CronDrivenSchedulingAgent.cpp
+++ b/libminifi/src/CronDrivenSchedulingAgent.cpp
@@ -50,7 +50,7 @@ utils::TaskRescheduleInfo 
CronDrivenSchedulingAgent::run(core::Processor* proces
       return utils::TaskRescheduleInfo::Done();
 
     if (*next_to_last_trigger > current_time.get_local_time())
-      return 
utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time()));
+      return 
utils::TaskRescheduleInfo::RetryIn(*next_to_last_trigger-current_time.get_local_time());
 
     auto on_trigger_result = this->onTrigger(processor, processContext, 
sessionFactory);
 
@@ -61,7 +61,7 @@ utils::TaskRescheduleInfo 
CronDrivenSchedulingAgent::run(core::Processor* proces
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
     if (auto next_trigger = 
schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time()))
-      return 
utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time()));
+      return 
utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time());
   }
   return utils::TaskRescheduleInfo::Done();
 }
diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp 
b/libminifi/src/EventDrivenSchedulingAgent.cpp
index 3a23ba839..b50d2d6f3 100644
--- a/libminifi/src/EventDrivenSchedulingAgent.cpp
+++ b/libminifi/src/EventDrivenSchedulingAgent.cpp
@@ -39,11 +39,11 @@ utils::TaskRescheduleInfo 
EventDrivenSchedulingAgent::run(core::Processor* proce
                                          const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_) {
     auto start_time = std::chrono::steady_clock::now();
-    // trigger processor until it has work to do, but no more than half a sec
+    // trigger processor until it has work to do, but no more than the 
configured nifi.flow.engine.event.driven.time.slice
     while (processor->isRunning() && (std::chrono::steady_clock::now() - 
start_time < time_slice_)) {
       this->onTrigger(processor, processContext, sessionFactory);
       if (processor->isYield()) {
-        return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime()));
+        return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
       }
     }
     return utils::TaskRescheduleInfo::RetryImmediately();  // Let's continue 
work as soon as a thread is available
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp 
b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 2b4072170..113cee8f8 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -28,11 +28,12 @@ namespace org::apache::nifi::minifi {
 utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* 
processor, const std::shared_ptr<core::ProcessContext> &processContext,
                                          const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   if (this->running_ && processor->isRunning()) {
+    auto trigger_start_time = std::chrono::steady_clock::now();
     this->onTrigger(processor, processContext, sessionFactory);
     if (processor->isYield())
       return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime());
 
-    return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano()));
+    return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + 
processor->getSchedulingPeriod());
   }
   return utils::TaskRescheduleInfo::Done();
 }
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index fdb15f4a3..61965d1b8 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -50,9 +50,9 @@ Processor::Processor(std::string name, 
std::shared_ptr<ProcessorMetrics> metrics
   state_ = DISABLED;
   strategy_ = TIMER_DRIVEN;
   _triggerWhenEmpty = false;
-  scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
-  run_duration_nano_ = DEFAULT_RUN_DURATION;
-  yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS;
+  scheduling_period_ = MINIMUM_SCHEDULING_PERIOD;
+  run_duration_ = DEFAULT_RUN_DURATION;
+  yield_period_ = DEFAULT_YIELD_PERIOD_SECONDS;
   penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
   max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
   active_tasks_ = 0;
@@ -69,9 +69,9 @@ Processor::Processor(std::string name, const 
utils::Identifier& uuid, std::share
   state_ = DISABLED;
   strategy_ = TIMER_DRIVEN;
   _triggerWhenEmpty = false;
-  scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS;
-  run_duration_nano_ = DEFAULT_RUN_DURATION;
-  yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS;
+  scheduling_period_ = MINIMUM_SCHEDULING_PERIOD;
+  run_duration_ = DEFAULT_RUN_DURATION;
+  yield_period_ = DEFAULT_YIELD_PERIOD_SECONDS;
   penalization_period_ = DEFAULT_PENALIZATION_PERIOD;
   max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS;
   active_tasks_ = 0;
@@ -372,28 +372,23 @@ void Processor::setMaxConcurrentTasks(const uint8_t 
tasks) {
 }
 
 void Processor::yield() {
-  yield_expiration_ = std::chrono::system_clock::now() + 
yield_period_msec_.load();
+  yield_expiration_ = std::chrono::steady_clock::now() + yield_period_.load();
 }
 
-void Processor::yield(std::chrono::milliseconds delta_time) {
-  yield_expiration_ = std::chrono::system_clock::now() + delta_time;
+void Processor::yield(std::chrono::steady_clock::duration delta_time) {
+  yield_expiration_ = std::chrono::steady_clock::now() + delta_time;
 }
 
 bool Processor::isYield() {
-  return yield_expiration_.load() >= std::chrono::system_clock::now();
+  return getYieldTime() > 0ms;
 }
 
 void Processor::clearYield() {
-  yield_expiration_ = std::chrono::system_clock::time_point();
+  yield_expiration_ = std::chrono::steady_clock::time_point();
 }
 
-std::chrono::milliseconds Processor::getYieldTime() const {
-  auto yield_expiration = yield_expiration_.load();
-  auto current_time = std::chrono::system_clock::now();
-  if (yield_expiration > current_time)
-    return 
std::chrono::duration_cast<std::chrono::milliseconds>(yield_expiration - 
current_time);
-  else
-    return 0ms;
+std::chrono::steady_clock::duration Processor::getYieldTime() const {
+  return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), 
std::chrono::steady_clock::duration{0});
 }
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp 
b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 5306af402..f4e493897 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -247,7 +247,7 @@ void StructuredConfiguration::parseProcessorNode(const 
Node& processors_node, co
     if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || 
procCfg.schedulingStrategy == "EVENT_DRIVEN") {
       if (auto scheduling_period = 
utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod))
 {
         logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => 
[%" PRId64 "] ns", scheduling_period->count());
-        processor->setSchedulingPeriodNano(*scheduling_period);
+        processor->setSchedulingPeriod(*scheduling_period);
       }
     } else {
       processor->setCronPeriod(procCfg.schedulingPeriod);
@@ -437,7 +437,7 @@ void 
StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P
 
   if (auto scheduling_period = 
utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr))
 {
     logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " 
ns", scheduling_period->count());
-    reportTask->setSchedulingPeriodNano(*scheduling_period);
+    reportTask->setSchedulingPeriod(*scheduling_period);
   }
 
   if (schedulingStrategyStr == "TIMER_DRIVEN") {
diff --git a/libminifi/src/utils/ThreadPool.cpp 
b/libminifi/src/utils/ThreadPool.cpp
index baadb59ec..06c2eb4e1 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -18,6 +18,8 @@
 #include "utils/ThreadPool.h"
 #include "core/state/UpdateController.h"
 
+using namespace std::literals::chrono_literals;
+
 namespace org::apache::nifi::minifi::utils {
 
 template<typename T>
@@ -92,7 +94,7 @@ void ThreadPool<T>::run_tasks(const 
std::shared_ptr<WorkerThread>& thread) {
       // The threadpool is running, but the ConcurrentQueue is stopped -> 
shouldn't happen during normal conditions
       // Might happen during startup or shutdown for a very short time
       if (running_.load()) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        std::this_thread::sleep_for(1ms);
       }
     }
   }
@@ -105,8 +107,7 @@ void ThreadPool<T>::manage_delayed_queue() {
     std::unique_lock<std::mutex> lock(worker_queue_mutex_);
 
     // Put the tasks ready to run in the worker queue
-    while (!delayed_worker_queue_.empty() &&
-        delayed_worker_queue_.top().getNextExecutionTime() <= 
std::chrono::steady_clock::now()) {
+    while (!delayed_worker_queue_.empty() && 
delayed_worker_queue_.top().getNextExecutionTime() <= 
std::chrono::steady_clock::now()) {
       // I'm very sorry for this - committee must has been seriously drunk 
when the interface of prio queue was submitted.
       Worker<T> task = 
std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top()));
       delayed_worker_queue_.pop();
@@ -115,9 +116,8 @@ void ThreadPool<T>::manage_delayed_queue() {
     if (delayed_worker_queue_.empty()) {
       delayed_task_available_.wait(lock);
     } else {
-      auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
-          delayed_worker_queue_.top().getNextExecutionTime() - 
std::chrono::steady_clock::now());
-      delayed_task_available_.wait_for(lock, std::max(wait_time, 
std::chrono::milliseconds(1)));
+      auto wait_time = delayed_worker_queue_.top().getNextExecutionTime() - 
std::chrono::steady_clock::now();
+      delayed_task_available_.wait_for(lock, std::max(wait_time, 
std::chrono::steady_clock::duration(1ms)));
     }
   }
 }
@@ -151,12 +151,13 @@ void ThreadPool<T>::manageWorkers() {
 
   if (nullptr != thread_manager_) {
     while (running_) {
-      auto waitperiod = std::chrono::milliseconds(500);
+      auto wait_period = 500ms;
       {
-        std::unique_lock<std::recursive_mutex> lock(manager_mutex_, 
std::try_to_lock);
-        if (!lock.owns_lock()) {
+        std::unique_lock<std::recursive_mutex> manager_lock(manager_mutex_, 
std::try_to_lock);
+        if (!manager_lock.owns_lock()) {
           // Threadpool is being stopped/started or config is being changed, 
better wait a bit
-          std::this_thread::sleep_for(std::chrono::milliseconds(10));
+          std::this_thread::sleep_for(10ms);
+          continue;
         }
         if (thread_manager_->isAboveMax(current_workers_)) {
           auto max = thread_manager_->getMaxConcurrentTasks();
@@ -167,7 +168,7 @@ void ThreadPool<T>::manageWorkers() {
             thread_reduction_count_++;
           thread_manager_->reduce();
         } else if (thread_manager_->canIncrease() && max_worker_threads_ > 
current_workers_) {  // increase slowly
-          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_);
           auto worker_thread = std::make_shared<WorkerThread>();
           worker_thread->thread_ = createThread([this, worker_thread] { 
run_tasks(worker_thread); });
           if (daemon_threads_) {
@@ -178,13 +179,13 @@ void ThreadPool<T>::manageWorkers() {
         }
         std::shared_ptr<WorkerThread> thread_ref;
         while (deceased_thread_queue_.tryDequeue(thread_ref)) {
-          std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+          std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_);
           if (thread_ref->thread_.joinable())
             thread_ref->thread_.join();
           thread_queue_.erase(std::remove(thread_queue_.begin(), 
thread_queue_.end(), thread_ref), thread_queue_.end());
         }
       }
-      std::this_thread::sleep_for(waitperiod);
+      std::this_thread::sleep_for(wait_period);
     }
   } else {
     for (auto &thread : thread_queue_) {
diff --git a/libminifi/test/unit/BackTraceTests.cpp 
b/libminifi/test/unit/BackTraceTests.cpp
index 4366f7a38..46363638d 100644
--- a/libminifi/test/unit/BackTraceTests.cpp
+++ b/libminifi/test/unit/BackTraceTests.cpp
@@ -25,9 +25,7 @@
 #include "utils/Monitors.h"
 #include "utils/ThreadPool.h"
 
-bool function() {
-  return true;
-}
+using namespace std::literals::chrono_literals;
 
 class WorkerNumberExecutions : public utils::AfterExecute<int> {
  public:
@@ -35,9 +33,9 @@ class WorkerNumberExecutions : public 
utils::AfterExecute<int> {
       : tasks(tasks) {
   }
 
-  explicit WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept
+  WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept
       : runs(other.runs),
-        tasks(other.tasks) {
+      tasks(other.tasks) {
   }
 
   bool isFinished(const int &result) override {
@@ -47,13 +45,8 @@ class WorkerNumberExecutions : public 
utils::AfterExecute<int> {
     return false;
   }
 
-  [[nodiscard]] int getRuns() const {
-    return runs;
-  }
-
-  std::chrono::milliseconds wait_time() override {
-    // wait 50ms
-    return std::chrono::milliseconds(50);
+  std::chrono::steady_clock::duration wait_time() override {
+    return 50ms;
   }
 
  protected:
@@ -68,30 +61,21 @@ TEST_CASE("BT1", "[TPT1]") {
 #endif
 }
 
-std::atomic<int> counter;
-
-int counterFunction() {
-  std::this_thread::sleep_for(std::chrono::milliseconds(150));
-  return ++counter;
-}
-
 TEST_CASE("BT2", "[TPT2]") {
-  counter = 0;
+  std::atomic<int> counter = 0;
   utils::ThreadPool<int> pool(4);
   pool.start();
   std::this_thread::sleep_for(std::chrono::milliseconds(150));
   for (int i = 0; i < 3; i++) {
-    std::function<int()> f_ex = counterFunction;
     std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
-    utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
+    utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
 
     std::future<int> fut;
     pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
   }
 
-  std::function<int()> f_ex = counterFunction;
   std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5));
-  utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
+  utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
 
   std::future<int> fut;
   pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp 
b/libminifi/test/unit/SchedulingAgentTests.cpp
index 36a74046e..c3cf6638a 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+#include <chrono>
+
 #include "../Catch.h"
 #include "../TestBase.h"
 #include "ProvenanceTestHelper.h"
@@ -35,12 +37,16 @@ class CountOnTriggersProcessor : public 
minifi::core::Processor {
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
   void onTrigger(core::ProcessContext*, core::ProcessSession*) override {
+    if (on_trigger_duration_ > 0ms)
+      std::this_thread::sleep_for(on_trigger_duration_);
     ++number_of_triggers;
   }
 
   size_t getNumberOfTriggers() const { return number_of_triggers; }
+  void setOnTriggerDuration(std::chrono::steady_clock::duration 
on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; }
 
  private:
+  std::chrono::steady_clock::duration on_trigger_duration_ = 0ms;
   std::atomic<size_t> number_of_triggers = 0;
 };
 
@@ -64,7 +70,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
   auto node = std::make_shared<core::ProcessorNode>(count_proc.get());
   auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, 
repo, content_repo);
   std::shared_ptr<core::ProcessSessionFactory> factory = 
std::make_shared<core::ProcessSessionFactory>(context);
-  count_proc->setSchedulingPeriodNano(1250ms);
+  count_proc->setSchedulingPeriod(125ms);
 #ifdef WIN32
   utils::dateSetInstall(TZ_DATA_DIR);
 #endif
@@ -74,14 +80,21 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
     timer_driven_agent->start();
     auto first_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
     CHECK(!first_task_reschedule_info.finished_);
-    CHECK(first_task_reschedule_info.wait_time_ == 1250ms);
+    CHECK(first_task_reschedule_info.wait_time_ <= 125ms);
     CHECK(count_proc->getNumberOfTriggers() == 1);
 
+    count_proc->setOnTriggerDuration(50ms);
     auto second_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
 
     CHECK(!second_task_reschedule_info.finished_);
-    CHECK(second_task_reschedule_info.wait_time_ == 1250ms);
+    CHECK(second_task_reschedule_info.wait_time_ <= 75ms);
     CHECK(count_proc->getNumberOfTriggers() == 2);
+
+    count_proc->setOnTriggerDuration(150ms);
+    auto third_task_reschedule_info = 
timer_driven_agent->run(count_proc.get(), context, factory);
+    CHECK(!third_task_reschedule_info.finished_);
+    CHECK(third_task_reschedule_info.wait_time_ == 0ms);
+    CHECK(count_proc->getNumberOfTriggers() == 3);
   }
 
   SECTION("Event Driven") {
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp 
b/libminifi/test/unit/ThreadPoolTests.cpp
index ae05cd56d..948fc73e5 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -23,9 +23,7 @@
 #include "../Catch.h"
 #include "utils/ThreadPool.h"
 
-bool function() {
-  return true;
-}
+using namespace std::literals::chrono_literals;
 
 class WorkerNumberExecutions : public utils::AfterExecute<int> {
  public:
@@ -45,13 +43,8 @@ class WorkerNumberExecutions : public 
utils::AfterExecute<int> {
     return false;
   }
 
-  [[nodiscard]] int getRuns() const {
-    return runs;
-  }
-
-  std::chrono::milliseconds wait_time() override {
-    // wait 50ms
-    return std::chrono::milliseconds(50);
+  std::chrono::steady_clock::duration wait_time() override {
+    return 50ms;
   }
 
  protected:
@@ -61,8 +54,7 @@ class WorkerNumberExecutions : public 
utils::AfterExecute<int> {
 
 TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   utils::ThreadPool<bool> pool(5);
-  std::function<bool()> f_ex = function;
-  utils::Worker<bool> functor(f_ex, "id");
+  utils::Worker<bool> functor([](){ return true; }, "id");
   pool.start();
   std::future<bool> fut;
   pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
@@ -70,21 +62,39 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") {
   REQUIRE(true == fut.get());
 }
 
-std::atomic<int> counter;
-
-int counterFunction() {
-  return ++counter;
-}
-
 TEST_CASE("ThreadPoolTest2", "[TPT2]") {
-  counter = 0;
+  std::atomic<int> counter = 0;
   utils::ThreadPool<int> pool(5);
-  std::function<int()> f_ex = counterFunction;
   std::unique_ptr<utils::AfterExecute<int>> after_execute = 
std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(20));
-  utils::Worker<int> functor(f_ex, "id", std::move(after_execute));
+  utils::Worker<int> functor([&counter]() { return ++counter; }, "id", 
std::move(after_execute));
   pool.start();
   std::future<int> fut;
-  pool.execute(std::move(functor), fut);  // NOLINT(bugprone-use-after-move)
+  pool.execute(std::move(functor), fut);
   fut.wait();
   REQUIRE(20 == fut.get());
 }
+
+TEST_CASE("Worker wait time should be relative to the last run") {
+  std::vector<std::chrono::steady_clock::time_point> 
worker_execution_time_points;
+  utils::ThreadPool<utils::TaskRescheduleInfo> pool(1);
+  auto wait_time_between_tasks = 10ms;
+  utils::Worker<utils::TaskRescheduleInfo> 
worker([&]()->utils::TaskRescheduleInfo {
+    worker_execution_time_points.push_back(std::chrono::steady_clock::now());
+    if (worker_execution_time_points.size() == 2) {
+      return utils::TaskRescheduleInfo::Done();
+    } else {
+      return utils::TaskRescheduleInfo::RetryIn(wait_time_between_tasks);
+    }
+  }, "id", std::make_unique<utils::ComplexMonitor>());
+  std::this_thread::sleep_for(wait_time_between_tasks + 1ms);  // Pre-waiting 
should not matter
+
+  std::future<utils::TaskRescheduleInfo> task_future;
+  pool.execute(std::move(worker), task_future);
+  pool.start();
+
+  auto final_task_reschedule_info = task_future.get();
+
+  CHECK(final_task_reschedule_info.finished_);
+  REQUIRE(worker_execution_time_points.size() == 2);
+  CHECK(worker_execution_time_points[1] - worker_execution_time_points[0] >= 
wait_time_between_tasks);
+}


Reply via email to