This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit dcbabbad04abceacce2a246a905589ac1f0e8fd6 Author: Martin Zink <[email protected]> AuthorDate: Thu Jun 15 19:35:05 2023 +0200 MINIFICPP-2125 fix for waking up prematurely after processor yields Closes #1581 Signed-off-by: Marton Szasz <[email protected]> --- extensions/standard-processors/processors/GetTCP.h | 2 +- .../tests/unit/FlowJsonTests.cpp | 7 +- .../tests/unit/ProcessorTests.cpp | 6 ++ .../tests/unit/YamlConfigurationTests.cpp | 10 +- libminifi/include/core/Processor.h | 37 ++++--- libminifi/include/utils/Monitors.h | 108 +++++---------------- libminifi/include/utils/ThreadPool.h | 7 +- libminifi/src/CronDrivenSchedulingAgent.cpp | 4 +- libminifi/src/EventDrivenSchedulingAgent.cpp | 4 +- libminifi/src/TimerDrivenSchedulingAgent.cpp | 3 +- libminifi/src/core/Processor.cpp | 31 +++--- .../src/core/flow/StructuredConfiguration.cpp | 4 +- libminifi/src/utils/ThreadPool.cpp | 27 +++--- libminifi/test/unit/BackTraceTests.cpp | 32 ++---- libminifi/test/unit/SchedulingAgentTests.cpp | 19 +++- libminifi/test/unit/ThreadPoolTests.cpp | 54 ++++++----- 16 files changed, 149 insertions(+), 206 deletions(-) diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h index e8070afd2..7e0dd03fd 100644 --- a/extensions/standard-processors/processors/GetTCP.h +++ b/extensions/standard-processors/processors/GetTCP.h @@ -69,7 +69,7 @@ class SocketAfterExecute : public utils::AfterExecute<int> { return !running_; } - std::chrono::milliseconds wait_time() override { + std::chrono::steady_clock::duration wait_time() override { // wait 500ms return std::chrono::milliseconds(500); } diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp index 495965122..71c04bf21 100644 --- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -21,12 +21,9 @@ #include <chrono> #include "core/repository/VolatileContentRepository.h" #include "core/ProcessGroup.h" -#include "core/RepositoryFactory.h" #include "core/yaml/YamlConfiguration.h" #include "TailFile.h" -#include "TestBase.h" #include "Catch.h" -#include "utils/TestUtils.h" #include "utils/StringUtils.h" #include "ConfigurationTestController.h" #include "Funnel.h" @@ -115,9 +112,9 @@ TEST_CASE("NiFi flow json format is correctly parsed") { REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001"); REQUIRE(15 == proc->getMaxConcurrentTasks()); REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == proc->getSchedulingStrategy()); - REQUIRE(3s == proc->getSchedulingPeriodNano()); + REQUIRE(3s == proc->getSchedulingPeriod()); REQUIRE(12s == proc->getPenalizationPeriod()); - REQUIRE(4s == proc->getYieldPeriodMsec()); + REQUIRE(4s == proc->getYieldPeriod()); REQUIRE(proc->isAutoTerminated({"one", ""})); REQUIRE(proc->isAutoTerminated({"two", ""})); REQUIRE_FALSE(proc->isAutoTerminated({"three", ""})); diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp index ccb322584..570128479 100644 --- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp +++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp @@ -824,3 +824,9 @@ TEST_CASE("Test getProcessorType", "[getProcessorType]") { auto processor = plan->addProcessor("GenerateFlowFile", "myProc"); REQUIRE(processor->getProcessorType() == "GenerateFlowFile"); } + +TEST_CASE("IsYield and getYieldTime is consistent") { + auto processor = TestProcessorNoContent("test_processor"); + processor.yield(1ms); + REQUIRE(processor.isYield() == (processor.getYieldTime() != 0ms)); +} diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp index 8179f328a..f829729ed 100644 --- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp @@ -26,7 +26,6 @@ #include "TailFile.h" #include "TestBase.h" #include "Catch.h" -#include "utils/TestUtils.h" #include "utils/StringUtils.h" #include "ConfigurationTestController.h" #include "utils/IntegrationTestUtils.h" @@ -151,9 +150,9 @@ Provenance Reporting: REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty()); REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy()); - REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano()); + REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod()); REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); - REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec()); + REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod()); REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); std::map<std::string, minifi::Connection*> connectionMap; @@ -462,9 +461,9 @@ NiFi Properties Overrides: {} REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy()); REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks()); - REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano()); + REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod()); REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); - REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec()); + REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod()); REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); std::map<std::string, minifi::Connection*> connectionMap; @@ -816,7 +815,6 @@ TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") { class: SSLContextService )"; - auto config_old = config_yaml; utils::StringUtils::replaceAll(config_yaml, std::string("00000000-0000-0000-0000-00000000000") + i, "99999999-9999-9999-9999-999999999999"); REQUIRE_THROWS_WITH(yaml_config.getRootFromPayload(config_yaml), "General Operation: UUID 99999999-9999-9999-9999-999999999999 is duplicated in the flow configuration"); } diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 824b5eb03..8cc3b8379 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -71,8 +71,7 @@ class ProcessContext; class ProcessSession; class ProcessSessionFactory; -// Minimum scheduling period in Nano Second -constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000}; +constexpr std::chrono::microseconds MINIMUM_SCHEDULING_PERIOD{30}; #define BUILDING_DLL 1 @@ -102,12 +101,12 @@ class Processor : public Connectable, public ConfigurableComponent, public state return strategy_; } - void setSchedulingPeriodNano(std::chrono::nanoseconds period) { - scheduling_period_nano_ = std::max(MINIMUM_SCHEDULING_NANOS, period); + void setSchedulingPeriod(std::chrono::steady_clock::duration period) { + scheduling_period_ = std::max(std::chrono::steady_clock::duration(MINIMUM_SCHEDULING_PERIOD), period); } - std::chrono::nanoseconds getSchedulingPeriodNano() const { - return scheduling_period_nano_; + std::chrono::steady_clock::duration getSchedulingPeriod() const { + return scheduling_period_; } void setCronPeriod(const std::string &period) { @@ -118,20 +117,20 @@ class Processor : public Connectable, public ConfigurableComponent, public state return cron_period_; } - void setRunDurationNano(std::chrono::nanoseconds period) { - run_duration_nano_ = period; + void setRunDurationNano(std::chrono::steady_clock::duration period) { + run_duration_ = period; } - std::chrono::nanoseconds getRunDurationNano() const { - return (run_duration_nano_); + std::chrono::steady_clock::duration getRunDurationNano() const { + return (run_duration_); } void setYieldPeriodMsec(std::chrono::milliseconds period) { - yield_period_msec_ = period; + yield_period_ = period; } - std::chrono::milliseconds getYieldPeriodMsec() const { - return yield_period_msec_; + std::chrono::steady_clock::duration getYieldPeriod() const { + return yield_period_; } void setPenalizationPeriod(std::chrono::milliseconds period) { @@ -171,13 +170,13 @@ class Processor : public Connectable, public ConfigurableComponent, public state void yield() override; - void yield(std::chrono::milliseconds delta_time); + void yield(std::chrono::steady_clock::duration delta_time); virtual bool isYield(); void clearYield(); - std::chrono::milliseconds getYieldTime() const; + std::chrono::steady_clock::duration getYieldTime() const; // Whether flow file queue full in any of the outgoing connection bool flowFilesOutGoingFull() const; @@ -239,9 +238,9 @@ class Processor : public Connectable, public ConfigurableComponent, public state std::atomic<ScheduledState> state_; - std::atomic<std::chrono::nanoseconds> scheduling_period_nano_; - std::atomic<std::chrono::nanoseconds> run_duration_nano_; - std::atomic<std::chrono::milliseconds> yield_period_msec_; + std::atomic<std::chrono::steady_clock::duration> scheduling_period_; + std::atomic<std::chrono::steady_clock::duration> run_duration_; + std::atomic<std::chrono::steady_clock::duration> yield_period_; std::atomic<uint8_t> active_tasks_; std::atomic<bool> _triggerWhenEmpty; @@ -251,7 +250,7 @@ class Processor : public Connectable, public ConfigurableComponent, public state private: mutable std::mutex mutex_; - std::atomic<std::chrono::time_point<std::chrono::system_clock>> yield_expiration_{}; + std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; static std::mutex& getGraphMutex() { static std::mutex mutex{}; diff --git a/libminifi/include/utils/Monitors.h b/libminifi/include/utils/Monitors.h index 3b446561f..aed00ce2d 100644 --- a/libminifi/include/utils/Monitors.h +++ b/libminifi/include/utils/Monitors.h @@ -15,20 +15,17 @@ * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_UTILS_MONITORS_H_ -#define LIBMINIFI_INCLUDE_UTILS_MONITORS_H_ +#pragma once -#include <chrono> +#include <algorithm> #include <atomic> +#include <chrono> #if defined(WIN32) #include <future> // This is required to work around a VS2017 bug, see the details below #endif +#include "utils/gsl.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace utils { +namespace org::apache::nifi::minifi::utils { /** * Worker task helper that determines @@ -40,95 +37,51 @@ class AfterExecute { virtual ~AfterExecute() = default; AfterExecute() = default; - AfterExecute(AfterExecute&& /*other*/) = default; + AfterExecute(AfterExecute&& /*other*/) noexcept = default; virtual bool isFinished(const T &result) = 0; virtual bool isCancelled(const T &result) = 0; /** * Time to wait before re-running this task if necessary * @return milliseconds since epoch after which we are eligible to re-run this task. */ - virtual std::chrono::milliseconds wait_time() = 0; + virtual std::chrono::steady_clock::duration wait_time() = 0; }; /** * Uses the wait time for a given worker to determine if it is eligible to run */ -class TimerAwareMonitor : public utils::AfterExecute<std::chrono::milliseconds> { - public: - TimerAwareMonitor(std::atomic<bool> *run_monitor) // NOLINT - : current_wait_(std::chrono::milliseconds(0)), - run_monitor_(run_monitor) { - } - bool isFinished(const std::chrono::milliseconds &result) override { - current_wait_.store(result); - if (*run_monitor_) { - return false; - } - return true; - } - bool isCancelled(const std::chrono::milliseconds& /*result*/) override { - if (*run_monitor_) { - return false; - } - return true; - } - /** - * Time to wait before re-running this task if necessary - * @return milliseconds since epoch after which we are eligible to re-run this task. - */ - std::chrono::milliseconds wait_time() override { - return current_wait_.load(); - } - - protected: - std::atomic<std::chrono::milliseconds> current_wait_; - std::atomic<bool> *run_monitor_; -}; - -class SingleRunMonitor : public utils::AfterExecute<bool>{ - public: - SingleRunMonitor(std::chrono::milliseconds retry_interval = std::chrono::milliseconds(100)) // NOLINT - : retry_interval_(retry_interval) { - } - - bool isFinished(const bool &result) override { - return result; - } - bool isCancelled(const bool& /*result*/) override { - return false; - } - std::chrono::milliseconds wait_time() override { - return retry_interval_; - } - protected: - const std::chrono::milliseconds retry_interval_; -}; - struct TaskRescheduleInfo { - TaskRescheduleInfo(bool result, std::chrono::milliseconds wait_time) - : wait_time_(wait_time), finished_(result) {} + TaskRescheduleInfo(bool result, std::chrono::steady_clock::duration wait_time) + : wait_time_(wait_time), finished_(result) { + gsl_Expects(wait_time >= std::chrono::milliseconds(0)); + } - std::chrono::milliseconds wait_time_; + std::chrono::steady_clock::duration wait_time_; bool finished_; static TaskRescheduleInfo Done() { - return TaskRescheduleInfo(true, std::chrono::milliseconds(0)); + return {true, std::chrono::steady_clock::duration(0)}; } - static TaskRescheduleInfo RetryIn(std::chrono::milliseconds interval) { - return TaskRescheduleInfo(false, interval); + static TaskRescheduleInfo RetryIn(std::chrono::steady_clock::duration interval) { + return {false, interval}; + } + + static TaskRescheduleInfo RetryAfter(std::chrono::steady_clock::time_point time_point) { + auto interval = std::max(time_point - std::chrono::steady_clock::now(), std::chrono::steady_clock::duration(0)); + return {false, interval}; } static TaskRescheduleInfo RetryImmediately() { - return TaskRescheduleInfo(false, std::chrono::milliseconds(0)); + return {false, std::chrono::steady_clock::duration(0)}; } #if defined(WIN32) // https://developercommunity.visualstudio.com/content/problem/60897/c-shared-state-futuresstate-default-constructs-the.html // Because of this bug we need to have this object default constructible, which makes no sense otherwise. Hack. private: - TaskRescheduleInfo() : wait_time_(std::chrono::milliseconds(0)), finished_(true) {} + TaskRescheduleInfo() : wait_time_(std::chrono::steady_clock::duration(0)), finished_(true) {} friend class std::_Associated_state<TaskRescheduleInfo>; #endif }; @@ -147,22 +100,13 @@ class ComplexMonitor : public utils::AfterExecute<TaskRescheduleInfo> { bool isCancelled(const TaskRescheduleInfo& /*result*/) override { return false; } - /** - * Time to wait before re-running this task if necessary - * @return milliseconds since epoch after which we are eligible to re-run this task. - */ - std::chrono::milliseconds wait_time() override { + + std::chrono::steady_clock::duration wait_time() override { return current_wait_.load(); } private: - std::atomic<std::chrono::milliseconds> current_wait_ {std::chrono::milliseconds(0)}; + std::atomic<std::chrono::steady_clock::duration> current_wait_ {std::chrono::steady_clock::duration(0)}; }; -} // namespace utils -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_UTILS_MONITORS_H_ +} // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 68c2663ec..dde867909 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -94,7 +94,7 @@ class Worker { promise->set_value(result); return false; } - next_exec_time_ = std::max(next_exec_time_ + run_determinant_->wait_time(), std::chrono::steady_clock::now()); + next_exec_time_ = std::max(next_exec_time_, std::chrono::steady_clock::now() + run_determinant_->wait_time()); return true; } @@ -106,11 +106,6 @@ class Worker { return next_exec_time_; } - virtual std::chrono::milliseconds getWaitTime() const { - return run_determinant_->wait_time(); - } - - std::shared_ptr<std::promise<T>> getPromise() const; const TaskId &getIdentifier() const { diff --git a/libminifi/src/CronDrivenSchedulingAgent.cpp b/libminifi/src/CronDrivenSchedulingAgent.cpp index 0e12bd3ae..fdc885747 100644 --- a/libminifi/src/CronDrivenSchedulingAgent.cpp +++ b/libminifi/src/CronDrivenSchedulingAgent.cpp @@ -50,7 +50,7 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces return utils::TaskRescheduleInfo::Done(); if (*next_to_last_trigger > current_time.get_local_time()) - return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_to_last_trigger-current_time.get_local_time())); + return utils::TaskRescheduleInfo::RetryIn(*next_to_last_trigger-current_time.get_local_time()); auto on_trigger_result = this->onTrigger(processor, processContext, sessionFactory); @@ -61,7 +61,7 @@ utils::TaskRescheduleInfo CronDrivenSchedulingAgent::run(core::Processor* proces return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); if (auto next_trigger = schedules_.at(uuid).calculateNextTrigger(current_time.get_local_time())) - return utils::TaskRescheduleInfo::RetryIn(ceil<milliseconds>(*next_trigger-current_time.get_local_time())); + return utils::TaskRescheduleInfo::RetryIn(*next_trigger-current_time.get_local_time()); } return utils::TaskRescheduleInfo::Done(); } diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 3a23ba839..b50d2d6f3 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -39,11 +39,11 @@ utils::TaskRescheduleInfo EventDrivenSchedulingAgent::run(core::Processor* proce const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { if (this->running_) { auto start_time = std::chrono::steady_clock::now(); - // trigger processor until it has work to do, but no more than half a sec + // trigger processor until it has work to do, but no more than the configured nifi.flow.engine.event.driven.time.slice while (processor->isRunning() && (std::chrono::steady_clock::now() - start_time < time_slice_)) { this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { - return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(processor->getYieldTime())); + return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); } } return utils::TaskRescheduleInfo::RetryImmediately(); // Let's continue work as soon as a thread is available diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 2b4072170..113cee8f8 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -28,11 +28,12 @@ namespace org::apache::nifi::minifi { utils::TaskRescheduleInfo TimerDrivenSchedulingAgent::run(core::Processor* processor, const std::shared_ptr<core::ProcessContext> &processContext, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { if (this->running_ && processor->isRunning()) { + auto trigger_start_time = std::chrono::steady_clock::now(); this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) return utils::TaskRescheduleInfo::RetryIn(processor->getYieldTime()); - return utils::TaskRescheduleInfo::RetryIn(std::chrono::duration_cast<std::chrono::milliseconds>(processor->getSchedulingPeriodNano())); + return utils::TaskRescheduleInfo::RetryAfter(trigger_start_time + processor->getSchedulingPeriod()); } return utils::TaskRescheduleInfo::Done(); } diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index fdb15f4a3..61965d1b8 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -50,9 +50,9 @@ Processor::Processor(std::string name, std::shared_ptr<ProcessorMetrics> metrics state_ = DISABLED; strategy_ = TIMER_DRIVEN; _triggerWhenEmpty = false; - scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; - run_duration_nano_ = DEFAULT_RUN_DURATION; - yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS; + scheduling_period_ = MINIMUM_SCHEDULING_PERIOD; + run_duration_ = DEFAULT_RUN_DURATION; + yield_period_ = DEFAULT_YIELD_PERIOD_SECONDS; penalization_period_ = DEFAULT_PENALIZATION_PERIOD; max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; active_tasks_ = 0; @@ -69,9 +69,9 @@ Processor::Processor(std::string name, const utils::Identifier& uuid, std::share state_ = DISABLED; strategy_ = TIMER_DRIVEN; _triggerWhenEmpty = false; - scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; - run_duration_nano_ = DEFAULT_RUN_DURATION; - yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS; + scheduling_period_ = MINIMUM_SCHEDULING_PERIOD; + run_duration_ = DEFAULT_RUN_DURATION; + yield_period_ = DEFAULT_YIELD_PERIOD_SECONDS; penalization_period_ = DEFAULT_PENALIZATION_PERIOD; max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; active_tasks_ = 0; @@ -372,28 +372,23 @@ void Processor::setMaxConcurrentTasks(const uint8_t tasks) { } void Processor::yield() { - yield_expiration_ = std::chrono::system_clock::now() + yield_period_msec_.load(); + yield_expiration_ = std::chrono::steady_clock::now() + yield_period_.load(); } -void Processor::yield(std::chrono::milliseconds delta_time) { - yield_expiration_ = std::chrono::system_clock::now() + delta_time; +void Processor::yield(std::chrono::steady_clock::duration delta_time) { + yield_expiration_ = std::chrono::steady_clock::now() + delta_time; } bool Processor::isYield() { - return yield_expiration_.load() >= std::chrono::system_clock::now(); + return getYieldTime() > 0ms; } void Processor::clearYield() { - yield_expiration_ = std::chrono::system_clock::time_point(); + yield_expiration_ = std::chrono::steady_clock::time_point(); } -std::chrono::milliseconds Processor::getYieldTime() const { - auto yield_expiration = yield_expiration_.load(); - auto current_time = std::chrono::system_clock::now(); - if (yield_expiration > current_time) - return std::chrono::duration_cast<std::chrono::milliseconds>(yield_expiration - current_time); - else - return 0ms; +std::chrono::steady_clock::duration Processor::getYieldTime() const { + return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0}); } } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 5306af402..f4e493897 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -247,7 +247,7 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") { if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) { logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count()); - processor->setSchedulingPeriodNano(*scheduling_period); + processor->setSchedulingPeriod(*scheduling_period); } } else { processor->setCronPeriod(procCfg.schedulingPeriod); @@ -437,7 +437,7 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) { logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count()); - reportTask->setSchedulingPeriodNano(*scheduling_period); + reportTask->setSchedulingPeriod(*scheduling_period); } if (schedulingStrategyStr == "TIMER_DRIVEN") { diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp index baadb59ec..06c2eb4e1 100644 --- a/libminifi/src/utils/ThreadPool.cpp +++ b/libminifi/src/utils/ThreadPool.cpp @@ -18,6 +18,8 @@ #include "utils/ThreadPool.h" #include "core/state/UpdateController.h" +using namespace std::literals::chrono_literals; + namespace org::apache::nifi::minifi::utils { template<typename T> @@ -92,7 +94,7 @@ void ThreadPool<T>::run_tasks(const std::shared_ptr<WorkerThread>& thread) { // The threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen during normal conditions // Might happen during startup or shutdown for a very short time if (running_.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(1ms); } } } @@ -105,8 +107,7 @@ void ThreadPool<T>::manage_delayed_queue() { std::unique_lock<std::mutex> lock(worker_queue_mutex_); // Put the tasks ready to run in the worker queue - while (!delayed_worker_queue_.empty() && - delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) { + while (!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) { // I'm very sorry for this - committee must has been seriously drunk when the interface of prio queue was submitted. Worker<T> task = std::move(const_cast<Worker<T>&>(delayed_worker_queue_.top())); delayed_worker_queue_.pop(); @@ -115,9 +116,8 @@ void ThreadPool<T>::manage_delayed_queue() { if (delayed_worker_queue_.empty()) { delayed_task_available_.wait(lock); } else { - auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>( - delayed_worker_queue_.top().getNextExecutionTime() - std::chrono::steady_clock::now()); - delayed_task_available_.wait_for(lock, std::max(wait_time, std::chrono::milliseconds(1))); + auto wait_time = delayed_worker_queue_.top().getNextExecutionTime() - std::chrono::steady_clock::now(); + delayed_task_available_.wait_for(lock, std::max(wait_time, std::chrono::steady_clock::duration(1ms))); } } } @@ -151,12 +151,13 @@ void ThreadPool<T>::manageWorkers() { if (nullptr != thread_manager_) { while (running_) { - auto waitperiod = std::chrono::milliseconds(500); + auto wait_period = 500ms; { - std::unique_lock<std::recursive_mutex> lock(manager_mutex_, std::try_to_lock); - if (!lock.owns_lock()) { + std::unique_lock<std::recursive_mutex> manager_lock(manager_mutex_, std::try_to_lock); + if (!manager_lock.owns_lock()) { // Threadpool is being stopped/started or config is being changed, better wait a bit - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::this_thread::sleep_for(10ms); + continue; } if (thread_manager_->isAboveMax(current_workers_)) { auto max = thread_manager_->getMaxConcurrentTasks(); @@ -167,7 +168,7 @@ void ThreadPool<T>::manageWorkers() { thread_reduction_count_++; thread_manager_->reduce(); } else if (thread_manager_->canIncrease() && max_worker_threads_ > current_workers_) { // increase slowly - std::unique_lock<std::mutex> lock(worker_queue_mutex_); + std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_); auto worker_thread = std::make_shared<WorkerThread>(); worker_thread->thread_ = createThread([this, worker_thread] { run_tasks(worker_thread); }); if (daemon_threads_) { @@ -178,13 +179,13 @@ void ThreadPool<T>::manageWorkers() { } std::shared_ptr<WorkerThread> thread_ref; while (deceased_thread_queue_.tryDequeue(thread_ref)) { - std::unique_lock<std::mutex> lock(worker_queue_mutex_); + std::unique_lock<std::mutex> worker_queue_lock(worker_queue_mutex_); if (thread_ref->thread_.joinable()) thread_ref->thread_.join(); thread_queue_.erase(std::remove(thread_queue_.begin(), thread_queue_.end(), thread_ref), thread_queue_.end()); } } - std::this_thread::sleep_for(waitperiod); + std::this_thread::sleep_for(wait_period); } } else { for (auto &thread : thread_queue_) { diff --git a/libminifi/test/unit/BackTraceTests.cpp b/libminifi/test/unit/BackTraceTests.cpp index 4366f7a38..46363638d 100644 --- a/libminifi/test/unit/BackTraceTests.cpp +++ b/libminifi/test/unit/BackTraceTests.cpp @@ -25,9 +25,7 @@ #include "utils/Monitors.h" #include "utils/ThreadPool.h" -bool function() { - return true; -} +using namespace std::literals::chrono_literals; class WorkerNumberExecutions : public utils::AfterExecute<int> { public: @@ -35,9 +33,9 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { : tasks(tasks) { } - explicit WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept + WorkerNumberExecutions(WorkerNumberExecutions && other) noexcept : runs(other.runs), - tasks(other.tasks) { + tasks(other.tasks) { } bool isFinished(const int &result) override { @@ -47,13 +45,8 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { return false; } - [[nodiscard]] int getRuns() const { - return runs; - } - - std::chrono::milliseconds wait_time() override { - // wait 50ms - return std::chrono::milliseconds(50); + std::chrono::steady_clock::duration wait_time() override { + return 50ms; } protected: @@ -68,30 +61,21 @@ TEST_CASE("BT1", "[TPT1]") { #endif } -std::atomic<int> counter; - -int counterFunction() { - std::this_thread::sleep_for(std::chrono::milliseconds(150)); - return ++counter; -} - TEST_CASE("BT2", "[TPT2]") { - counter = 0; + std::atomic<int> counter = 0; utils::ThreadPool<int> pool(4); pool.start(); std::this_thread::sleep_for(std::chrono::milliseconds(150)); for (int i = 0; i < 3; i++) { - std::function<int()> f_ex = counterFunction; std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5)); - utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + utils::Worker<int> functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); std::future<int> fut; pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) } - std::function<int()> f_ex = counterFunction; std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(5)); - utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + utils::Worker<int> functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); std::future<int> fut; pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp b/libminifi/test/unit/SchedulingAgentTests.cpp index 36a74046e..c3cf6638a 100644 --- a/libminifi/test/unit/SchedulingAgentTests.cpp +++ b/libminifi/test/unit/SchedulingAgentTests.cpp @@ -15,6 +15,8 @@ * limitations under the License. */ +#include <chrono> + #include "../Catch.h" #include "../TestBase.h" #include "ProvenanceTestHelper.h" @@ -35,12 +37,16 @@ class CountOnTriggersProcessor : public minifi::core::Processor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS void onTrigger(core::ProcessContext*, core::ProcessSession*) override { + if (on_trigger_duration_ > 0ms) + std::this_thread::sleep_for(on_trigger_duration_); ++number_of_triggers; } size_t getNumberOfTriggers() const { return number_of_triggers; } + void setOnTriggerDuration(std::chrono::steady_clock::duration on_trigger_duration) { on_trigger_duration_ = on_trigger_duration; } private: + std::chrono::steady_clock::duration on_trigger_duration_ = 0ms; std::atomic<size_t> number_of_triggers = 0; }; @@ -64,7 +70,7 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { auto node = std::make_shared<core::ProcessorNode>(count_proc.get()); auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo); std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - count_proc->setSchedulingPeriodNano(1250ms); + count_proc->setSchedulingPeriod(125ms); #ifdef WIN32 utils::dateSetInstall(TZ_DATA_DIR); #endif @@ -74,14 +80,21 @@ TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") { timer_driven_agent->start(); auto first_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); CHECK(!first_task_reschedule_info.finished_); - CHECK(first_task_reschedule_info.wait_time_ == 1250ms); + CHECK(first_task_reschedule_info.wait_time_ <= 125ms); CHECK(count_proc->getNumberOfTriggers() == 1); + count_proc->setOnTriggerDuration(50ms); auto second_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); CHECK(!second_task_reschedule_info.finished_); - CHECK(second_task_reschedule_info.wait_time_ == 1250ms); + CHECK(second_task_reschedule_info.wait_time_ <= 75ms); CHECK(count_proc->getNumberOfTriggers() == 2); + + count_proc->setOnTriggerDuration(150ms); + auto third_task_reschedule_info = timer_driven_agent->run(count_proc.get(), context, factory); + CHECK(!third_task_reschedule_info.finished_); + CHECK(third_task_reschedule_info.wait_time_ == 0ms); + CHECK(count_proc->getNumberOfTriggers() == 3); } SECTION("Event Driven") { diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index ae05cd56d..948fc73e5 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -23,9 +23,7 @@ #include "../Catch.h" #include "utils/ThreadPool.h" -bool function() { - return true; -} +using namespace std::literals::chrono_literals; class WorkerNumberExecutions : public utils::AfterExecute<int> { public: @@ -45,13 +43,8 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { return false; } - [[nodiscard]] int getRuns() const { - return runs; - } - - std::chrono::milliseconds wait_time() override { - // wait 50ms - return std::chrono::milliseconds(50); + std::chrono::steady_clock::duration wait_time() override { + return 50ms; } protected: @@ -61,8 +54,7 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { TEST_CASE("ThreadPoolTest1", "[TPT1]") { utils::ThreadPool<bool> pool(5); - std::function<bool()> f_ex = function; - utils::Worker<bool> functor(f_ex, "id"); + utils::Worker<bool> functor([](){ return true; }, "id"); pool.start(); std::future<bool> fut; pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) @@ -70,21 +62,39 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") { REQUIRE(true == fut.get()); } -std::atomic<int> counter; - -int counterFunction() { - return ++counter; -} - TEST_CASE("ThreadPoolTest2", "[TPT2]") { - counter = 0; + std::atomic<int> counter = 0; utils::ThreadPool<int> pool(5); - std::function<int()> f_ex = counterFunction; std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new WorkerNumberExecutions(20)); - utils::Worker<int> functor(f_ex, "id", std::move(after_execute)); + utils::Worker<int> functor([&counter]() { return ++counter; }, "id", std::move(after_execute)); pool.start(); std::future<int> fut; - pool.execute(std::move(functor), fut); // NOLINT(bugprone-use-after-move) + pool.execute(std::move(functor), fut); fut.wait(); REQUIRE(20 == fut.get()); } + +TEST_CASE("Worker wait time should be relative to the last run") { + std::vector<std::chrono::steady_clock::time_point> worker_execution_time_points; + utils::ThreadPool<utils::TaskRescheduleInfo> pool(1); + auto wait_time_between_tasks = 10ms; + utils::Worker<utils::TaskRescheduleInfo> worker([&]()->utils::TaskRescheduleInfo { + worker_execution_time_points.push_back(std::chrono::steady_clock::now()); + if (worker_execution_time_points.size() == 2) { + return utils::TaskRescheduleInfo::Done(); + } else { + return utils::TaskRescheduleInfo::RetryIn(wait_time_between_tasks); + } + }, "id", std::make_unique<utils::ComplexMonitor>()); + std::this_thread::sleep_for(wait_time_between_tasks + 1ms); // Pre-waiting should not matter + + std::future<utils::TaskRescheduleInfo> task_future; + pool.execute(std::move(worker), task_future); + pool.start(); + + auto final_task_reschedule_info = task_future.get(); + + CHECK(final_task_reschedule_info.finished_); + REQUIRE(worker_execution_time_points.size() == 2); + CHECK(worker_execution_time_points[1] - worker_execution_time_points[0] >= wait_time_between_tasks); +}
