lordgamez commented on code in PR #1414: URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011972454
########## extensions/standard-processors/processors/ExecuteProcess.cpp: ########## @@ -74,172 +68,226 @@ void ExecuteProcess::initialize() { setSupportedRelationships(relationships()); } -void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { +void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) { + gsl_Expects(context); std::string value; - std::shared_ptr<core::FlowFile> flow_file; - if (context->getProperty(Command, value, flow_file)) { - this->_command = value; + if (context->getProperty(Command.getName(), value)) { + command_ = value; } - if (context->getProperty(CommandArguments, value, flow_file)) { - this->_commandArgument = value; + if (context->getProperty(CommandArguments.getName(), value)) { + command_argument_ = value; } - if (context->getProperty(WorkingDir, value, flow_file)) { - this->_workingDir = value; + if (context->getProperty(WorkingDir.getName(), value)) { + working_dir_ = value; } if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) { - _batchDuration = batch_duration->getMilliseconds(); - logger_->log_debug("Setting _batchDuration"); + batch_duration_ = batch_duration->getMilliseconds(); + logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count()); } if (context->getProperty(RedirectErrorStream.getName(), value)) { - _redirectErrorStream = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); + redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); } - this->_fullCommand = _command + " " + _commandArgument; - if (_fullCommand.length() == 0) { - yield(); + full_command_ = command_ + " " + command_argument_; +} + +bool ExecuteProcess::changeWorkdir() const { + if (working_dir_.length() > 0 && working_dir_ != ".") { + if (chdir(working_dir_.c_str()) != 0) { + logger_->log_error("Execute Command can not chdir %s", working_dir_); + return false; + } + } + return true; +} + +std::vector<std::string> ExecuteProcess::readArgs() const { + std::vector<std::string> args; + std::string current_param; + bool in_escaped = false; + auto currentParamShouldBeAppended = [&](std::size_t i) { + bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\'; + bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped; + bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' '; + return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character; + }; + + for (std::size_t i = 0; i < full_command_.size(); ++i) { + if (currentParamShouldBeAppended(i)) { + current_param += full_command_[i]; + } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) { + in_escaped = !in_escaped; + } else if (full_command_[i] == ' ' && !in_escaped) { + if (!current_param.empty()) { + args.push_back(current_param); + } + current_param.clear(); + } + } + if (!current_param.empty()) { + args.push_back(current_param); + } + return args; +} + +void ExecuteProcess::executeProcessForkFailed() { + logger_->log_error("Execute Process fork failed"); + close(pipefd_[0]); + close(pipefd_[1]); + yield(); +} + +void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) { + const int STDOUT = 1; + const int STDERR = 2; + close(STDOUT); + const auto guard = gsl::finally([]() { + exit(1); Review Comment: Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853 that it should only return 1 in error cases ########## extensions/standard-processors/processors/ExecuteProcess.cpp: ########## @@ -74,172 +68,226 @@ void ExecuteProcess::initialize() { setSupportedRelationships(relationships()); } -void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { +void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) { + gsl_Expects(context); std::string value; - std::shared_ptr<core::FlowFile> flow_file; - if (context->getProperty(Command, value, flow_file)) { - this->_command = value; + if (context->getProperty(Command.getName(), value)) { + command_ = value; } - if (context->getProperty(CommandArguments, value, flow_file)) { - this->_commandArgument = value; + if (context->getProperty(CommandArguments.getName(), value)) { + command_argument_ = value; } - if (context->getProperty(WorkingDir, value, flow_file)) { - this->_workingDir = value; + if (context->getProperty(WorkingDir.getName(), value)) { + working_dir_ = value; } if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) { - _batchDuration = batch_duration->getMilliseconds(); - logger_->log_debug("Setting _batchDuration"); + batch_duration_ = batch_duration->getMilliseconds(); + logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count()); } if (context->getProperty(RedirectErrorStream.getName(), value)) { - _redirectErrorStream = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); + redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); } - this->_fullCommand = _command + " " + _commandArgument; - if (_fullCommand.length() == 0) { - yield(); + full_command_ = command_ + " " + command_argument_; +} + +bool ExecuteProcess::changeWorkdir() const { + if (working_dir_.length() > 0 && working_dir_ != ".") { + if (chdir(working_dir_.c_str()) != 0) { + logger_->log_error("Execute Command can not chdir %s", working_dir_); + return false; + } + } + return true; +} + +std::vector<std::string> ExecuteProcess::readArgs() const { + std::vector<std::string> args; + std::string current_param; + bool in_escaped = false; + auto currentParamShouldBeAppended = [&](std::size_t i) { + bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && in_escaped && i > 0 && full_command_[i - 1] == '\\'; + bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped; + bool non_special_character = full_command_[i] != '\\' && full_command_[i] != '\"' && full_command_[i] != ' '; + return current_char_is_escaped_apostrophe || whitespace_in_escaped_block || non_special_character; + }; + + for (std::size_t i = 0; i < full_command_.size(); ++i) { + if (currentParamShouldBeAppended(i)) { + current_param += full_command_[i]; + } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || full_command_[i - 1] != '\\')) { + in_escaped = !in_escaped; + } else if (full_command_[i] == ' ' && !in_escaped) { + if (!current_param.empty()) { + args.push_back(current_param); + } + current_param.clear(); + } + } + if (!current_param.empty()) { + args.push_back(current_param); + } + return args; +} + +void ExecuteProcess::executeProcessForkFailed() { + logger_->log_error("Execute Process fork failed"); + close(pipefd_[0]); + close(pipefd_[1]); + yield(); +} + +void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) { + const int STDOUT = 1; + const int STDERR = 2; + close(STDOUT); + const auto guard = gsl::finally([]() { + exit(1); + }); + if (dup(pipefd_[1]) < 0) { // points pipefd at file descriptor + logger_->log_error("Failed to point pipe at file descriptor"); return; } - if (_workingDir.length() > 0 && _workingDir != ".") { - // change to working directory - if (chdir(_workingDir.c_str()) != 0) { - logger_->log_error("Execute Command can not chdir %s", _workingDir); - yield(); - return; + if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) { + logger_->log_error("Failed to redirect error stream of the executed process to the output stream"); + return; + } + close(pipefd_[0]); + if (execvp(argv[0], argv.data()) < 0) { + logger_->log_error("Failed to execute child process"); + } +} + +void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) { + while (true) { + std::this_thread::sleep_for(batch_duration_); + char buffer[4096]; + const auto num_read = read(pipefd_[0], buffer, sizeof(buffer)); + if (num_read <= 0) { + break; + } + logger_->log_debug("Execute Command Respond %zd", num_read); + auto flow_file = session.create(); + if (!flow_file) { + continue; } Review Comment: Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org