arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r452930520
##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t
timeToWait) {
if (running_) {
// immediately indicate that we are not running
logger_->log_info("Stop Flow Controller");
- if (this->root_)
+ if (this->root_) {
+ // stop source processors first
+ this->root_->stopProcessing(timer_scheduler_, event_scheduler_,
cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+ return !proc->hasIncomingConnections();
+ });
+ std::chrono::milliseconds shutdown_timer{0};
+ // we enable C2 to progressively increase the timeout
+ // in case it sees that waiting for a little longer could
+ // allow the FlowFiles to be processed
+ auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+ if (timeToWait != 0) {
+ return std::chrono::milliseconds{timeToWait};
+ }
+ static const core::TimePeriodValue default_timeout{"10 sec"};
Review comment:
I think this shouldn't be defined in the middle of the code; it is very
difficult to search for.
Moreover, it can delay shutdown, so I would prefer to leave it 0 by default.
##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t
timeToWait) {
if (running_) {
// immediately indicate that we are not running
logger_->log_info("Stop Flow Controller");
- if (this->root_)
+ if (this->root_) {
+ // stop source processors first
+ this->root_->stopProcessing(timer_scheduler_, event_scheduler_,
cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+ return !proc->hasIncomingConnections();
+ });
+ std::chrono::milliseconds shutdown_timer{0};
+ // we enable C2 to progressively increase the timeout
+ // in case it sees that waiting for a little longer could
+ // allow the FlowFiles to be processed
+ auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+ if (timeToWait != 0) {
+ return std::chrono::milliseconds{timeToWait};
+ }
+ static const core::TimePeriodValue default_timeout{"10 sec"};
+ utils::optional<core::TimePeriodValue> shutdown_timeout;
+ std::string shutdown_timeout_str;
+ if
(configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout,
shutdown_timeout_str)) {
+ shutdown_timeout =
core::TimePeriodValue::fromString(shutdown_timeout_str);
+ }
+ return
std::chrono::milliseconds{shutdown_timeout.value_or(default_timeout).getMilliseconds()};
+ };
+ std::size_t count;
+ while (shutdown_timer < shutdown_timeout() && (count =
this->root_->getTotalFlowFileCount()) != 0) {
+ std::this_thread::sleep_for(shutdown_check_interval_);
+ shutdown_timer += shutdown_check_interval_;
Review comment:
I would do it by saving the current time when this activity is started
and checking whether the elapsed time exceeds the maximum allowed for this operation.
##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
}
}
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+ std::size_t sum = 0;
+ for (auto& conn : connections_) {
Review comment:
Nitpicking: const auto&
##########
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##########
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections",
"[TestFlow1]") {
controller->stop(true);
+ REQUIRE(sinkProc->trigger_count == 0);
+
for (auto& it : connectionMap) {
REQUIRE(it.second->isEmpty());
}
}
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+ TestControllerWithFlow testController(yamlConfig);
+ auto controller = testController.controller_;
+ auto root = testController.root_;
+
+ auto sourceProc =
std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+ auto sinkProc =
std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+ // prevent the initial trigger
+ // in case the source got triggered
+ // and the scheduler triggers the sink
+ // before we could initiate the shutdown
+ sinkProc->yield(100);
+
+ testController.startFlow();
+
+ // wait for the source processor to enqueue its flowFiles
+ std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+ REQUIRE(sourceProc->trigger_count.load() == 1);
+ REQUIRE(sinkProc->trigger_count.load() == 0);
+
+ controller->stop(true);
+
+ REQUIRE(sourceProc->trigger_count.load() == 1);
+ REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+ TestControllerWithFlow testController(yamlConfig);
+ auto controller = testController.controller_;
+ auto root = testController.root_;
+
+
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout,
"1000 ms");
+
+ auto sourceProc =
std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
+ auto sinkProc =
std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
+
+ // prevent the initial trigger
+ // in case the source got triggered
+ // and the scheduler triggers the sink
+ sinkProc->yield(100);
+
+ sinkProc->onTriggerCb_ = [&]{
+ static std::atomic<bool> first_onTrigger{true};
+ bool isFirst = true;
+ // sleep only on the first trigger
+ if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+ std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+ }
+ };
+
+ testController.startFlow();
+
+ // wait for the source processor to enqueue its flowFiles
+ std::this_thread::sleep_for(std::chrono::milliseconds{50});
Review comment:
Can't we simply wait here until the source processor's trigger count
becomes one?
##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
}
}
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+ std::size_t sum = 0;
+ for (auto& conn : connections_) {
+ sum += conn->getQueueSize();
+ }
+
+ for (ProcessGroup* childGroup : child_process_groups_) {
Review comment:
const ptr
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]