adamdebreceni commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r454137073
##########
File path: libminifi/src/FlowController.cpp
##########
@@ -239,8 +239,35 @@ int16_t FlowController::stop(bool force, uint64_t
timeToWait) {
if (running_) {
// immediately indicate that we are not running
logger_->log_info("Stop Flow Controller");
- if (this->root_)
+ if (this->root_) {
+ // stop source processors first
+ this->root_->stopProcessing(timer_scheduler_, event_scheduler_,
cron_scheduler_, [] (const std::shared_ptr<core::Processor>& proc) -> bool {
+ return !proc->hasIncomingConnections();
+ });
+ std::chrono::milliseconds shutdown_timer{0};
+ // we enable C2 to progressively increase the timeout
+ // in case it sees that waiting for a little longer could
+ // allow the FlowFiles to be processed
+ auto shutdown_timeout = [&]() -> std::chrono::milliseconds {
+ if (timeToWait != 0) {
+ return std::chrono::milliseconds{timeToWait};
+ }
+ static const core::TimePeriodValue default_timeout{"10 sec"};
+ utils::optional<core::TimePeriodValue> shutdown_timeout;
+ std::string shutdown_timeout_str;
+ if
(configuration_->get(minifi::Configure::nifi_flowcontroller_drain_timeout,
shutdown_timeout_str)) {
+ shutdown_timeout =
core::TimePeriodValue::fromString(shutdown_timeout_str);
+ }
+ return
std::chrono::milliseconds{shutdown_timeout.value_or(default_timeout).getMilliseconds()};
+ };
+ std::size_t count;
+ while (shutdown_timer < shutdown_timeout() && (count =
this->root_->getTotalFlowFileCount()) != 0) {
+ std::this_thread::sleep_for(shutdown_check_interval_);
+ shutdown_timer += shutdown_check_interval_;
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]