lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1016279852
##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
setSupportedRelationships(relationships());
}
-void ExecuteProcess::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context,
core::ProcessSessionFactory* /*session_factory*/) {
+ gsl_Expects(context);
std::string value;
- std::shared_ptr<core::FlowFile> flow_file;
- if (context->getProperty(Command, value, flow_file)) {
- this->_command = value;
+ if (context->getProperty(Command.getName(), value)) {
+ command_ = value;
}
- if (context->getProperty(CommandArguments, value, flow_file)) {
- this->_commandArgument = value;
+ if (context->getProperty(CommandArguments.getName(), value)) {
+ command_argument_ = value;
}
- if (context->getProperty(WorkingDir, value, flow_file)) {
- this->_workingDir = value;
+ if (context->getProperty(WorkingDir.getName(), value)) {
+ working_dir_ = value;
}
if (auto batch_duration =
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
- _batchDuration = batch_duration->getMilliseconds();
- logger_->log_debug("Setting _batchDuration");
+ batch_duration_ = batch_duration->getMilliseconds();
+ logger_->log_debug("Setting batch duration to %d milliseconds",
batch_duration_.count());
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
- _redirectErrorStream =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+ redirect_error_stream_ =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
}
- this->_fullCommand = _command + " " + _commandArgument;
- if (_fullCommand.length() == 0) {
- yield();
- return;
+ full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+ std::vector<std::string> args;
+ std::stringstream input_stream{full_command_};
+ while (input_stream) {
+ std::string word;
+ input_stream >> std::quoted(word);
+ if (!word.empty()) {
+ args.push_back(word);
+ }
}
- if (_workingDir.length() > 0 && _workingDir != ".") {
- // change to working directory
- if (chdir(_workingDir.c_str()) != 0) {
- logger_->log_error("Execute Command can not chdir %s", _workingDir);
- yield();
- return;
+
+ return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+ logger_->log_error("Execute Process fork failed");
+ close(pipefd_[0]);
+ close(pipefd_[1]);
+ yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+ std::vector<char*> argv;
+ auto args = readArgs();
+ argv.reserve(args.size() + 1);
+ for (auto& arg : args) {
+ argv.push_back(arg.data());
+ }
+ argv.push_back(nullptr);
+
+ static constexpr int STDOUT = 1;
+ static constexpr int STDERR = 2;
+ close(STDOUT);
+ if (dup(pipefd_[1]) < 0) { // points pipefd at file descriptor
Review Comment:
Thanks for the clarification; the official `dup2` description was not quite
clear to me on this point. Updated in 4ef7c6d01164c907defe70995b632e085911e83f
##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
setSupportedRelationships(relationships());
}
-void ExecuteProcess::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context,
core::ProcessSessionFactory* /*session_factory*/) {
+ gsl_Expects(context);
std::string value;
- std::shared_ptr<core::FlowFile> flow_file;
- if (context->getProperty(Command, value, flow_file)) {
- this->_command = value;
+ if (context->getProperty(Command.getName(), value)) {
+ command_ = value;
}
- if (context->getProperty(CommandArguments, value, flow_file)) {
- this->_commandArgument = value;
+ if (context->getProperty(CommandArguments.getName(), value)) {
+ command_argument_ = value;
}
- if (context->getProperty(WorkingDir, value, flow_file)) {
- this->_workingDir = value;
+ if (context->getProperty(WorkingDir.getName(), value)) {
+ working_dir_ = value;
}
if (auto batch_duration =
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
- _batchDuration = batch_duration->getMilliseconds();
- logger_->log_debug("Setting _batchDuration");
+ batch_duration_ = batch_duration->getMilliseconds();
+ logger_->log_debug("Setting batch duration to %d milliseconds",
batch_duration_.count());
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
- _redirectErrorStream =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+ redirect_error_stream_ =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
}
- this->_fullCommand = _command + " " + _commandArgument;
- if (_fullCommand.length() == 0) {
- yield();
- return;
+ full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+ std::vector<std::string> args;
+ std::stringstream input_stream{full_command_};
+ while (input_stream) {
+ std::string word;
+ input_stream >> std::quoted(word);
+ if (!word.empty()) {
+ args.push_back(word);
+ }
}
- if (_workingDir.length() > 0 && _workingDir != ".") {
- // change to working directory
- if (chdir(_workingDir.c_str()) != 0) {
- logger_->log_error("Execute Command can not chdir %s", _workingDir);
- yield();
- return;
+
+ return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+ logger_->log_error("Execute Process fork failed");
+ close(pipefd_[0]);
+ close(pipefd_[1]);
+ yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+ std::vector<char*> argv;
+ auto args = readArgs();
+ argv.reserve(args.size() + 1);
+ for (auto& arg : args) {
+ argv.push_back(arg.data());
+ }
+ argv.push_back(nullptr);
+
+ static constexpr int STDOUT = 1;
+ static constexpr int STDERR = 2;
+ close(STDOUT);
+ if (dup(pipefd_[1]) < 0) { // points pipefd at file descriptor
+ logger_->log_error("Failed to point pipe at file descriptor");
+ exit(1);
+ }
+ if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+ logger_->log_error("Failed to redirect error stream of the executed
process to the output stream");
+ exit(1);
+ }
+ close(pipefd_[0]);
+ if (execvp(argv[0], argv.data()) < 0) {
+ logger_->log_error("Failed to execute child process");
+ exit(1);
+ }
+ exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+ while (true) {
+ std::this_thread::sleep_for(batch_duration_);
+ char buffer[4096];
+ const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+ if (num_read <= 0) {
+ break;
+ }
+ logger_->log_debug("Execute Command Respond %zd", num_read);
+ auto flow_file = session.create();
+ if (!flow_file) {
+ logger_->log_error("Flow file could not be created!");
+ continue;
}
+ flow_file->addAttribute("command", command_);
+ flow_file->addAttribute("command.arguments", command_argument_);
+ session.writeBuffer(flow_file, gsl::make_span(buffer,
gsl::narrow<size_t>(num_read)));
+ session.transfer(flow_file, Success);
+ session.commit();
}
- logger_->log_info("Execute Command %s", _fullCommand);
- // split the command into array
- char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
- int argc = 0;
- char *argv[64];
- while (p != 0 && argc < 64) {
- argv[argc] = p;
- p = std::strtok(NULL, " ");
- argc++;
- }
- argv[argc] = NULL;
- int status;
- if (!_processRunning) {
- _processRunning = true;
- // if the process has not launched yet
- // create the pipe
- if (pipe(_pipefd) == -1) {
- _processRunning = false;
- yield();
- return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session,
std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const
{
+ if (!flow_file) {
+ flow_file = session.create();
+ if (!flow_file) {
+ logger_->log_error("Flow file could not be created!");
+ return false;
}
- switch (_pid = fork()) {
- case -1:
- logger_->log_error("Execute Process fork failed");
- _processRunning = false;
- close(_pipefd[0]);
- close(_pipefd[1]);
- yield();
- break;
- case 0: // this is the code the child runs
- close(1); // close stdout
- dup(_pipefd[1]); // points pipefd at file descriptor
- if (_redirectErrorStream)
- // redirect stderr
- dup2(_pipefd[1], 2);
- close(_pipefd[0]);
- execvp(argv[0], argv);
- exit(1);
- break;
- default: // this is the code the parent runs
- // the parent isn't going to write to the pipe
- close(_pipefd[1]);
- if (_batchDuration > 0ms) {
- while (true) {
- std::this_thread::sleep_for(_batchDuration);
- char buffer[4096];
- const auto numRead = read(_pipefd[0], buffer, sizeof(buffer));
- if (numRead <= 0)
- break;
- logger_->log_debug("Execute Command Respond %zd", numRead);
- auto flowFile = session->create();
- if (!flowFile)
- continue;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments", _commandArgument);
- session->writeBuffer(flowFile, gsl::make_span(buffer,
gsl::narrow<size_t>(numRead)));
- session->transfer(flowFile, Success);
- session->commit();
- }
- } else {
- char buffer[4096];
- char *bufPtr = buffer;
- size_t totalRead = 0;
- std::shared_ptr<core::FlowFile> flowFile = nullptr;
- while (true) {
- const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) -
totalRead));
- if (numRead <= 0) {
- if (totalRead > 0) {
- logger_->log_debug("Execute Command Respond %zu", totalRead);
- // child exits and close the pipe
- const auto buffer_span = gsl::make_span(buffer, totalRead);
- if (!flowFile) {
- flowFile = session->create();
- if (!flowFile)
- break;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments",
_commandArgument);
- session->writeBuffer(flowFile, buffer_span);
- } else {
- session->appendBuffer(flowFile, buffer_span);
- }
- session->transfer(flowFile, Success);
- }
- break;
- } else {
- if (numRead == static_cast<ssize_t>((sizeof(buffer) -
totalRead))) {
- // we reach the max buffer size
- logger_->log_debug("Execute Command Max Respond %zu",
sizeof(buffer));
- if (!flowFile) {
- flowFile = session->create();
- if (!flowFile)
- continue;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments",
_commandArgument);
- session->writeBuffer(flowFile, buffer);
- } else {
- session->appendBuffer(flowFile, buffer);
- }
- // Rewind
- totalRead = 0;
- bufPtr = buffer;
- } else {
- totalRead += numRead;
- bufPtr += numRead;
- }
- }
- }
- }
-
- wait(&status);
- if (WIFEXITED(status)) {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
_fullCommand, WEXITSTATUS(status), _pid);
- } else {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
_fullCommand, WTERMSIG(status), _pid);
- }
-
- close(_pipefd[0]);
- _processRunning = false;
- break;
+ flow_file->addAttribute("command", command_);
+ flow_file->addAttribute("command.arguments", command_argument_);
+ session.writeBuffer(flow_file, buffer);
+ } else {
+ session.appendBuffer(flow_file, buffer);
+ }
+ return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+ char buffer[4096];
+ char *buf_ptr = buffer;
+ size_t read_to_buffer = 0;
+ std::shared_ptr<core::FlowFile> flow_file;
+ auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));
Review Comment:
Good catch; updated in 4ef7c6d01164c907defe70995b632e085911e83f
##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +70,202 @@ void ExecuteProcess::initialize() {
setSupportedRelationships(relationships());
}
-void ExecuteProcess::onTrigger(core::ProcessContext *context,
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context,
core::ProcessSessionFactory* /*session_factory*/) {
+ gsl_Expects(context);
std::string value;
- std::shared_ptr<core::FlowFile> flow_file;
- if (context->getProperty(Command, value, flow_file)) {
- this->_command = value;
+ if (context->getProperty(Command.getName(), value)) {
+ command_ = value;
}
- if (context->getProperty(CommandArguments, value, flow_file)) {
- this->_commandArgument = value;
+ if (context->getProperty(CommandArguments.getName(), value)) {
+ command_argument_ = value;
}
- if (context->getProperty(WorkingDir, value, flow_file)) {
- this->_workingDir = value;
+ if (context->getProperty(WorkingDir.getName(), value)) {
+ working_dir_ = value;
}
if (auto batch_duration =
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
- _batchDuration = batch_duration->getMilliseconds();
- logger_->log_debug("Setting _batchDuration");
+ batch_duration_ = batch_duration->getMilliseconds();
+ logger_->log_debug("Setting batch duration to %d milliseconds",
batch_duration_.count());
}
if (context->getProperty(RedirectErrorStream.getName(), value)) {
- _redirectErrorStream =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+ redirect_error_stream_ =
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
}
- this->_fullCommand = _command + " " + _commandArgument;
- if (_fullCommand.length() == 0) {
- yield();
- return;
+ full_command_ = command_ + " " + command_argument_;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+ std::vector<std::string> args;
+ std::stringstream input_stream{full_command_};
+ while (input_stream) {
+ std::string word;
+ input_stream >> std::quoted(word);
+ if (!word.empty()) {
+ args.push_back(word);
+ }
}
- if (_workingDir.length() > 0 && _workingDir != ".") {
- // change to working directory
- if (chdir(_workingDir.c_str()) != 0) {
- logger_->log_error("Execute Command can not chdir %s", _workingDir);
- yield();
- return;
+
+ return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+ logger_->log_error("Execute Process fork failed");
+ close(pipefd_[0]);
+ close(pipefd_[1]);
+ yield();
+}
+
+void ExecuteProcess::executeChildProcess() {
+ std::vector<char*> argv;
+ auto args = readArgs();
+ argv.reserve(args.size() + 1);
+ for (auto& arg : args) {
+ argv.push_back(arg.data());
+ }
+ argv.push_back(nullptr);
+
+ static constexpr int STDOUT = 1;
+ static constexpr int STDERR = 2;
+ close(STDOUT);
+ if (dup(pipefd_[1]) < 0) { // points pipefd at file descriptor
+ logger_->log_error("Failed to point pipe at file descriptor");
+ exit(1);
+ }
+ if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+ logger_->log_error("Failed to redirect error stream of the executed
process to the output stream");
+ exit(1);
+ }
+ close(pipefd_[0]);
+ if (execvp(argv[0], argv.data()) < 0) {
+ logger_->log_error("Failed to execute child process");
+ exit(1);
+ }
+ exit(0);
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+ while (true) {
+ std::this_thread::sleep_for(batch_duration_);
+ char buffer[4096];
+ const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+ if (num_read <= 0) {
+ break;
+ }
+ logger_->log_debug("Execute Command Respond %zd", num_read);
+ auto flow_file = session.create();
+ if (!flow_file) {
+ logger_->log_error("Flow file could not be created!");
+ continue;
}
+ flow_file->addAttribute("command", command_);
+ flow_file->addAttribute("command.arguments", command_argument_);
+ session.writeBuffer(flow_file, gsl::make_span(buffer,
gsl::narrow<size_t>(num_read)));
+ session.transfer(flow_file, Success);
+ session.commit();
}
- logger_->log_info("Execute Command %s", _fullCommand);
- // split the command into array
- char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
- int argc = 0;
- char *argv[64];
- while (p != 0 && argc < 64) {
- argv[argc] = p;
- p = std::strtok(NULL, " ");
- argc++;
- }
- argv[argc] = NULL;
- int status;
- if (!_processRunning) {
- _processRunning = true;
- // if the process has not launched yet
- // create the pipe
- if (pipe(_pipefd) == -1) {
- _processRunning = false;
- yield();
- return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session,
std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const
{
+ if (!flow_file) {
+ flow_file = session.create();
+ if (!flow_file) {
+ logger_->log_error("Flow file could not be created!");
+ return false;
}
- switch (_pid = fork()) {
- case -1:
- logger_->log_error("Execute Process fork failed");
- _processRunning = false;
- close(_pipefd[0]);
- close(_pipefd[1]);
- yield();
- break;
- case 0: // this is the code the child runs
- close(1); // close stdout
- dup(_pipefd[1]); // points pipefd at file descriptor
- if (_redirectErrorStream)
- // redirect stderr
- dup2(_pipefd[1], 2);
- close(_pipefd[0]);
- execvp(argv[0], argv);
- exit(1);
- break;
- default: // this is the code the parent runs
- // the parent isn't going to write to the pipe
- close(_pipefd[1]);
- if (_batchDuration > 0ms) {
- while (true) {
- std::this_thread::sleep_for(_batchDuration);
- char buffer[4096];
- const auto numRead = read(_pipefd[0], buffer, sizeof(buffer));
- if (numRead <= 0)
- break;
- logger_->log_debug("Execute Command Respond %zd", numRead);
- auto flowFile = session->create();
- if (!flowFile)
- continue;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments", _commandArgument);
- session->writeBuffer(flowFile, gsl::make_span(buffer,
gsl::narrow<size_t>(numRead)));
- session->transfer(flowFile, Success);
- session->commit();
- }
- } else {
- char buffer[4096];
- char *bufPtr = buffer;
- size_t totalRead = 0;
- std::shared_ptr<core::FlowFile> flowFile = nullptr;
- while (true) {
- const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) -
totalRead));
- if (numRead <= 0) {
- if (totalRead > 0) {
- logger_->log_debug("Execute Command Respond %zu", totalRead);
- // child exits and close the pipe
- const auto buffer_span = gsl::make_span(buffer, totalRead);
- if (!flowFile) {
- flowFile = session->create();
- if (!flowFile)
- break;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments",
_commandArgument);
- session->writeBuffer(flowFile, buffer_span);
- } else {
- session->appendBuffer(flowFile, buffer_span);
- }
- session->transfer(flowFile, Success);
- }
- break;
- } else {
- if (numRead == static_cast<ssize_t>((sizeof(buffer) -
totalRead))) {
- // we reach the max buffer size
- logger_->log_debug("Execute Command Max Respond %zu",
sizeof(buffer));
- if (!flowFile) {
- flowFile = session->create();
- if (!flowFile)
- continue;
- flowFile->addAttribute("command", _command);
- flowFile->addAttribute("command.arguments",
_commandArgument);
- session->writeBuffer(flowFile, buffer);
- } else {
- session->appendBuffer(flowFile, buffer);
- }
- // Rewind
- totalRead = 0;
- bufPtr = buffer;
- } else {
- totalRead += numRead;
- bufPtr += numRead;
- }
- }
- }
- }
-
- wait(&status);
- if (WIFEXITED(status)) {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
_fullCommand, WEXITSTATUS(status), _pid);
- } else {
- logger_->log_info("Execute Command Complete %s status %d pid %d",
_fullCommand, WTERMSIG(status), _pid);
- }
-
- close(_pipefd[0]);
- _processRunning = false;
- break;
+ flow_file->addAttribute("command", command_);
+ flow_file->addAttribute("command.arguments", command_argument_);
+ session.writeBuffer(flow_file, buffer);
+ } else {
+ session.appendBuffer(flow_file, buffer);
+ }
+ return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+ char buffer[4096];
+ char *buf_ptr = buffer;
+ size_t read_to_buffer = 0;
+ std::shared_ptr<core::FlowFile> flow_file;
+ auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer));
+ while (num_read > 0) {
+ if (num_read == static_cast<ssize_t>((sizeof(buffer) - read_to_buffer))) {
Review Comment:
Good point; updated in 4ef7c6d01164c907defe70995b632e085911e83f
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]