lordgamez commented on code in PR #1414:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1414#discussion_r1011972454


##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = 
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", 
batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = 
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && 
in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] 
!= '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block 
|| non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || 
full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853 so that it only 
returns 1 in error cases.



##########
extensions/standard-processors/processors/ExecuteProcess.cpp:
##########
@@ -74,172 +68,226 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
+void ExecuteProcess::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = 
context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    logger_->log_debug("Setting batch duration to %d milliseconds", 
batch_duration_.count());
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = 
org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
+  full_command_ = command_ + " " + command_argument_;
+}
+
+bool ExecuteProcess::changeWorkdir() const {
+  if (working_dir_.length() > 0 && working_dir_ != ".") {
+    if (chdir(working_dir_.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s", working_dir_);
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::string current_param;
+  bool in_escaped = false;
+  auto currentParamShouldBeAppended = [&](std::size_t i) {
+    bool current_char_is_escaped_apostrophe = full_command_[i] == '\"' && 
in_escaped && i > 0 && full_command_[i - 1] == '\\';
+    bool whitespace_in_escaped_block = full_command_[i] == ' ' && in_escaped;
+    bool non_special_character = full_command_[i] != '\\' && full_command_[i] 
!= '\"' && full_command_[i] != ' ';
+    return current_char_is_escaped_apostrophe || whitespace_in_escaped_block 
|| non_special_character;
+  };
+
+  for (std::size_t i = 0; i < full_command_.size(); ++i) {
+    if (currentParamShouldBeAppended(i)) {
+      current_param += full_command_[i];
+    } else if (full_command_[i] == '\"' && (!in_escaped || i == 0 || 
full_command_[i - 1] != '\\')) {
+      in_escaped = !in_escaped;
+    } else if (full_command_[i] == ' ' && !in_escaped) {
+      if (!current_param.empty()) {
+        args.push_back(current_param);
+      }
+      current_param.clear();
+    }
+  }
+  if (!current_param.empty()) {
+    args.push_back(current_param);
+  }
+  return args;
+}
+
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  close(pipefd_[0]);
+  close(pipefd_[1]);
+  yield();
+}
+
+void ExecuteProcess::executeChildProcess(const std::vector<char*>& argv) {
+  const int STDOUT = 1;
+  const int STDERR = 2;
+  close(STDOUT);
+  const auto guard = gsl::finally([]() {
+    exit(1);
+  });
+  if (dup(pipefd_[1]) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
     return;
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed 
process to the output stream");
+    return;
+  }
+  close(pipefd_[0]);
+  if (execvp(argv[0], argv.data()) < 0) {
+    logger_->log_error("Failed to execute child process");
+  }
+}
+
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      continue;
     }

Review Comment:
   Updated in b8a9a3297a4c6e56c02a1b5767f13596ddf7b853



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to