This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9627f2ebf9699c0eae6710a15fc713ebcf0d71e4 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Nov 9 13:29:58 2022 +0100 MINIFICPP-1927 Fix ExecuteProcess arg escaping Arguments were split at whitespace with no way of escaping, despite the property description stating that "White space can be escaped by enclosing it in double-quotes". This fixes quote escaping for arguments. Additionally, ExecuteProcess was refactored, and tests were rewritten. Closes #1414 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 16 +- .../processors/ExecuteProcess.cpp | 379 +++++++++++---------- .../processors/ExecuteProcess.h | 82 ++--- .../standard-processors/tests/CMakeLists.txt | 10 +- .../tests/integration/TestExecuteProcess.cpp | 104 ------ .../tests/resource_apps/EchoParameters.cpp | 34 ++ .../tests/unit/ExecuteProcessTests.cpp | 175 ++++++++++ 7 files changed, 461 insertions(+), 339 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index d052e5c6c..3144e8390 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -551,18 +551,18 @@ In the list below, the names of required properties appear in bold. Any other pr ### Description -Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals. +Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running,the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format,as it typically does not make sense to split binary data on arbitrary time-based intervals. This processor is not available on Windows systems. 
### Properties In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|-----------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Batch Duration | 0 sec | | If the process is expected to be long-running and produce textual output, a batch duration can be specified. | -| Command | | | Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.<br/>**Supports Expression Language: true** | -| Command Arguments | | | The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.<br/>**Supports Expression Language: true** | -| Redirect Error Stream | false | | If true will redirect any error stream output of the process to the output stream. | -| Working Directory | | | The directory to use as the current working directory when executing the command<br/>**Supports Expression Language: true** | +| Name | Default Value | Allowable Values | Description | +|-----------------------|---------------|------------------|----------------------------------------------------------------------------------------------------------------------------------| +| Batch Duration | 0 sec | | If the process is expected to be long-running and produce textual output, a batch duration can be specified. | +| Command | | | Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH. | +| Command Arguments | | | The arguments to supply to the executable delimited by white space. 
White space can be escaped by enclosing it in double-quotes. | +| Redirect Error Stream | false | | If true will redirect any error stream output of the process to the output stream. | +| Working Directory | | | The directory to use as the current working directory when executing the command | ### Relationships | Name | Description | diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp index 7f43e1e0b..dfa32fedc 100644 --- a/extensions/standard-processors/processors/ExecuteProcess.cpp +++ b/extensions/standard-processors/processors/ExecuteProcess.cpp @@ -17,10 +17,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#ifndef WIN32 #include "ExecuteProcess.h" #include <cstring> #include <memory> #include <string> +#include <iomanip> #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/PropertyBuilder.h" @@ -29,43 +31,37 @@ #include "utils/TimeUtil.h" #include "core/TypedValues.h" #include "utils/gsl.h" - -#if defined(__clang__) -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wsign-compare" -#pragma clang diagnostic ignored "-Wunused-result" -#elif defined(__GNUC__) || defined(__GNUG__) -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wsign-compare" -#pragma GCC diagnostic ignored "-Wunused-result" -#endif +#include "utils/Environment.h" using namespace std::literals::chrono_literals; -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -#ifndef WIN32 +namespace org::apache::nifi::minifi::processors { core::Property ExecuteProcess::Command( - core::PropertyBuilder::createProperty("Command")->withDescription("Specifies the command to be executed; if just the name of an executable" - " is provided, it must be in the user's environment PATH.")->supportsExpressionLanguage(true)->withDefaultValue("")->build()); + 
core::PropertyBuilder::createProperty("Command") + ->withDescription("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") + ->build()); + core::Property ExecuteProcess::CommandArguments( - core::PropertyBuilder::createProperty("Command Arguments")->withDescription("The arguments to supply to the executable delimited by white space. White " - "space can be escaped by enclosing it in " - "double-quotes.")->supportsExpressionLanguage(true)->withDefaultValue("")->build()); + core::PropertyBuilder::createProperty("Command Arguments") + ->withDescription("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") + ->build()); + core::Property ExecuteProcess::WorkingDir( - core::PropertyBuilder::createProperty("Working Directory")->withDescription("The directory to use as the current working directory when executing the command")->supportsExpressionLanguage(true) - ->withDefaultValue("")->build()); + core::PropertyBuilder::createProperty("Working Directory") + ->withDescription("The directory to use as the current working directory when executing the command") + ->build()); core::Property ExecuteProcess::BatchDuration( - core::PropertyBuilder::createProperty("Batch Duration")->withDescription("If the process is expected to be long-running and produce textual output, a " - "batch duration can be specified.")->withDefaultValue<core::TimePeriodValue>("0 sec")->build()); + core::PropertyBuilder::createProperty("Batch Duration") + ->withDescription("If the process is expected to be long-running and produce textual output, a batch duration can be specified.") + ->withDefaultValue<core::TimePeriodValue>("0 sec") + ->build()); core::Property ExecuteProcess::RedirectErrorStream( - core::PropertyBuilder::createProperty("Redirect Error Stream")->withDescription("If true will redirect any error stream output of the process to the 
output stream.")->withDefaultValue<bool>(false) - ->build()); + core::PropertyBuilder::createProperty("Redirect Error Stream") + ->withDescription("If true will redirect any error stream output of the process to the output stream.") + ->withDefaultValue<bool>(false) + ->build()); core::Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); @@ -74,172 +70,201 @@ void ExecuteProcess::initialize() { setSupportedRelationships(relationships()); } -void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { +void ExecuteProcess::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*session_factory*/) { + gsl_Expects(context); std::string value; - std::shared_ptr<core::FlowFile> flow_file; - if (context->getProperty(Command, value, flow_file)) { - this->_command = value; + if (context->getProperty(Command.getName(), value)) { + command_ = value; } - if (context->getProperty(CommandArguments, value, flow_file)) { - this->_commandArgument = value; + if (context->getProperty(CommandArguments.getName(), value)) { + command_argument_ = value; } - if (context->getProperty(WorkingDir, value, flow_file)) { - this->_workingDir = value; + if (context->getProperty(WorkingDir.getName(), value)) { + working_dir_ = value; } if (auto batch_duration = context->getProperty<core::TimePeriodValue>(BatchDuration)) { - _batchDuration = batch_duration->getMilliseconds(); - logger_->log_debug("Setting _batchDuration"); + batch_duration_ = batch_duration->getMilliseconds(); + logger_->log_debug("Setting batch duration to %d milliseconds", batch_duration_.count()); } if (context->getProperty(RedirectErrorStream.getName(), value)) { - _redirectErrorStream = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); + redirect_error_stream_ = org::apache::nifi::minifi::utils::StringUtils::toBool(value).value_or(false); } - this->_fullCommand = _command + " " + 
_commandArgument; - if (_fullCommand.length() == 0) { - yield(); - return; + full_command_ = command_ + " " + command_argument_; +} + +std::vector<std::string> ExecuteProcess::readArgs() const { + std::vector<std::string> args; + std::stringstream input_stream{full_command_}; + while (input_stream) { + std::string word; + input_stream >> std::quoted(word); + if (!word.empty()) { + args.push_back(word); + } } - if (_workingDir.length() > 0 && _workingDir != ".") { - // change to working directory - if (chdir(_workingDir.c_str()) != 0) { - logger_->log_error("Execute Command can not chdir %s", _workingDir); - yield(); - return; + + return args; +} + +void ExecuteProcess::executeProcessForkFailed() { + logger_->log_error("Execute Process fork failed"); + close(pipefd_[0]); + close(pipefd_[1]); + yield(); +} + +void ExecuteProcess::executeChildProcess() { + std::vector<char*> argv; + auto args = readArgs(); + argv.reserve(args.size() + 1); + for (auto& arg : args) { + argv.push_back(arg.data()); + } + argv.push_back(nullptr); + + static constexpr int STDOUT = 1; + static constexpr int STDERR = 2; + if (dup2(pipefd_[1], STDOUT) < 0) { // points pipefd at file descriptor + logger_->log_error("Failed to point pipe at file descriptor"); + exit(1); + } + if (redirect_error_stream_ && dup2(pipefd_[1], STDERR) < 0) { + logger_->log_error("Failed to redirect error stream of the executed process to the output stream"); + exit(1); + } + close(pipefd_[0]); + if (execvp(argv[0], argv.data()) < 0) { + logger_->log_error("Failed to execute child process"); + exit(1); + } + exit(0); +} + +void ExecuteProcess::readOutputInBatches(core::ProcessSession& session) { + while (true) { + std::this_thread::sleep_for(batch_duration_); + char buffer[4096]; + const auto num_read = read(pipefd_[0], buffer, sizeof(buffer)); + if (num_read <= 0) { + break; + } + logger_->log_debug("Execute Command Respond %zd", num_read); + auto flow_file = session.create(); + if (!flow_file) { + 
logger_->log_error("Flow file could not be created!"); + continue; } + flow_file->addAttribute("command", command_); + flow_file->addAttribute("command.arguments", command_argument_); + session.writeBuffer(flow_file, gsl::make_span(buffer, gsl::narrow<size_t>(num_read))); + session.transfer(flow_file, Success); + session.commit(); } - logger_->log_info("Execute Command %s", _fullCommand); - // split the command into array - char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " "); - int argc = 0; - char *argv[64]; - while (p != 0 && argc < 64) { - argv[argc] = p; - p = std::strtok(NULL, " "); - argc++; - } - argv[argc] = NULL; - int status; - if (!_processRunning) { - _processRunning = true; - // if the process has not launched yet - // create the pipe - if (pipe(_pipefd) == -1) { - _processRunning = false; - yield(); - return; +} + +bool ExecuteProcess::writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const { + if (!flow_file) { + flow_file = session.create(); + if (!flow_file) { + logger_->log_error("Flow file could not be created!"); + return false; } - switch (_pid = fork()) { - case -1: - logger_->log_error("Execute Process fork failed"); - _processRunning = false; - close(_pipefd[0]); - close(_pipefd[1]); - yield(); - break; - case 0: // this is the code the child runs - close(1); // close stdout - dup(_pipefd[1]); // points pipefd at file descriptor - if (_redirectErrorStream) - // redirect stderr - dup2(_pipefd[1], 2); - close(_pipefd[0]); - execvp(argv[0], argv); - exit(1); - break; - default: // this is the code the parent runs - // the parent isn't going to write to the pipe - close(_pipefd[1]); - if (_batchDuration > 0ms) { - while (true) { - std::this_thread::sleep_for(_batchDuration); - char buffer[4096]; - const auto numRead = read(_pipefd[0], buffer, sizeof(buffer)); - if (numRead <= 0) - break; - logger_->log_debug("Execute Command Respond %zd", numRead); - auto 
flowFile = session->create(); - if (!flowFile) - continue; - flowFile->addAttribute("command", _command); - flowFile->addAttribute("command.arguments", _commandArgument); - session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead))); - session->transfer(flowFile, Success); - session->commit(); - } - } else { - char buffer[4096]; - char *bufPtr = buffer; - size_t totalRead = 0; - std::shared_ptr<core::FlowFile> flowFile = nullptr; - while (true) { - const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead)); - if (numRead <= 0) { - if (totalRead > 0) { - logger_->log_debug("Execute Command Respond %zu", totalRead); - // child exits and close the pipe - const auto buffer_span = gsl::make_span(buffer, totalRead); - if (!flowFile) { - flowFile = session->create(); - if (!flowFile) - break; - flowFile->addAttribute("command", _command); - flowFile->addAttribute("command.arguments", _commandArgument); - session->writeBuffer(flowFile, buffer_span); - } else { - session->appendBuffer(flowFile, buffer_span); - } - session->transfer(flowFile, Success); - } - break; - } else { - if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) { - // we reach the max buffer size - logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer)); - if (!flowFile) { - flowFile = session->create(); - if (!flowFile) - continue; - flowFile->addAttribute("command", _command); - flowFile->addAttribute("command.arguments", _commandArgument); - session->writeBuffer(flowFile, buffer); - } else { - session->appendBuffer(flowFile, buffer); - } - // Rewind - totalRead = 0; - bufPtr = buffer; - } else { - totalRead += numRead; - bufPtr += numRead; - } - } - } - } - - wait(&status); - if (WIFEXITED(status)) { - logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WEXITSTATUS(status), _pid); - } else { - logger_->log_info("Execute Command Complete %s status %d pid %d", _fullCommand, WTERMSIG(status), _pid); - } - - 
close(_pipefd[0]); - _processRunning = false; - break; + flow_file->addAttribute("command", command_); + flow_file->addAttribute("command.arguments", command_argument_); + session.writeBuffer(flow_file, buffer); + } else { + session.appendBuffer(flow_file, buffer); + } + return true; +} + +void ExecuteProcess::readOutput(core::ProcessSession& session) { + char buffer[4096]; + char *buf_ptr = buffer; + size_t read_to_buffer = 0; + std::shared_ptr<core::FlowFile> flow_file; + auto num_read = read(pipefd_[0], buf_ptr, sizeof(buffer)); + while (num_read > 0) { + if (num_read == static_cast<ssize_t>(sizeof(buffer) - read_to_buffer)) { + // we reach the max buffer size + logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer)); + if (!writeToFlowFile(session, flow_file, buffer)) { + continue; + } + // Rewind + read_to_buffer = 0; + buf_ptr = buffer; + } else { + read_to_buffer += num_read; + buf_ptr += num_read; + } + num_read = read(pipefd_[0], buf_ptr, (sizeof(buffer) - read_to_buffer)); + } + + if (read_to_buffer > 0) { + logger_->log_debug("Execute Command Respond %zu", read_to_buffer); + // child exits and close the pipe + const auto buffer_span = gsl::make_span(buffer, read_to_buffer); + if (!writeToFlowFile(session, flow_file, buffer_span)) { + return; } } + if (flow_file) { + session.transfer(flow_file, Success); + } +} + +void ExecuteProcess::collectChildProcessOutput(core::ProcessSession& session) { + // the parent isn't going to write to the pipe + close(pipefd_[1]); + if (batch_duration_ > 0ms) { + readOutputInBatches(session); + } else { + readOutput(session); + } + + int status = 0; + wait(&status); + if (WIFEXITED(status)) { + logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WEXITSTATUS(status), pid_); + } else { + logger_->log_info("Execute Command Complete %s status %d pid %d", full_command_, WTERMSIG(status), pid_); + } + + close(pipefd_[0]); + pid_ = 0; +} + +void 
ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + gsl_Expects(context && session); + if (full_command_.length() == 0) { + yield(); + return; + } + if (!utils::Environment::setCurrentWorkingDirectory(working_dir_.c_str())) { + yield(); + return; + } + logger_->log_info("Execute Command %s", full_command_); + + if (pipe(pipefd_) == -1) { + yield(); + return; + } + switch (pid_ = fork()) { + case -1: + executeProcessForkFailed(); + break; + case 0: // this is the code the child runs + executeChildProcess(); + break; + default: // this is the code the parent runs + collectChildProcessOutput(*session); + break; + } } REGISTER_RESOURCE(ExecuteProcess, Processor); -#endif -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#if defined(__clang__) -#pragma clang diagnostic pop -#elif defined(__GNUC__) || defined(__GNUG__) -#pragma GCC diagnostic pop +} // namespace org::apache::nifi::minifi::processors #endif diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h index 5556c3181..f61681b8b 100644 --- a/extensions/standard-processors/processors/ExecuteProcess.h +++ b/extensions/standard-processors/processors/ExecuteProcess.h @@ -17,24 +17,22 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_ -#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_ +#ifndef WIN32 +#pragma once #include <errno.h> #include <signal.h> #include <stdio.h> #include <sys/types.h> +#include <sys/wait.h> #include <chrono> #include <iostream> #include <memory> #include <string> #include <thread> +#include <vector> -#ifndef WIN32 -#include <sys/wait.h> - -#endif #include "core/Core.h" #include "core/logging/LoggerConfiguration.h" #include "core/Processor.h" @@ -42,31 +40,26 @@ #include "FlowFileRecord.h" #include "utils/gsl.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -#ifndef WIN32 +namespace org::apache::nifi::minifi::processors { -// ExecuteProcess Class class ExecuteProcess : public core::Processor { public: - ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {}) // NOLINT - : Processor(name, uuid) { - _redirectErrorStream = false; - _workingDir = "."; - _processRunning = false; - _pid = 0; + explicit ExecuteProcess(const std::string& name, const utils::Identifier& uuid = {}) + : Processor(name, uuid), + working_dir_("."), + redirect_error_stream_(false), + pid_(0) { } ~ExecuteProcess() override { - if (_processRunning && _pid > 0) - kill(_pid, SIGTERM); + if (pid_ > 0) { + kill(pid_, SIGTERM); + } } EXTENSIONAPI static constexpr const char* Description = "Runs an operating system command specified by the user and writes the output of that command to a FlowFile. " "If the command is expected to be long-running, the Processor can output the partial data on a specified interval. " - "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals."; + "When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals. 
" + "This processor is not available on Windows systems."; EXTENSIONAPI static core::Property Command; EXTENSIONAPI static core::Property CommandArguments; @@ -88,38 +81,33 @@ class ExecuteProcess : public core::Processor { EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; + EXTENSIONAPI static constexpr bool IsSingleThreaded = true; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - public: - // OnTrigger method, implemented by NiFi ExecuteProcess void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override; - // Initialize, over write by NiFi ExecuteProcess + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *session_factory) override; void initialize() override; private: - // Logger + std::vector<std::string> readArgs() const; + void executeProcessForkFailed(); + void executeChildProcess(); + void collectChildProcessOutput(core::ProcessSession& session); + void readOutputInBatches(core::ProcessSession& session); + void readOutput(core::ProcessSession& session); + bool writeToFlowFile(core::ProcessSession& session, std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) const; + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ExecuteProcess>::getLogger(); - // Property - std::string _command; - std::string _commandArgument; - std::string _workingDir; - std::chrono::milliseconds _batchDuration = std::chrono::milliseconds(0); - bool _redirectErrorStream; - // Full command - std::string _fullCommand; - // whether the process is running - bool _processRunning; - int _pipefd[2]; - pid_t _pid; + 
std::string command_; + std::string command_argument_; + std::string working_dir_; + std::chrono::milliseconds batch_duration_ = std::chrono::milliseconds(0); + bool redirect_error_stream_; + std::string full_command_; + int pipefd_[2]{}; + pid_t pid_; }; +} // namespace org::apache::nifi::minifi::processors #endif -} // namespace processors -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_EXECUTEPROCESS_H_ diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt index db9acd9b1..01e162de2 100644 --- a/extensions/standard-processors/tests/CMakeLists.txt +++ b/extensions/standard-processors/tests/CMakeLists.txt @@ -20,6 +20,7 @@ file(GLOB PROCESSOR_UNIT_TESTS "unit/*.cpp") file(GLOB PROCESSOR_INTEGRATION_TESTS "integration/*.cpp") +file(GLOB RESOURCE_APPS "resource_apps/*.cpp") if(OPENSSL_OFF) list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/SecureSocketGetTCPTest.cpp") list(REMOVE_ITEM PROCESSOR_INTEGRATION_TESTS "${CMAKE_CURRENT_SOURCE_DIR}/integration/TLSServerSocketSupportedProtocolsTest.cpp") @@ -85,9 +86,6 @@ FOREACH(testfile ${PROCESSOR_INTEGRATION_TESTS}) ENDFOREACH() message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...") - -add_test(NAME TestExecuteProcess COMMAND TestExecuteProcess ) - if(NOT OPENSSL_OFF) add_test(NAME SecureSocketGetTCPTest COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecure.yml" "${TEST_RESOURCES}/") add_test(NAME SecureSocketGetTCPTestEmptyPass COMMAND SecureSocketGetTCPTest "${TEST_RESOURCES}/TestGetTCPSecureEmptyPass.yml" "${TEST_RESOURCES}/") @@ -101,3 +99,9 @@ endif() add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/") add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml" "${TEST_RESOURCES}/") + 
+FOREACH(resourcefile ${RESOURCE_APPS}) + get_filename_component(resourcefilename "${resourcefile}" NAME_WE) + add_executable("${resourcefilename}" "${resourcefile}") + set_target_properties(${resourcefilename} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin") +ENDFOREACH() diff --git a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp deleted file mode 100644 index 9d6ae3da1..000000000 --- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -#include <type_traits> //NOLINT -#include <sys/stat.h> //NOLINT -#include <chrono> //NOLINT -#include <thread> //NOLINT -#undef NDEBUG -#include <cassert> -#include <string> -#include <utility> -#include <memory> -#include <vector> -#include <fstream> -#include "core/repository/VolatileContentRepository.h" -#include "unit/ProvenanceTestHelper.h" -#include "FlowController.h" -#include "processors/GetFile.h" -#include "core/Core.h" -#include "core/FlowFile.h" -#include "core/Processor.h" -#include "core/controller/ControllerServiceNode.h" -#include "core/controller/ControllerServiceProvider.h" -#include "processors/ExecuteProcess.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessorNode.h" -#include "TestBase.h" -#include "Catch.h" - -int main(int /*argc*/, char ** /*argv*/) { -#ifndef WIN32 - TestController testController; - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess"); - processor->setMaxConcurrentTasks(1); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestThreadedRepository>(); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo, content_repo); - - utils::Identifier processoruuid = processor->getUUID(); - assert(processoruuid); - auto connection = std::make_unique<minifi::Connection>(test_repo, content_repo, "executeProcessConnection"); - connection->addRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor.get()); - connection->setDestination(processor.get()); - - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(processoruuid); - - processor->addConnection(connection.get()); - 
assert(processor->getName() == "executeProcess"); - - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); - - processor->initialize(); - - std::atomic<bool> is_ready(false); - - std::vector<std::thread> processor_workers; - - auto node2 = std::make_shared<core::ProcessorNode>(processor.get()); - auto contextset = std::make_shared<core::ProcessContext>(node2, nullptr, test_repo, test_repo); - core::ProcessSessionFactory factory(contextset); - processor->onSchedule(contextset.get(), &factory); - - processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() { - auto node = std::make_shared<core::ProcessorNode>(processor.get()); - auto context = std::make_shared<core::ProcessContext>(node, nullptr, test_repo, test_repo); - context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5"); - auto session = std::make_shared<core::ProcessSession>(context); - while (!is_ready.load(std::memory_order_relaxed)) { - } - processor->onTrigger(context.get(), session.get()); - })); - - is_ready.store(true, std::memory_order_relaxed); - - std::for_each(processor_workers.begin(), processor_workers.end(), [](std::thread &t) { - t.join(); - }); - - auto execp = std::static_pointer_cast<org::apache::nifi::minifi::processors::ExecuteProcess>(processor); -#endif -} diff --git a/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp new file mode 100644 index 000000000..86468e32e --- /dev/null +++ b/extensions/standard-processors/tests/resource_apps/EchoParameters.cpp @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <iostream> +#include <chrono> +#include <thread> + +int main(int argc, char** argv) { + if (argc < 3) { + std::cerr << "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>" << std::endl; + return 1; + } + + std::cout << argv[2] << std::endl; + for (int i = 3; i < argc; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::stoi(argv[1]))); + std::cout << argv[i] << std::endl; + } + return 0; +} diff --git a/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp new file mode 100644 index 000000000..a40a47f66 --- /dev/null +++ b/extensions/standard-processors/tests/unit/ExecuteProcessTests.cpp @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <string> + +#include "Catch.h" +#include "processors/ExecuteProcess.h" +#include "SingleProcessorTestController.h" +#include "utils/file/FileUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { +#ifndef WIN32 + +class ExecuteProcessTestsFixture { + public: + ExecuteProcessTestsFixture() + : execute_process_(std::make_shared<processors::ExecuteProcess>("ExecuteProcess")), + controller_(execute_process_) { + LogTestController::getInstance().setTrace<processors::ExecuteProcess>(); + } + protected: + std::shared_ptr<processors::ExecuteProcess> execute_process_; + test::SingleProcessorTestController controller_; +}; + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run a single command", "[ExecuteProcess]") { + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, "echo -n test")); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + CHECK(controller_.plan->getContent(success_flow_files[0]) == "test"); + CHECK(success_flow_files[0]->getAttribute("command") == "echo -n test"); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == ""); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with a parameter", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + 
std::string arguments = "0 test_data"; + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments)); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + CHECK(controller_.plan->getContent(success_flow_files[0]) == "test_data\n"); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can run an executable with escaped parameters", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + std::string arguments = R"(0 test_data test_data2 "test data 3" "\"test data 4\")"; + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments)); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + CHECK(controller_.plan->getContent(success_flow_files[0]) == "test_data\ntest_data2\ntest data 3\n\"test data 4\"\n"); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess does not produce a flowfile if no output is generated", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + 
REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.empty()); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can redirect error stream to stdout", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::RedirectErrorStream, "true")); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + CHECK(controller_.plan->getContent(success_flow_files[0]) == "Usage: ./EchoParameters <delay between parameters milliseconds> <text to write>\n"); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == ""); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can change workdir", "[ExecuteProcess]") { + auto command = "./EchoParameters"; + std::string arguments = "0 test_data"; + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::WorkingDir, minifi::utils::file::get_executable_dir())); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + 
CHECK(controller_.plan->getContent(success_flow_files[0]) == "test_data\n"); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess can forward long running output in batches", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + std::string arguments = "100 test_data1 test_data2"; + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments)); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::BatchDuration, "10 ms")); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 2); + CHECK(controller_.plan->getContent(success_flow_files[0]) == "test_data1\n"); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments); + CHECK(controller_.plan->getContent(success_flow_files[1]) == "test_data2\n"); + CHECK(success_flow_files[1]->getAttribute("command") == command); + CHECK(success_flow_files[1]->getAttribute("command.arguments") == arguments); +} + +TEST_CASE_METHOD(ExecuteProcessTestsFixture, "ExecuteProcess buffer long outputs", "[ExecuteProcess]") { + auto command = minifi::utils::file::concat_path(minifi::utils::file::get_executable_dir(), "EchoParameters"); + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::Command, command)); + std::string param1; + + SECTION("Exact buffer size output") { + param1.assign(4095, 'a'); // buffer size is 4096, so 4095 'a' characters plus '\n' character should be exactly the buffer size + } + 
SECTION("Larger than buffer size output") { + param1.assign(8200, 'a'); + } + + std::string arguments = "0 " + param1; + REQUIRE(execute_process_->setProperty(processors::ExecuteProcess::CommandArguments, arguments)); + + controller_.plan->scheduleProcessor(execute_process_); + auto result = controller_.trigger(); + + auto success_flow_files = result.at(processors::ExecuteProcess::Success); + REQUIRE(success_flow_files.size() == 1); + CHECK(controller_.plan->getContent(success_flow_files[0]) == param1.append("\n")); + CHECK(success_flow_files[0]->getAttribute("command") == command); + CHECK(success_flow_files[0]->getAttribute("command.arguments") == arguments); +} + +#endif +} // namespace org::apache::nifi::minifi::test
