This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4624d302ac3522c948fe87bbd96a7a6f6c8de901 Author: Martin Zink <[email protected]> AuthorDate: Mon Oct 2 19:31:09 2023 +0200 MINIFICPP-2212 use administrative yield duration instead of onschedule retry interval Closes #1664 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 1 - libminifi/include/SchedulingAgent.h | 2 ++ libminifi/include/core/ProcessGroup.h | 9 --------- libminifi/include/core/flow/FlowSchema.h | 1 - libminifi/src/agent/JsonSchema.cpp | 4 +--- libminifi/src/core/ProcessGroup.cpp | 17 +++++------------ libminifi/src/core/flow/FlowSchema.cpp | 2 -- libminifi/src/core/flow/StructuredConfiguration.cpp | 11 ----------- .../test/integration/OnScheduleErrorHandlingTests.cpp | 2 ++ libminifi/test/resources/TestKafkaOnSchedule.yml | 1 - libminifi/test/resources/TestOnScheduleRetry.yml | 1 - libminifi/test/resources/TestStateTransactionality.yml | 1 - 12 files changed, 10 insertions(+), 42 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 178821994..f23b389a9 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -29,7 +29,6 @@ It's recommended to create your configuration in YAML format or configure the ag Flow Controller: id: 471deef6-2a6e-4a7d-912a-81cc17e3a205 name: MiNiFi Flow - onschedule retry interval: 30000 ms Processors: - name: GetFile diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index b24b74d97..398cd52d2 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -109,6 +109,8 @@ class SchedulingAgent { SchedulingAgent(const SchedulingAgent &parent) = delete; SchedulingAgent &operator=(const SchedulingAgent &parent) = delete; + std::chrono::milliseconds getAdminYieldDuration() const { return admin_yield_duration_; } + protected: std::mutex mutex_; std::atomic<bool> running_; diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index ce507cb9a..d149dad69 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -132,14 +132,6 @@ class ProcessGroup : public CoreComponent { return (yield_period_msec_); } - void setOnScheduleRetryPeriod(int64_t period) { - onschedule_retry_msec_ = period; - } - - int64_t getOnScheduleRetryPeriod() { - return onschedule_retry_msec_; - } - int getVersion() const { return config_version_; } @@ -247,7 +239,6 @@ class ProcessGroup : public CoreComponent { // Yield Period in Milliseconds std::atomic<std::chrono::milliseconds> yield_period_msec_; std::atomic<uint64_t> timeout_; - std::atomic<int64_t> onschedule_retry_msec_; // URL std::string url_; diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h index 0badac303..773550ddc 100644 --- a/libminifi/include/core/flow/FlowSchema.h +++ b/libminifi/include/core/flow/FlowSchema.h @@ -36,7 +36,6 @@ struct FlowSchema { Keys penalization_period; Keys proc_yield_period; Keys runduration_nanos; - Keys onschedule_retry_interval; Keys connections; Keys max_queue_size; diff --git a/libminifi/src/agent/JsonSchema.cpp b/libminifi/src/agent/JsonSchema.cpp index f9bfa3895..90a1df510 100644 --- a/libminifi/src/agent/JsonSchema.cpp +++ b/libminifi/src/agent/JsonSchema.cpp @@ -353,7 +353,6 @@ static std::string buildSchema(const std::unordered_map<std::string, std::string "properties": { "name": {"type": "string"}, "version": {"type": "integer"}, - "onschedule retry interval": {"$ref": "#/definitions/time"}, )" + process_group_properties + R"( } }, @@ -368,8 +367,7 @@ static std::string buildSchema(const std::unordered_map<std::string, std::string "required": ["name"], "properties": { "name": {"type": "string"}, - "version": {"type": "integer"}, - "onschedule retry interval": {"$ref": "#/definitions/time"} + "version": {"type": "integer"} } }, )" + process_group_properties + R"( diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index db5e9a7eb..e56cbfd14 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -33,8 +33,6 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000; - std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator(); ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid) @@ -54,12 +52,6 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const u transmitting_(false), transport_protocol_("RAW"), logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) { - if (parent_process_group_ != nullptr) { - onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod(); - } else { - onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS; - } - logger_->log_debug("ProcessGroup %s created", name_); } @@ -69,7 +61,6 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name) type_(type), parent_process_group_(nullptr), yield_period_msec_(0ms), - onschedule_retry_msec_(DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS), transmitting_(false), transport_protocol_("RAW"), logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) { @@ -167,12 +158,14 @@ void ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch } } - if (!onScheduleTimer_ && !failed_processors_.empty() && onschedule_retry_msec_ > 0) { - logger_->log_info("Retrying failed processors in %lld msec", onschedule_retry_msec_.load()); + // The admin yield duration comes from the configuration, should be equal in all three schedulers + std::chrono::milliseconds admin_yield_duration = timeScheduler.getAdminYieldDuration(); + if (!onScheduleTimer_ && !failed_processors_.empty() && admin_yield_duration > 0ms) { + logger_->log_info("Retrying failed processors in %lld msec", admin_yield_duration.count()); auto func = [this, eventScheduler = &eventScheduler, cronScheduler = &cronScheduler, timeScheduler = &timeScheduler]() { this->startProcessingProcessors(*timeScheduler, *eventScheduler, *cronScheduler); }; - onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_), func); + onScheduleTimer_ = std::make_unique<utils::CallBackTimer>(admin_yield_duration, func); onScheduleTimer_->start(); } else if (failed_processors_.empty() && onScheduleTimer_) { onScheduleTimer_->stop(); diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index a5df0dc64..c07e548e7 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -32,7 +32,6 @@ FlowSchema FlowSchema::getDefault() { .penalization_period = {"penalization period"}, .proc_yield_period = {"yield period"}, .runduration_nanos = {"run duration nanos"}, - .onschedule_retry_interval = {"onschedule retry interval"}, .connections = {"Connections"}, .max_queue_size = {"max work queue size"}, @@ -92,7 +91,6 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .proc_yield_period = {"yieldDuration"}, // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue .runduration_nanos = {"runDurationMillis"}, - .onschedule_retry_interval = {}, .connections = {"connections"}, .max_queue_size = {"backPressureObjectThreshold"}, diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 4e5461e1a..322b8d042 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -80,17 +80,6 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup( group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version); } - if (node[schema_.onschedule_retry_interval]) { - auto onScheduleRetryPeriod = node[schema_.onschedule_retry_interval].getString().value(); - logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod); - - auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod); - if (on_schedule_retry_period_value.has_value() && group) { - logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" PRId64 "] ms", on_schedule_retry_period_value->count()); - group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count()); - } - } - return group; } diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp index 14f74f6b4..7446dadb3 100644 --- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp +++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp @@ -68,6 +68,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase { LogTestController::getInstance().setDebug<core::Processor>(); LogTestController::getInstance().setDebug<core::ProcessSession>(); LogTestController::getInstance().setDebug<minifi::processors::KamikazeProcessor>(); + configuration->set(minifi::Configure::nifi_administrative_yield_duration, "100 ms"); } }; @@ -109,6 +110,7 @@ class EventDriverScheduleErrorHandlingTests: public IntegrationBase { void testSetup() override { LogTestController::getInstance().setDebug<core::ProcessGroup>(); + configuration->set(minifi::Configure::nifi_administrative_yield_duration, "100 ms"); } }; diff --git a/libminifi/test/resources/TestKafkaOnSchedule.yml b/libminifi/test/resources/TestKafkaOnSchedule.yml index 26102c5e5..e12766a02 100644 --- a/libminifi/test/resources/TestKafkaOnSchedule.yml +++ b/libminifi/test/resources/TestKafkaOnSchedule.yml @@ -19,7 +19,6 @@ Flow Controller: name: MiNiFi Flow - onschedule retry interval: 100 ms Processors: - name: generate id: 2438e3c8-015a-1000-79ca-83af40ec1991 diff --git a/libminifi/test/resources/TestOnScheduleRetry.yml b/libminifi/test/resources/TestOnScheduleRetry.yml index c22e3dbf9..5122df746 100644 --- a/libminifi/test/resources/TestOnScheduleRetry.yml +++ b/libminifi/test/resources/TestOnScheduleRetry.yml @@ -19,7 +19,6 @@ Flow Controller: name: MiNiFi Flow id: 2438e3c8-015a-1000-79ca-83af40ec1990 - onschedule retry interval: 1000 ms Processors: - name: kamikaze id: 2438e3c8-015a-1000-79ca-83af40ec1991 diff --git a/libminifi/test/resources/TestStateTransactionality.yml b/libminifi/test/resources/TestStateTransactionality.yml index 5269c2915..7d7037812 100644 --- a/libminifi/test/resources/TestStateTransactionality.yml +++ b/libminifi/test/resources/TestStateTransactionality.yml @@ -19,7 +19,6 @@ Flow Controller: name: MiNiFi Flow id: 2438e3c8-015a-1000-79ca-83af40ec1990 - onschedule retry interval: 1000 ms Processors: - name: statefulProcessor id: 2437e3c8-a15a-4567-79ca-83af40ec1998
