lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011971961


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = 
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", 
batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = 
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && 
in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] 
!= '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block 
|| non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || 
full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed 
process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, 
gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, 
std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const 
{
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, 
gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - 
totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", 
_commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - 
totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", 
sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", 
_commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", 
_fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", 
_fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  char *buf_ptr = buffer;
+  size_t total_read = 0;
+  std::shared_ptr<core::FlowFile> flow_file;
+  auto num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  while (num_read > 0) {
+    if (num_read == static_cast<ssize_t>((sizeof(buffer) - total_read))) {
+      // we reach the max buffer size
+      logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+      if (!writeToFlowFile(session, flow_file, buffer)) {
+        continue;
+      }
+      // Rewind
+      total_read = 0;
+      buf_ptr = buffer;
+    } else {
+      total_read += num_read;
+      buf_ptr += num_read;
+    }
+    num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - total_read));
+  }
+
+  if (total_read > 0) {
+    logger_->log_debug("Execute Command Respond %zu", total_read);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, total_read);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
+    session.transfer(flow_file, Success);
+  }
+}
+
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  wait(&status);
+  if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", 
full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", 
full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  if (full_command_.length() == 0) {
+    yield();
+    return;
+  }
+  if (!changeWorkdir()) {
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+  std::vector<char*> argv;
+  auto args = readArgs();
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+  if (process_running_) {

Review Comment:
   This check shouldn't be needed; it was removed in commit b8a9a3297a4c6e56c02a1b5767f13596ddf7b853.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = 
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", 
batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = 
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && 
in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] 
!= '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block 
|| non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || 
full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;

Review Comment:
   Updated in commit b8a9a3297a4c6e56c02a1b5767f13596ddf7b853.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to