This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4624d302ac3522c948fe87bbd96a7a6f6c8de901
Author: Martin Zink <[email protected]>
AuthorDate: Mon Oct 2 19:31:09 2023 +0200

    MINIFICPP-2212 use administrative yield duration instead of onschedule 
retry interval
    
    Closes #1664
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 CONFIGURE.md                                            |  1 -
 libminifi/include/SchedulingAgent.h                     |  2 ++
 libminifi/include/core/ProcessGroup.h                   |  9 ---------
 libminifi/include/core/flow/FlowSchema.h                |  1 -
 libminifi/src/agent/JsonSchema.cpp                      |  4 +---
 libminifi/src/core/ProcessGroup.cpp                     | 17 +++++------------
 libminifi/src/core/flow/FlowSchema.cpp                  |  2 --
 libminifi/src/core/flow/StructuredConfiguration.cpp     | 11 -----------
 .../test/integration/OnScheduleErrorHandlingTests.cpp   |  2 ++
 libminifi/test/resources/TestKafkaOnSchedule.yml        |  1 -
 libminifi/test/resources/TestOnScheduleRetry.yml        |  1 -
 libminifi/test/resources/TestStateTransactionality.yml  |  1 -
 12 files changed, 10 insertions(+), 42 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 178821994..f23b389a9 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -29,7 +29,6 @@ It's recommended to create your configuration in YAML format 
or configure the ag
     Flow Controller:
         id: 471deef6-2a6e-4a7d-912a-81cc17e3a205
         name: MiNiFi Flow
-        onschedule retry interval: 30000 ms
 
     Processors:
         - name: GetFile
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index b24b74d97..398cd52d2 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -109,6 +109,8 @@ class SchedulingAgent {
   SchedulingAgent(const SchedulingAgent &parent) = delete;
   SchedulingAgent &operator=(const SchedulingAgent &parent) = delete;
 
+  std::chrono::milliseconds getAdminYieldDuration() const { return 
admin_yield_duration_; }
+
  protected:
   std::mutex mutex_;
   std::atomic<bool> running_;
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index ce507cb9a..d149dad69 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -132,14 +132,6 @@ class ProcessGroup : public CoreComponent {
     return (yield_period_msec_);
   }
 
-  void setOnScheduleRetryPeriod(int64_t period) {
-    onschedule_retry_msec_ = period;
-  }
-
-  int64_t getOnScheduleRetryPeriod() {
-    return onschedule_retry_msec_;
-  }
-
   int getVersion() const {
     return config_version_;
   }
@@ -247,7 +239,6 @@ class ProcessGroup : public CoreComponent {
   // Yield Period in Milliseconds
   std::atomic<std::chrono::milliseconds> yield_period_msec_;
   std::atomic<uint64_t> timeout_;
-  std::atomic<int64_t> onschedule_retry_msec_;
 
   // URL
   std::string url_;
diff --git a/libminifi/include/core/flow/FlowSchema.h 
b/libminifi/include/core/flow/FlowSchema.h
index 0badac303..773550ddc 100644
--- a/libminifi/include/core/flow/FlowSchema.h
+++ b/libminifi/include/core/flow/FlowSchema.h
@@ -36,7 +36,6 @@ struct FlowSchema {
   Keys penalization_period;
   Keys proc_yield_period;
   Keys runduration_nanos;
-  Keys onschedule_retry_interval;
 
   Keys connections;
   Keys max_queue_size;
diff --git a/libminifi/src/agent/JsonSchema.cpp 
b/libminifi/src/agent/JsonSchema.cpp
index f9bfa3895..90a1df510 100644
--- a/libminifi/src/agent/JsonSchema.cpp
+++ b/libminifi/src/agent/JsonSchema.cpp
@@ -353,7 +353,6 @@ static std::string buildSchema(const 
std::unordered_map<std::string, std::string
       "properties": {
         "name": {"type": "string"},
         "version": {"type": "integer"},
-        "onschedule retry interval": {"$ref": "#/definitions/time"},
         )" + process_group_properties + R"(
       }
     },
@@ -368,8 +367,7 @@ static std::string buildSchema(const 
std::unordered_map<std::string, std::string
           "required": ["name"],
           "properties": {
             "name": {"type": "string"},
-            "version": {"type": "integer"},
-            "onschedule retry interval": {"$ref": "#/definitions/time"}
+            "version": {"type": "integer"}
           }
         },
         )" + process_group_properties + R"(
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index db5e9a7eb..e56cbfd14 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -33,8 +33,6 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000;
-
 std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = 
utils::IdGenerator::getIdGenerator();
 
 ProcessGroup::ProcessGroup(ProcessGroupType type, std::string_view name, const 
utils::Identifier& uuid)
@@ -54,12 +52,6 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, 
std::string_view name, const u
       transmitting_(false),
       transport_protocol_("RAW"),
       logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
-  if (parent_process_group_ != nullptr) {
-    onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod();
-  } else {
-    onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS;
-  }
-
   logger_->log_debug("ProcessGroup %s created", name_);
 }
 
@@ -69,7 +61,6 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, 
std::string_view name)
       type_(type),
       parent_process_group_(nullptr),
       yield_period_msec_(0ms),
-      onschedule_retry_msec_(DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS),
       transmitting_(false),
       transport_protocol_("RAW"),
       logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) {
@@ -167,12 +158,14 @@ void 
ProcessGroup::startProcessingProcessors(TimerDrivenSchedulingAgent& timeSch
     }
   }
 
-  if (!onScheduleTimer_ && !failed_processors_.empty() && 
onschedule_retry_msec_ > 0) {
-    logger_->log_info("Retrying failed processors in %lld msec", 
onschedule_retry_msec_.load());
+  // The admin yield duration comes from the configuration, should be equal in 
all three schedulers
+  std::chrono::milliseconds admin_yield_duration = 
timeScheduler.getAdminYieldDuration();
+  if (!onScheduleTimer_ && !failed_processors_.empty() && admin_yield_duration 
> 0ms) {
+    logger_->log_info("Retrying failed processors in %lld msec", 
admin_yield_duration.count());
     auto func = [this, eventScheduler = &eventScheduler, cronScheduler = 
&cronScheduler, timeScheduler = &timeScheduler]() {
       this->startProcessingProcessors(*timeScheduler, *eventScheduler, 
*cronScheduler);
     };
-    onScheduleTimer_ = 
std::make_unique<utils::CallBackTimer>(std::chrono::milliseconds(onschedule_retry_msec_),
 func);
+    onScheduleTimer_ = 
std::make_unique<utils::CallBackTimer>(admin_yield_duration, func);
     onScheduleTimer_->start();
   } else if (failed_processors_.empty() && onScheduleTimer_) {
     onScheduleTimer_->stop();
diff --git a/libminifi/src/core/flow/FlowSchema.cpp 
b/libminifi/src/core/flow/FlowSchema.cpp
index a5df0dc64..c07e548e7 100644
--- a/libminifi/src/core/flow/FlowSchema.cpp
+++ b/libminifi/src/core/flow/FlowSchema.cpp
@@ -32,7 +32,6 @@ FlowSchema FlowSchema::getDefault() {
       .penalization_period = {"penalization period"},
       .proc_yield_period = {"yield period"},
       .runduration_nanos = {"run duration nanos"},
-      .onschedule_retry_interval = {"onschedule retry interval"},
 
       .connections = {"Connections"},
       .max_queue_size = {"max work queue size"},
@@ -92,7 +91,6 @@ FlowSchema FlowSchema::getNiFiFlowJson() {
       .proc_yield_period = {"yieldDuration"},
       // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch 
between nano and milli is not an issue
       .runduration_nanos = {"runDurationMillis"},
-      .onschedule_retry_interval = {},
 
       .connections = {"connections"},
       .max_queue_size = {"backPressureObjectThreshold"},
diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp 
b/libminifi/src/core/flow/StructuredConfiguration.cpp
index 4e5461e1a..322b8d042 100644
--- a/libminifi/src/core/flow/StructuredConfiguration.cpp
+++ b/libminifi/src/core/flow/StructuredConfiguration.cpp
@@ -80,17 +80,6 @@ std::unique_ptr<core::ProcessGroup> 
StructuredConfiguration::createProcessGroup(
     group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, 
version);
   }
 
-  if (node[schema_.onschedule_retry_interval]) {
-    auto onScheduleRetryPeriod = 
node[schema_.onschedule_retry_interval].getString().value();
-    logger_->log_debug("parseRootProcessGroup: onschedule retry period => 
[%s]", onScheduleRetryPeriod);
-
-    auto on_schedule_retry_period_value = 
utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod);
-    if (on_schedule_retry_period_value.has_value() && group) {
-      logger_->log_debug("parseRootProcessGroup: onschedule retry => [%" 
PRId64 "] ms", on_schedule_retry_period_value->count());
-      group->setOnScheduleRetryPeriod(on_schedule_retry_period_value->count());
-    }
-  }
-
   return group;
 }
 
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp 
b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index 14f74f6b4..7446dadb3 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -68,6 +68,7 @@ class KamikazeErrorHandlingTests : public IntegrationBase {
     LogTestController::getInstance().setDebug<core::Processor>();
     LogTestController::getInstance().setDebug<core::ProcessSession>();
     
LogTestController::getInstance().setDebug<minifi::processors::KamikazeProcessor>();
+    configuration->set(minifi::Configure::nifi_administrative_yield_duration, 
"100 ms");
   }
 };
 
@@ -109,6 +110,7 @@ class EventDriverScheduleErrorHandlingTests: public 
IntegrationBase {
 
   void testSetup() override {
     LogTestController::getInstance().setDebug<core::ProcessGroup>();
+    configuration->set(minifi::Configure::nifi_administrative_yield_duration, 
"100 ms");
   }
 };
 
diff --git a/libminifi/test/resources/TestKafkaOnSchedule.yml 
b/libminifi/test/resources/TestKafkaOnSchedule.yml
index 26102c5e5..e12766a02 100644
--- a/libminifi/test/resources/TestKafkaOnSchedule.yml
+++ b/libminifi/test/resources/TestKafkaOnSchedule.yml
@@ -19,7 +19,6 @@
 
 Flow Controller:
   name: MiNiFi Flow
-  onschedule retry interval: 100 ms
 Processors:
   - name: generate
     id: 2438e3c8-015a-1000-79ca-83af40ec1991
diff --git a/libminifi/test/resources/TestOnScheduleRetry.yml 
b/libminifi/test/resources/TestOnScheduleRetry.yml
index c22e3dbf9..5122df746 100644
--- a/libminifi/test/resources/TestOnScheduleRetry.yml
+++ b/libminifi/test/resources/TestOnScheduleRetry.yml
@@ -19,7 +19,6 @@
 Flow Controller:
     name: MiNiFi Flow
     id: 2438e3c8-015a-1000-79ca-83af40ec1990
-    onschedule retry interval: 1000 ms
 Processors:
     - name: kamikaze
       id: 2438e3c8-015a-1000-79ca-83af40ec1991
diff --git a/libminifi/test/resources/TestStateTransactionality.yml 
b/libminifi/test/resources/TestStateTransactionality.yml
index 5269c2915..7d7037812 100644
--- a/libminifi/test/resources/TestStateTransactionality.yml
+++ b/libminifi/test/resources/TestStateTransactionality.yml
@@ -19,7 +19,6 @@
 Flow Controller:
     name: MiNiFi Flow
     id: 2438e3c8-015a-1000-79ca-83af40ec1990
-    onschedule retry interval: 1000 ms
 Processors:
     - name: statefulProcessor
       id: 2437e3c8-a15a-4567-79ca-83af40ec1998

Reply via email to