This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9627f2ebf9699c0eae6710a15fc713ebcf0d71e4
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Nov 9 13:29:58 2022 +0100

    MINIFICPP-1927 Fix ExecuteProcess arg escaping
    
    Arguments were split at white space with no way of escaping it, even
    though the property description states that "White space can be escaped
    by enclosing it in double-quotes". This commit makes double-quote escaping
    of arguments work as described.
    
    Additionally, ExecuteProcess was refactored, and tests were rewritten.
    
    Closes #1414
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  16 +-
 .../processors/ExecuteProcess.cpp                  | 379 +++++++++++----------
 .../processors/ExecuteProcess.h                    |  82 ++---
 .../standard-processors/tests/CMakeLists.txt       |  10 +-
 .../tests/integration/TestExecuteProcess.cpp       | 104 ------
 .../tests/resource_apps/EchoParameters.cpp         |  34 ++
 .../tests/unit/ExecuteProcessTests.cpp             | 175 ++++++++++
 7 files changed, 461 insertions(+), 339 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d052e5c6c..3144e8390 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -551,18 +551,18 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 ### Description
 
-Runs an operating system command specified by the user and writes the output 
of that command to a FlowFile. If the command is expected to be 
long-running,the Processor can output the partial data on a specified interval. 
When this option is used, the output is expected to be in textual format,as it 
typically does not make sense to split binary data on arbitrary time-based 
intervals.
+Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. This processor is not available on Windows systems.
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name                  | Default Value | Allowable Values | Description       
                                                                                
                                                                          |
-|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Batch Duration        | 0 sec         |                  | If the process is 
expected to be long-running and produce textual output, a batch duration can be 
specified.                                                                |
-| Command               |               |                  | Specifies the 
command to be executed; if just the name of an executable is provided, it must 
be in the user's environment PATH.<br/>**Supports Expression Language: true**  |
-| Command Arguments     |               |                  | The arguments to 
supply to the executable delimited by white space. White space can be escaped 
by enclosing it in double-quotes.<br/>**Supports Expression Language: true** |
-| Redirect Error Stream | false         |                  | If true will 
redirect any error stream output of the process to the output stream.           
                                                                               |
-| Working Directory     |               |                  | The directory to 
use as the current working directory when executing the command<br/>**Supports 
Expression Language: true**                                                 |
+| Name                  | Default Value | Allowable Values | Description       
                                                                                
                               |
+|-----------------------|---------------|------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| Batch Duration        | 0 sec         |                  | If the process is 
expected to be long-running and produce textual output, a batch duration can be 
specified.                     |
+| Command               |               |                  | Specifies the 
command to be executed; if just the name of an executable is provided, it must 
be in the user's environment PATH.  |
+| Command Arguments     |               |                  | The arguments to 
supply to the executable delimited by white space. White space can be escaped 
by enclosing it in double-quotes. |
+| Redirect Error Stream | false         |                  | If true will 
redirect any error stream output of the process to the output stream.           
                                    |
+| Working Directory     |               |                  | The directory to 
use as the current working directory when executing the command                 
                                |
 ### Relationships
 
 | Name    | Description                                            |
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 7f43e1e0b..dfa32fedc 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -17,10 +17,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#ifndef WIN32
 #include "ExecuteProcess.h"
 #include <cstring>
 #include <memory>
 #include <string>
+#include <iomanip>
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/PropertyBuilder.h"
@@ -29,43 +31,37 @@
 #include "utils/TimeUtil.h"
 #include "core/TypedValues.h"
 #include "utils/gsl.h"
-
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#pragma clang diagnostic ignored "-Wunused-result"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#pragma GCC diagnostic ignored "-Wunused-result"
-#endif
+#include "utils/Environment.h"
 
 using namespace std::literals::chrono_literals;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
+namespace org::apache::nifi::minifi::processors {
 core::Property ExecuteProcess::Command(
-    core::PropertyBuilder::createProperty("Command")->withDescription("Specifies the command to be executed; if just the name of an executable"
-                                                                      " is provided, it must be in the user's environment PATH.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Command")
+      ->withDescription("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
+      ->build());
+
 core::Property ExecuteProcess::CommandArguments(
-    core::PropertyBuilder::createProperty("Command Arguments")->withDescription("The arguments to supply to the executable delimited by white space. White "
-                                                                                "space can be escaped by enclosing it in "
-                                                                                "double-quotes.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Command Arguments")
+      ->withDescription("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
+      ->build());
+
 core::Property ExecuteProcess::WorkingDir(
-    core::PropertyBuilder::createProperty("Working Directory")->withDescription("The directory to use as the current working directory when executing the command")->supportsExpressionLanguage(true)
-        ->withDefaultValue("")->build());
+    core::PropertyBuilder::createProperty("Working Directory")
+      ->withDescription("The directory to use as the current working directory when executing the command")
+      ->build());
 
 core::Property ExecuteProcess::BatchDuration(
-    core::PropertyBuilder::createProperty("Batch Duration")->withDescription("If the process is expected to be long-running and produce textual output, a "
-                                                                             "batch duration can be specified.")->withDefaultValue<core::TimePeriodValue>("0 sec")->build());
+    core::PropertyBuilder::createProperty("Batch Duration")
+      ->withDescription("If the process is expected to be long-running and produce textual output, a batch duration can be specified.")
+      ->withDefaultValue<core::TimePeriodValue>("0 sec")
+      ->build());
 
 core::Property ExecuteProcess::RedirectErrorStream(
-    core::PropertyBuilder::createProperty("Redirect Error Stream")->withDescription("If true will redirect any error stream output of the process to the output stream.")->withDefaultValue<bool>(false)
-        ->build());
+    core::PropertyBuilder::createProperty("Redirect Error Stream")
+      ->withDescription("If true will redirect any error stream output of the process to the output stream.")
+      ->withDefaultValue<bool>(false)
+      ->build());
 
 core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
 
@@ -74,172 +70,201 @@ void ExecuteProcess::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+// Reads the processor properties when the processor is scheduled and caches them in members for onTrigger().
+void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) {
+  gsl_Expects(context);
   std::string value;
-  std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command, value, flow_file)) {
-    this->_command = value;
+  if (context->getProperty(Command.getName(), value)) {
+    command_ = value;
   }
-  if (context->getProperty(CommandArguments, value, flow_file)) {
-    this->_commandArgument = value;
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    command_argument_ = value;
   }
-  if (context->getProperty(WorkingDir, value, flow_file)) {
-    this->_workingDir = value;
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    working_dir_ = value;
   }
   if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) {
-    _batchDuration = batch_duration->getMilliseconds();
-    logger_->log_debug("Setting _batchDuration");
+    batch_duration_ = batch_duration->getMilliseconds();
+    // milliseconds::rep is a wide integer (typically int64_t), which does not match %d; log it as a string
+    logger_->log_debug("Setting batch duration to %s milliseconds", std::to_string(batch_duration_.count()));
   }
   if (context->getProperty(RedirectErrorStream.getName(), value)) {
-    _redirectErrorStream =  org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
+    redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false);
   }
-  this->_fullCommand = _command + " " + _commandArgument;
-  if (_fullCommand.length() == 0) {
-    yield();
-    return;
+  // full_command_ always contains at least the separating space, so its length says nothing about emptiness
+  full_command_ = command_ + " " + command_argument_;
+}
+
+// Splits full_command_ into argv-style words separated by white space. A double-quoted section (parsed with
+// std::quoted) may contain white space and backslash-escaped quotes; an explicitly quoted empty string ("")
+// produces an empty argument instead of being dropped.
+std::vector<std::string> ExecuteProcess::readArgs() const {
+  std::vector<std::string> args;
+  std::stringstream input_stream{full_command_};
+  // skip the separating white space first; stop once nothing but white space is left
+  while (input_stream >> std::ws && !input_stream.eof()) {
+    std::string word;
+    input_stream >> std::quoted(word);
+    args.push_back(word);
   }
-  if (_workingDir.length() > 0 && _workingDir != ".") {
-    // change to working directory
-    if (chdir(_workingDir.c_str()) != 0) {
-      logger_->log_error("Execute Command can not chdir %s", _workingDir);
-      yield();
-      return;
+
+  return args;
+}
+
+// Cleanup after a failed fork(): logs the failure, releases both ends of the pipe, then yields.
+void ExecuteProcess::executeProcessForkFailed() {
+  logger_->log_error("Execute Process fork failed");
+  for (const int fd : pipefd_) {
+    close(fd);
+  }
+  yield();
+}
+
+// Runs in the forked child: redirects stdout (and stderr, if configured) into the pipe and replaces the process
+// image with the parsed command via execvp(). It never returns to the caller.
+// _exit() is used instead of exit(): this is a fork()ed copy of the agent process, and exit() would run the
+// agent's atexit handlers and static destructors and flush stdio buffers duplicated from the parent.
+// NOTE(review): logging after fork() is not async-signal-safe if the parent had other threads; kept for diagnostics.
+void ExecuteProcess::executeChildProcess() {
+  auto args = readArgs();
+  if (args.empty()) {
+    // execvp(nullptr, ...) is undefined, so refuse to run an empty command line
+    logger_->log_error("No command to execute");
+    _exit(1);
+  }
+  std::vector<char*> argv;
+  argv.reserve(args.size() + 1);
+  for (auto& arg : args) {
+    argv.push_back(arg.data());
+  }
+  argv.push_back(nullptr);
+
+  if (dup2(pipefd_[1], STDOUT_FILENO) < 0) {  // points pipefd at file descriptor
+    logger_->log_error("Failed to point pipe at file descriptor");
+    _exit(1);
+  }
+  if (redirect_error_stream_ && dup2(pipefd_[1], STDERR_FILENO) < 0) {
+    logger_->log_error("Failed to redirect error stream of the executed process to the output stream");
+    _exit(1);
+  }
+  close(pipefd_[0]);
+  execvp(argv[0], argv.data());
+  // execvp() only returns if it failed
+  logger_->log_error("Failed to execute child process");
+  _exit(1);
+}
+
+// Used when a batch duration is configured: after every batch_duration_ it reads at most 4096 bytes from the pipe
+// and emits them as a separate flow file, committing the session each time. Stops once read() reports EOF or an error.
+void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) {
+  while (true) {
+    std::this_thread::sleep_for(batch_duration_);
+    char buffer[4096];
+    const auto num_read = read(pipefd_[0], buffer, sizeof(buffer));
+    if (num_read <= 0) {
+      break;
+    }
+    logger_->log_debug("Execute Command Respond %zd", num_read);
+    auto flow_file = session.create();
+    if (!flow_file) {
+      // the chunk that was just read is dropped
+      logger_->log_error("Flow file could not be created!");
+      continue;
     }
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read)));
+    session.transfer(flow_file, Success);
+    session.commit();
   }
-  logger_->log_info("Execute Command %s", _fullCommand);
-  // split the command into array
-  char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " ");
-  int argc = 0;
-  char *argv[64];
-  while (p != 0 && argc < 64) {
-    argv[argc] = p;
-    p = std::strtok(NULL, " ");
-    argc++;
-  }
-  argv[argc] = NULL;
-  int status;
-  if (!_processRunning) {
-    _processRunning = true;
-    // if the process has not launched yet
-    // create the pipe
-    if (pipe(_pipefd) == -1) {
-      _processRunning = false;
-      yield();
-      return;
+}
+
+// Appends buffer to flow_file. If flow_file is still null, a new flow file is created first (tagged with the
+// "command" and "command.arguments" attributes) and handed back through the reference parameter.
+// Returns false, leaving flow_file null, only when the flow file could not be created.
+bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const {
+  if (!flow_file) {
+    flow_file = session.create();
+    if (!flow_file) {
+      logger_->log_error("Flow file could not be created!");
+      return false;
     }
-    switch (_pid = fork()) {
-      case -1:
-        logger_->log_error("Execute Process fork failed");
-        _processRunning = false;
-        close(_pipefd[0]);
-        close(_pipefd[1]);
-        yield();
-        break;
-      case 0:  // this is the code the child runs
-        close(1);      // close stdout
-        dup(_pipefd[1]);  // points pipefd at file descriptor
-        if (_redirectErrorStream)
-          // redirect stderr
-          dup2(_pipefd[1], 2);
-        close(_pipefd[0]);
-        execvp(argv[0], argv);
-        exit(1);
-        break;
-      default:  // this is the code the parent runs
-        // the parent isn't going to write to the pipe
-        close(_pipefd[1]);
-        if (_batchDuration > 0ms) {
-          while (true) {
-            std::this_thread::sleep_for(_batchDuration);
-            char buffer[4096];
-            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
-            if (numRead <= 0)
-              break;
-            logger_->log_debug("Execute Command Respond %zd", numRead);
-            auto flowFile = session->create();
-            if (!flowFile)
-              continue;
-            flowFile->addAttribute("command", _command);
-            flowFile->addAttribute("command.arguments", _commandArgument);
-            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
-            session->transfer(flowFile, Success);
-            session->commit();
-          }
-        } else {
-          char buffer[4096];
-          char *bufPtr = buffer;
-          size_t totalRead = 0;
-          std::shared_ptr<core::FlowFile> flowFile = nullptr;
-          while (true) {
-            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-            if (numRead <= 0) {
-              if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %zu", totalRead);
-                // child exits and close the pipe
-                const auto buffer_span = gsl::make_span(buffer, totalRead);
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    break;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer_span);
-                } else {
-                  session->appendBuffer(flowFile, buffer_span);
-                }
-                session->transfer(flowFile, Success);
-              }
-              break;
-            } else {
-              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
-                // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                if (!flowFile) {
-                  flowFile = session->create();
-                  if (!flowFile)
-                    continue;
-                  flowFile->addAttribute("command", _command);
-                  flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->writeBuffer(flowFile, buffer);
-                } else {
-                  session->appendBuffer(flowFile, buffer);
-                }
-                // Rewind
-                totalRead = 0;
-                bufPtr = buffer;
-              } else {
-                totalRead += numRead;
-                bufPtr += numRead;
-              }
-            }
-          }
-        }
-
-        wait(&status);
-        if (WIFEXITED(status)) {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid);
-        } else {
-          logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid);
-        }
-
-        close(_pipefd[0]);
-        _processRunning = false;
-        break;
+    flow_file->addAttribute("command", command_);
+    flow_file->addAttribute("command.arguments", command_argument_);
+    session.writeBuffer(flow_file, buffer);
+  } else {
+    session.appendBuffer(flow_file, buffer);
+  }
+  return true;
+}
+
+// Used when no batch duration is configured: collects the whole output of the child into a single flow file,
+// appending it in 4096-byte chunks whenever the buffer fills up, and transfers it to Success once read()
+// reports EOF or an error. If the flow file cannot be created, the remaining output is still read and
+// discarded, so the child does not block writing into a pipe that nobody reads while the parent waits for it.
+void ExecuteProcess::readOutput(core::ProcessSession& session) {
+  char buffer[4096];
+  size_t read_to_buffer = 0;
+  bool write_failed = false;
+  std::shared_ptr<core::FlowFile> flow_file;
+  while (true) {
+    const auto num_read = read(pipefd_[0], buffer + read_to_buffer, sizeof(buffer) - read_to_buffer);
+    if (num_read <= 0) {
+      break;
+    }
+    read_to_buffer += gsl::narrow<size_t>(num_read);
+    if (read_to_buffer < sizeof(buffer)) {
+      continue;
+    }
+    // we reach the max buffer size
+    logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
+    // after a failed write keep draining (and discarding) instead of spinning or producing a flow file with a gap
+    if (!write_failed && !writeToFlowFile(session, flow_file, buffer)) {
+      write_failed = true;
+    }
+    // Rewind
+    read_to_buffer = 0;
+  }
+
+  if (write_failed) {
+    return;
+  }
+  if (read_to_buffer > 0) {
+    logger_->log_debug("Execute Command Respond %zu", read_to_buffer);
+    // child exits and close the pipe
+    const auto buffer_span = gsl::make_span(buffer, read_to_buffer);
+    if (!writeToFlowFile(session, flow_file, buffer_span)) {
+      return;
     }
   }
+  if (flow_file) {
+    session.transfer(flow_file, Success);
+  }
+}
+
+// Parent side after a successful fork(): closes the unused write end of the pipe, collects the child's output
+// (in batches or as a single flow file), then reaps the child and logs how it terminated.
+void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) {
+  // the parent isn't going to write to the pipe
+  close(pipefd_[1]);
+  if (batch_duration_ > 0ms) {
+    readOutputInBatches(session);
+  } else {
+    readOutput(session);
+  }
+
+  int status = 0;
+  // Reap this processor's own child only: a plain wait() may reap any other child of the agent process
+  // (e.g. one started by another ExecuteProcess instance). Retry if interrupted by a signal.
+  pid_t wait_result = -1;
+  do {
+    wait_result = waitpid(pid_, &status, 0);
+  } while (wait_result == -1 && errno == EINTR);
+  if (wait_result == -1) {
+    logger_->log_error("Failed to wait for child process %d of command %s", pid_, full_command_);
+  } else if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_);
+  }
+
+  close(pipefd_[0]);
+  pid_ = 0;
+}
+
+// Runs the configured command once per trigger: optionally changes into the working directory, creates a pipe,
+// forks a child that executes the command with its stdout connected to the pipe, and in the parent turns the
+// pipe output into flow files.
+void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  gsl_Expects(context && session);
+  // full_command_ always contains at least the separating space, so test the parsed words instead of its length
+  if (readArgs().empty()) {
+    yield();
+    return;
+  }
+  // an empty value or "." (the constructor default) means: stay in the current working directory
+  // NOTE(review): this runs in the parent before fork(), so it changes the working directory of the whole agent
+  // process, not only of the child (assuming setCurrentWorkingDirectory wraps chdir()).
+  if (!working_dir_.empty() && working_dir_ != "."
+      && !utils::Environment::setCurrentWorkingDirectory(working_dir_.c_str())) {
+    logger_->log_error("Execute Command can not chdir %s", working_dir_);
+    yield();
+    return;
+  }
+  logger_->log_info("Execute Command %s", full_command_);
+
+  if (pipe(pipefd_) == -1) {
+    logger_->log_error("Failed to create pipe for the output of the executed process");
+    yield();
+    return;
+  }
+  switch (pid_ = fork()) {
+    case -1:
+      executeProcessForkFailed();
+      break;
+    case 0:  // this is the code the child runs
+      executeChildProcess();
+      break;
+    default:  // this is the code the parent runs
+      collectChildProcessOutput(*session);
+      break;
+  }
 }
 
 REGISTER_RESOURCE(ExecuteProcess, Processor);
 
-#endif
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
+}  // namespace org::apache::nifi::minifi::processors
 #endif
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 5556c3181..f61681b8b 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -17,24 +17,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
+#ifndef WIN32
+#pragma once
 
 #include <errno.h>
 #include <signal.h>
 #include <stdio.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 
 #include <chrono>
 #include <iostream>
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
-#ifndef WIN32
-#include <sys/wait.h>
-
-#endif
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Processor.h"
@@ -42,31 +40,29 @@
 #include "FlowFileRecord.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
+namespace org::apache::nifi::minifi::processors {
 
-// ExecuteProcess Class
+// Runs an operating system command on each trigger and writes its standard output into flow files: one per
+// batch if a batch duration is set, otherwise at most one per execution. Not available on Windows.
 class ExecuteProcess : public core::Processor {
  public:
-  ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {})  // NOLINT
-      : Processor(name, uuid) {
-    _redirectErrorStream = false;
-    _workingDir = ".";
-    _processRunning = false;
-    _pid = 0;
+  explicit ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid),
+        working_dir_("."),
+        redirect_error_stream_(false),
+        pid_(0) {
   }
+  // Sends SIGTERM to the child process if one is still being collected (pid_ > 0).
   ~ExecuteProcess() override {
-    if (_processRunning && _pid > 0)
-      kill(_pid, SIGTERM);
+    if (pid_ > 0) {
+      kill(pid_, SIGTERM);
+    }
   }
 
   EXTENSIONAPI static constexpr const char* Description = "Runs an operating system command specified by the user and writes the output of that command to a FlowFile. "
       "If the command is expected to be long-running, the Processor can output the partial data on a specified interval. "
-      "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals.";
+      "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. "
+      "This processor is not available on Windows systems.";
 
   EXTENSIONAPI static core::Property Command;
   EXTENSIONAPI static core::Property CommandArguments;
@@ -88,38 +84,36 @@ class ExecuteProcess : public core::Processor {
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
-  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
- public:
-  // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
-  // Initialize, over write by NiFi ExecuteProcess
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override;
   void initialize() override;
 
  private:
-  // Logger
+  // Splits full_command_ into argv words; double-quoted sections may contain white space.
+  std::vector<std::string> readArgs() const;
+  void executeProcessForkFailed();
+  void executeChildProcess();
+  void collectChildProcessOutput(core::ProcessSession& session);
+  void readOutputInBatches(core::ProcessSession& session);
+  void readOutput(core::ProcessSession& session);
+  bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const;
+
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger();
-  // Property
-  std::string _command;
-  std::string _commandArgument;
-  std::string _workingDir;
-  std::chrono::milliseconds _batchDuration  = std::chrono::milliseconds(0);
-  bool _redirectErrorStream;
-  // Full command
-  std::string _fullCommand;
-  // whether the process is running
-  bool _processRunning;
-  int _pipefd[2];
-  pid_t _pid;
+  std::string command_;
+  std::string command_argument_;
+  std::string working_dir_;
+  std::chrono::milliseconds batch_duration_  = std::chrono::milliseconds(0);
+  bool redirect_error_stream_;
+  std::string full_command_;
+  // [0]: read end, used by the parent; [1]: write end, becomes the child's stdout (and stderr if redirected)
+  int pipefd_[2]{};
+  // pid of the forked child while its output is collected; -1 after a failed fork(), reset to 0 once reaped
+  pid_t pid_;
 };
 
+}  // namespace org::apache::nifi::minifi::processors
 #endif
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index db9acd9b1..01e162de2 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -20,6 +20,7 @@
 
 file(GLOB PROCESSOR_UNIT_TESTS  "unit/*.cpp")
 file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp")
+file(GLOB RESOURCE_APPS "resource_apps/*.cpp")
 if(OPENSSL_OFF)
     list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/SecureSocketGetTCPTest.cpp")
     list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/TLSServerSocketSupportedProtocolsTest.cpp")
@@ -85,9 +86,6 @@ FOREACH(testfile ${PROCESSOR_INTEGRATION_TESTS})
 ENDFOREACH()
 message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...")
 
-
-add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess )
-
 if(NOT OPENSSL_OFF)
     add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecure.yml"  "${TEST_RESOURCES}/")
     add_test(NAME SecureSocketGetTCPTestEmptyPass COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecureEmptyPass.yml"  "${TEST_RESOURCES}/")
@@ -101,3 +99,10 @@ endif()
 add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml"  "${TEST_RESOURCES}/")
 
 add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml"  "${TEST_RESOURCES}/")
+
+# Build every helper program in resource_apps/ as its own executable in ${CMAKE_BINARY_DIR}/bin
+FOREACH(resourcefile ${RESOURCE_APPS})
+    get_filename_component(resourcefilename "${resourcefile}" NAME_WE)
+    add_executable("${resourcefilename}" "${resourcefile}")
+    set_target_properties(${resourcefilename} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
+ENDFOREACH()
diff --git a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
deleted file mode 100644
index 9d6ae3da1..000000000
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <type_traits> //NOLINT
-#include <sys/stat.h> //NOLINT
-#include <chrono> //NOLINT
-#include <thread> //NOLINT
-#undef NDEBUG
-#include <cassert>
-#include <string>
-#include <utility>
-#include <memory>
-#include <vector>
-#include <fstream>
-#include "core/repository/VolatileContentRepository.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "FlowController.h"
-#include "processors/GetFile.h"
-#include "core/Core.h"
-#include "core/FlowFile.h"
-#include "core/Processor.h"
-#include "core/controller/ControllerServiceNode.h"
-#include "core/controller/ControllerServiceProvider.h"
-#include "processors/ExecuteProcess.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessorNode.h"
-#include "TestBase.h"
-#include "Catch.h"
-
-int main(int /*argc*/, char ** /*argv*/) {
-#ifndef WIN32
-  TestController testController;
-  std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
-  processor->setMaxConcurrentTasks(1);
-
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestThreadedRepository>();
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
-
-  utils::Identifier processoruuid = processor->getUUID();
-  assert(processoruuid);
-  auto connection = std::make_unique<minifi::Connection>(test_repo, content_repo, "executeProcessConnection");
-  connection->addRelationship(core::Relationship("success", "description"));
-
-  // link the connections so that we can test results at the end for this
-  connection->setSource(processor.get());
-  connection->setDestination(processor.get());
-
-  connection->setSourceUUID(processoruuid);
-  connection->setDestinationUUID(processoruuid);
-
-  processor->addConnection(connection.get());
-  assert(processor->getName() == "executeProcess");
-
-  std::shared_ptr<core::FlowFile> record;
-  processor->setScheduledState(core::ScheduledState::RUNNING);
-
-  processor->initialize();
-
-  std::atomic<bool> is_ready(false);
-
-  std::vector<std::thread> processor_workers;
-
-  auto node2 = std::make_shared<core::ProcessorNode>(processor.get());
-  auto contextset = std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo);
-  core::ProcessSessionFactory factory(contextset);
-  processor->onSchedule(contextset.get(), &factory);
-
-  processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() {
-    auto node = std::make_shared<core::ProcessorNode>(processor.get());
-    auto context = std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo);
-    context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5");
-    auto session = std::make_shared<core::ProcessSession>(context);
-    while (!is_ready.load(std::memory_order_relaxed)) {
-    }
-    processor->onTrigger(context.get(), session.get());
-  }));
-
-  is_ready.store(true, std::memory_order_relaxed);
-
-  std::for_each(processor_workers.begin(), processor_workers.end(), [](std::thread &t) {
-    t.join();
-  });
-
-  auto execp = std::static_pointer_cast<org::apache::nifi::minifi::processors::ExecuteProcess>(processor);
-#endif
-}
diff --git 
a/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp 
b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp
new file mode 100644
index 000000000..86468e32e
--- /dev/null
+++ b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <chrono>
+#include <iostream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+
+/**
+ * Test helper for the ExecuteProcess tests.
+ *
+ * Usage: EchoParameters <delay between parameters milliseconds> <text to write> [<text> ...]
+ *
+ * Writes argv[2] immediately, then each further argument on its own line,
+ * sleeping for the given delay before each one. Returns 1 (with a message on
+ * stderr) on missing arguments or an unparsable delay, 0 otherwise.
+ */
+int main(int argc, char** argv) {
+  if (argc < 3) {
+    std::cerr << "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>" << std::endl;
+    return 1;
+  }
+
+  // Parse the delay once instead of on every iteration. std::stoi reports bad input via
+  // std::invalid_argument / std::out_of_range (both std::logic_error); previously these
+  // escaped main uncaught and aborted the process.
+  std::chrono::milliseconds delay{};
+  try {
+    delay = std::chrono::milliseconds(std::stoi(argv[1]));
+  } catch (const std::logic_error&) {
+    std::cerr << "Invalid delay: " << argv[1] << std::endl;
+    return 1;
+  }
+
+  // std::endl flushes after every line so a reader on the other end of a pipe sees each
+  // parameter as soon as it is written, which the time-based batching test depends on.
+  std::cout << argv[2] << std::endl;
+  for (int i = 3; i < argc; ++i) {
+    std::this_thread::sleep_for(delay);
+    std::cout << argv[i] << std::endl;
+  }
+  return 0;
+}
diff --git a/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp
new file mode 100644
index 000000000..a40a47f66
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ExecuteProcess.h"
+#include "SingleProcessorTestController.h"
+#include "utils/file/FileUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+#ifndef WIN32
+
+/**
+ * Catch fixture for the ExecuteProcess tests: owns an ExecuteProcess instance named
+ * "ExecuteProcess", a SingleProcessorTestController wrapping it, and turns on trace
+ * logging for the processor.
+ */
+class ExecuteProcessTestsFixture {
+ public:
+  ExecuteProcessTestsFixture() {
+    // Both members below are fully initialized (in declaration order) before this runs.
+    LogTestController::getInstance().setTrace<processors::ExecuteProcess>();
+  }
+
+ protected:
+  // Must stay declared before controller_, whose initializer reads it.
+  std::shared_ptr<processors::ExecuteProcess> execute_process_{std::make_shared<processors::ExecuteProcess>("ExecuteProcess")};
+  test::SingleProcessorTestController controller_{execute_process_};
+};
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") {
+  // Executable and flags are given together in Command; CommandArguments is left unset.
+  const std::string command_line = "echo -n test";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command_line));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 1);
+  auto& flow_file = emitted[0];
+  // "-n" suppresses echo's trailing newline, hence no "\n" in the expected content.
+  CHECK(controller_.plan->getContent(flow_file) == "test");
+  CHECK(flow_file->getAttribute("command") == command_line);
+  CHECK(flow_file->getAttribute("command.arguments") == "");
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with a parameter", "[ExecuteProcess]") {
+  const auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  // EchoParameters takes the delay (0 ms) first, then the text to echo.
+  const std::string echo_args = "0 test_data";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 1);
+  auto& flow_file = emitted[0];
+  CHECK(controller_.plan->getContent(flow_file) == "test_data\n");
+  CHECK(flow_file->getAttribute("command") == echo_binary);
+  CHECK(flow_file->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with escaped parameters", "[ExecuteProcess]") {
+  const auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  // Double quotes group whitespace-separated words into one argument; \" yields a literal quote.
+  // NOTE(review): the last argument's opening quote is never closed, yet the expected output
+  // still contains "test data 4" wrapped in literal quotes -- confirm this is intended.
+  const std::string echo_args = R"(0 test_data test_data2 "test data 3" "\"test data 4\")";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 1);
+  auto& flow_file = emitted[0];
+  CHECK(controller_.plan->getContent(flow_file) == "test_data\ntest_data2\ntest data 3\n\"test data 4\"\n");
+  CHECK(flow_file->getAttribute("command") == echo_binary);
+  CHECK(flow_file->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess does not produce a flowfile if no output is generated", "[ExecuteProcess]") {
+  // Invoked without arguments, EchoParameters writes its usage text to stderr only,
+  // so nothing appears on stdout.
+  const auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.empty());
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can redirect error stream to stdout", "[ExecuteProcess]") {
+  // Without arguments EchoParameters prints its usage to stderr; with the redirect enabled
+  // that text is expected in the outgoing flow file.
+  const auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::RedirectErrorStream, "true"));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 1);
+  auto& flow_file = emitted[0];
+  CHECK(controller_.plan->getContent(flow_file) == "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>\n");
+  CHECK(flow_file->getAttribute("command") == echo_binary);
+  CHECK(flow_file->getAttribute("command.arguments") == "");
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can change workdir", "[ExecuteProcess]") {
+  // A relative executable path only resolves if WorkingDir points at the test binary directory.
+  const char* const relative_binary = "./EchoParameters";
+  const std::string echo_args = "0 test_data";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, relative_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::WorkingDir, minifi::utils::file::get_executable_dir()));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 1);
+  auto& flow_file = emitted[0];
+  CHECK(controller_.plan->getContent(flow_file) == "test_data\n");
+  CHECK(flow_file->getAttribute("command") == relative_binary);
+  CHECK(flow_file->getAttribute("command.arguments") == echo_args);
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can forward long running output in batches", "[ExecuteProcess]") {
+  const auto echo_binary = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  // EchoParameters pauses 100 ms between parameters, much longer than the 10 ms batch
+  // duration, so each echoed line is expected to arrive in its own flow file.
+  const std::string echo_args = "100 test_data1 test_data2";
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, echo_binary));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, echo_args));
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::BatchDuration, "10 ms"));
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto transfers = controller_.trigger();
+  auto emitted = transfers.at(processors::ExecuteProcess::Success);
+
+  REQUIRE(emitted.size() == 2);
+  const char* const expected_contents[] = {"test_data1\n", "test_data2\n"};
+  for (size_t idx = 0; idx < 2; ++idx) {
+    auto& flow_file = emitted[idx];
+    CHECK(controller_.plan->getContent(flow_file) == expected_contents[idx]);
+    CHECK(flow_file->getAttribute("command") == echo_binary);
+    CHECK(flow_file->getAttribute("command.arguments") == echo_args);
+  }
+}
+
+TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess buffer long outputs", "[ExecuteProcess]") {
+  auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters");
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command));
+
+  // Read buffer size the SECTIONs below are built around (per the original test's comment).
+  const size_t buffer_size = 4096;
+  std::string param1;
+
+  SECTION("Exact buffer size output") {
+    // 4095 'a' characters plus the trailing '\n' fill the buffer exactly.
+    param1.assign(buffer_size - 1, 'a');
+  }
+  SECTION("Larger than buffer size output") {
+    // 8200 characters plus '\n' span more than two full buffers.
+    param1.assign(8200, 'a');
+  }
+
+  std::string arguments = "0 " + param1;
+  REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments));
+
+  // Build the expectation up front instead of mutating param1 inside the assertion.
+  const std::string expected_content = param1 + "\n";
+
+  controller_.plan->scheduleProcessor(execute_process_);
+  auto result = controller_.trigger();
+
+  auto success_flow_files = result.at(processors::ExecuteProcess::Success);
+  REQUIRE(success_flow_files.size() == 1);
+  CHECK(controller_.plan->getContent(success_flow_files[0]) == expected_content);
+  CHECK(success_flow_files[0]->getAttribute("command") == command);
+  CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments);
+}
+
+#endif
+}  // namespace org::apache::nifi::minifi::test


Reply via email to