Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 8beb3772d -> 6f8269606
MINIFI-34 - Adjust CMake files to complete under Travis CI CMake utilize system libuuid instead of third party Add missing include directories Add dummy.cpp Move commit ExecuteProcess to the CMake based file structure This closes #14 and closes #18. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6f826960 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6f826960 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6f826960 Branch: refs/heads/master Commit: 6f8269606de9a12cd0f50be4204545702f276f5d Parents: b02af54 Author: Andre F de Miranda <trix...@users.noreply.github.com> Authored: Fri Oct 7 15:40:20 2016 +1100 Committer: Aldrin Piri <ald...@apache.org> Committed: Thu Oct 13 11:06:55 2016 -0400 ---------------------------------------------------------------------- .travis.yml | 7 +- CMakeLists.txt | 26 +++- inc/ExecuteProcess.h | 112 --------------- include/spdlog/dummy.cpp | 22 +++ libminifi/CMakeLists.txt | 23 +-- libminifi/include/ExecuteProcess.h | 112 +++++++++++++++ libminifi/src/ExecuteProcess.cpp | 244 ++++++++++++++++++++++++++++++++ main/CMakeLists.txt | 10 +- src/ExecuteProcess.cpp | 244 -------------------------------- 9 files changed, 425 insertions(+), 375 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index eb2baa5..3866739 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,6 +14,8 @@ # limitations under the License. sudo: required +dist: trusty + language: cpp script: @@ -26,9 +28,10 @@ addons: apt: sources: - ubuntu-toolchain-r-test - - boost-latest +# - boost-latest packages: - gcc-4.8 - g++-4.8 - - libboost1.55-all-dev + - libboost-all-dev + - uuid-dev - libxml2-dev http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 1cc95c2..082e994 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,20 +19,36 @@ cmake_minimum_required(VERSION 2.6) -set(PROJECT "nifi-minifi-cpp") -set(VERSION "0.1.0") +project(nifi-minifi-cpp) +set(PROJECT_NAME "nifi-minifi-cpp") +set(PROJECT_VERSION_MAJOR 0) +set(PROJECT_VERSION_MINOR 1) +set(PROJECT_VERSION_PATCH 0) #### Establish Project Configuration #### # Enable usage of the VERSION specifier # https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048 -cmake_policy(SET CMP0048 NEW) +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) -project(${PROJECT} - VERSION ${VERSION}) +include(CheckCXXCompilerFlag) +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) +CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) +if(COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +elseif(COMPILER_SUPPORTS_CXX0X) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") +else() + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.") +endif() set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) +# Search for threads +find_package(Threads REQUIRED) + # Provide custom modules for the project list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/inc/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/inc/ExecuteProcess.h b/inc/ExecuteProcess.h deleted file mode 100644 index dce287a..0000000 --- a/inc/ExecuteProcess.h +++ /dev/null @@ -1,112 +0,0 @@ -/** - * @file ExecuteProcess.h - * ExecuteProcess class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __EXECUTE_PROCESS_H__ -#define __EXECUTE_PROCESS_H__ - -#include <stdio.h> -#include <unistd.h> -#include <string> -#include <errno.h> -#include <chrono> -#include <thread> -#include <unistd.h> -#include <sys/wait.h> -#include <iostream> -#include <sys/types.h> -#include <signal.h> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! ExecuteProcess Class -class ExecuteProcess : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - ExecuteProcess(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - _redirectErrorStream = false; - _batchDuration = 0; - _workingDir = "."; - _processRunning = false; - _pid = 0; - } - //! Destructor - virtual ~ExecuteProcess() - { - if (_processRunning && _pid > 0) - kill(_pid, SIGTERM); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property Command; - static Property CommandArguments; - static Property WorkingDir; - static Property BatchDuration; - static Property RedirectErrorStream; - //! Supported Relationships - static Relationship Success; - - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi ExecuteProcess - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi ExecuteProcess - virtual void initialize(void); - -protected: - -private: - //! Logger - Logger *_logger; - //! Property - std::string _command; - std::string _commandArgument; - std::string _workingDir; - int64_t _batchDuration; - bool _redirectErrorStream; - //! Full command - std::string _fullCommand; - //! whether the process is running - bool _processRunning; - int _pipefd[2]; - pid_t _pid; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/include/spdlog/dummy.cpp ---------------------------------------------------------------------- diff --git a/include/spdlog/dummy.cpp b/include/spdlog/dummy.cpp new file mode 100644 index 0000000..6b2d4ab --- /dev/null +++ b/include/spdlog/dummy.cpp @@ -0,0 +1,22 @@ +/** + * @file dummy.cpp + * MiNiFiMain implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Dummy CPP file to work around Cmake limitation on header only libraries + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 571b73d..2d6b84b 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -19,26 +19,33 @@ cmake_minimum_required (VERSION 2.6) -set(PROJECT "apache-nifi-minifi-cpp") -set(VERSION "0.1.0") +project(nifi-libminifi) +set(PROJECT_NAME "nifi-libminifi") +set(PROJECT_VERSION_MAJOR 0) +set(PROJECT_VERSION_MINOR 1) +set(PROJECT_VERSION_PATCH 0) + #### Establish Project Configuration #### # Enable usage of the VERSION specifier # https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048 -cmake_policy(SET CMP0048 NEW) - -project(${PROJECT} - VERSION ${VERSION}) +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) include_directories(../include) +include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include) include_directories(include) file(GLOB SOURCES "src/*.cpp") +file(GLOB SPD_SOURCES "../include/spdlog/*") -add_library(spdlog INTERFACE) +# Workaround the limitations of having a +# header only library +add_library(spdlog STATIC ${SPD_SOURCES}) add_library(minifi STATIC ${SOURCES}) # Include libxml2 @@ -48,4 +55,4 @@ if (LIBXML2_FOUND) target_link_libraries (minifi ${LIBXML2_LIBRARIES}) else () # Build from our local version -endif (LIBXML2_FOUND) \ No newline at end of file +endif (LIBXML2_FOUND) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/include/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ExecuteProcess.h b/libminifi/include/ExecuteProcess.h new file mode 100644 index 0000000..dce287a --- /dev/null +++ b/libminifi/include/ExecuteProcess.h @@ -0,0 +1,112 @@ +/** + * @file ExecuteProcess.h + * ExecuteProcess class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __EXECUTE_PROCESS_H__ +#define __EXECUTE_PROCESS_H__ + +#include <stdio.h> +#include <unistd.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <unistd.h> +#include <sys/wait.h> +#include <iostream> +#include <sys/types.h> +#include <signal.h> +#include "FlowFileRecord.h" +#include "Processor.h" +#include "ProcessSession.h" + +//! ExecuteProcess Class +class ExecuteProcess : public Processor +{ +public: + //! Constructor + /*! + * Create a new processor + */ + ExecuteProcess(std::string name, uuid_t uuid = NULL) + : Processor(name, uuid) + { + _logger = Logger::getLogger(); + _redirectErrorStream = false; + _batchDuration = 0; + _workingDir = "."; + _processRunning = false; + _pid = 0; + } + //! Destructor + virtual ~ExecuteProcess() + { + if (_processRunning && _pid > 0) + kill(_pid, SIGTERM); + } + //! Processor Name + static const std::string ProcessorName; + //! Supported Properties + static Property Command; + static Property CommandArguments; + static Property WorkingDir; + static Property BatchDuration; + static Property RedirectErrorStream; + //! Supported Relationships + static Relationship Success; + + //! Nest Callback Class for write stream + class WriteCallback : public OutputStreamCallback + { + public: + WriteCallback(char *data, uint64_t size) + : _data(data), _dataSize(size) {} + char *_data; + uint64_t _dataSize; + void process(std::ofstream *stream) { + if (_data && _dataSize > 0) + stream->write(_data, _dataSize); + } + }; + +public: + //! OnTrigger method, implemented by NiFi ExecuteProcess + virtual void onTrigger(ProcessContext *context, ProcessSession *session); + //! Initialize, over write by NiFi ExecuteProcess + virtual void initialize(void); + +protected: + +private: + //! Logger + Logger *_logger; + //! Property + std::string _command; + std::string _commandArgument; + std::string _workingDir; + int64_t _batchDuration; + bool _redirectErrorStream; + //! Full command + std::string _fullCommand; + //! whether the process is running + bool _processRunning; + int _pipefd[2]; + pid_t _pid; +}; + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/src/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ExecuteProcess.cpp b/libminifi/src/ExecuteProcess.cpp new file mode 100644 index 0000000..16d9457 --- /dev/null +++ b/libminifi/src/ExecuteProcess.cpp @@ -0,0 +1,244 @@ +/** + * @file ExecuteProcess.cpp + * ExecuteProcess class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "TimeUtil.h" +#include "ExecuteProcess.h" +#include "ProcessContext.h" +#include "ProcessSession.h" +#include <cstring> + +const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); +Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", ""); +Property ExecuteProcess::CommandArguments("Command Arguments", + "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", ""); +Property ExecuteProcess::WorkingDir("Working Directory", + "The directory to use as the current working directory when executing the command", ""); +Property ExecuteProcess::BatchDuration("Batch Duration", + "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0"); +Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", + "If true will redirect any error stream output of the process to the output stream.", "false"); +Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); + +void ExecuteProcess::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(Command); + properties.insert(CommandArguments); + properties.insert(WorkingDir); + properties.insert(BatchDuration); + properties.insert(RedirectErrorStream); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + + +void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + if (context->getProperty(Command.getName(), value)) + { + this->_command = value; + } + if (context->getProperty(CommandArguments.getName(), value)) + { + this->_commandArgument = value; + } + if (context->getProperty(WorkingDir.getName(), value)) + { + this->_workingDir = value; + } + if (context->getProperty(BatchDuration.getName(), value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _batchDuration, unit) && + Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) + { + + } + } + if (context->getProperty(RedirectErrorStream.getName(), value)) + { + Property::StringToBool(value, _redirectErrorStream); + } + this->_fullCommand = _command + " " + _commandArgument; + if (_fullCommand.length() == 0) + { + yield(); + return; + } + if (_workingDir.length() > 0 && _workingDir != ".") + { + // change to working directory + if (chdir(_workingDir.c_str()) != 0) + { + _logger->log_error("Execute Command can not chdir %s", _workingDir.c_str()); + yield(); + return; + } + } + _logger->log_info("Execute Command %s", _fullCommand.c_str()); + // split the command into array + char cstr[_fullCommand.length()+1]; + std::strcpy(cstr, _fullCommand.c_str()); + char *p = std::strtok (cstr, " "); + int argc = 0; + char *argv[64]; + while (p != 0 && argc < 64) + { + argv[argc] = p; + p = std::strtok(NULL, " "); + argc++; + } + argv[argc] = NULL; + int status, died; + if (!_processRunning) + { + _processRunning = true; + // if the process has not launched yet + // create the pipe + if (pipe(_pipefd) == -1) + { + _processRunning = false; + yield(); + return; + } + switch (_pid = fork()) + { + case -1: + _logger->log_error("Execute Process fork failed"); + _processRunning = false; + close(_pipefd[0]); + close(_pipefd[1]); + yield(); + break; + case 0 : // this is the code the child runs + close(1); // close stdout + dup(_pipefd[1]); // points pipefd at file descriptor + if (_redirectErrorStream) + // redirect stderr + dup2(_pipefd[1], 2); + close(_pipefd[0]); + execvp(argv[0], argv); + exit(1); + break; + default: // this is the code the parent runs + // the parent isn't going to write to the pipe + close(_pipefd[1]); + if (_batchDuration > 0) + { + while (1) + { + std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration)); + char buffer[4096]; + int numRead = read(_pipefd[0], buffer, sizeof(buffer)); + if (numRead <= 0) + break; + _logger->log_info("Execute Command Respond %d", numRead); + ExecuteProcess::WriteCallback callback(buffer, numRead); + FlowFileRecord *flowFile = session->create(); + if (!flowFile) + continue; + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + session->commit(); + } + } + else + { + char buffer[4096]; + char *bufPtr = buffer; + int totalRead = 0; + FlowFileRecord *flowFile = NULL; + while (1) + { + int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead)); + if (numRead <= 0) + { + if (totalRead > 0) + { + _logger->log_info("Execute Command Respond %d", totalRead); + // child exits and close the pipe + ExecuteProcess::WriteCallback callback(buffer, totalRead); + if (!flowFile) + { + flowFile = session->create(); + if (!flowFile) + break; + session->write(flowFile, &callback); + } + else + { + session->append(flowFile, &callback); + } + session->transfer(flowFile, Success); + } + break; + } + else + { + if (numRead == (sizeof(buffer) - totalRead)) + { + // we reach the max buffer size + _logger->log_info("Execute Command Max Respond %d", sizeof(buffer)); + ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); + if (!flowFile) + { + flowFile = session->create(); + if (!flowFile) + continue; + session->write(flowFile, &callback); + } + else + { + session->append(flowFile, &callback); + } + // Rewind + totalRead = 0; + bufPtr = buffer; + } + else + { + totalRead += numRead; + bufPtr += numRead; + } + } + } + } + + died= wait(&status); + if (WIFEXITED(status)) + { + _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid); + } + else + { + _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid); + } + + close(_pipefd[0]); + _processRunning = false; + break; + } + } +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index 2f470a5..e740955 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -19,9 +19,11 @@ cmake_minimum_required(VERSION 2.6) -cmake_policy(SET CMP0048 NEW) +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) -include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include) +include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty) # Include libxml2 find_package(LibXml2) @@ -33,8 +35,8 @@ endif (LIBXML2_FOUND) add_executable(minifiexe MiNiFiMain.cpp) -# Link against minifi and yaml-cpp -target_link_libraries(minifiexe minifi yaml-cpp) +# Link against minifi, yaml-cpp and uuid +target_link_libraries(minifiexe minifi yaml-cpp uuid) set_target_properties(minifiexe PROPERTIES OUTPUT_NAME minifi) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/src/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/src/ExecuteProcess.cpp b/src/ExecuteProcess.cpp deleted file mode 100644 index 16d9457..0000000 --- a/src/ExecuteProcess.cpp +++ /dev/null @@ -1,244 +0,0 @@ -/** - * @file ExecuteProcess.cpp - * ExecuteProcess class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "TimeUtil.h" -#include "ExecuteProcess.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include <cstring> - -const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); -Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", ""); -Property ExecuteProcess::CommandArguments("Command Arguments", - "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", ""); -Property ExecuteProcess::WorkingDir("Working Directory", - "The directory to use as the current working directory when executing the command", ""); -Property ExecuteProcess::BatchDuration("Batch Duration", - "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0"); -Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream", - "If true will redirect any error stream output of the process to the output stream.", "false"); -Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship."); - -void ExecuteProcess::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(Command); - properties.insert(CommandArguments); - properties.insert(WorkingDir); - properties.insert(BatchDuration); - properties.insert(RedirectErrorStream); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - - -void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - if (context->getProperty(Command.getName(), value)) - { - this->_command = value; - } - if (context->getProperty(CommandArguments.getName(), value)) - { - this->_commandArgument = value; - } - if (context->getProperty(WorkingDir.getName(), value)) - { - this->_workingDir = value; - } - if (context->getProperty(BatchDuration.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _batchDuration, unit) && - Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration)) - { - - } - } - if (context->getProperty(RedirectErrorStream.getName(), value)) - { - Property::StringToBool(value, _redirectErrorStream); - } - this->_fullCommand = _command + " " + _commandArgument; - if (_fullCommand.length() == 0) - { - yield(); - return; - } - if (_workingDir.length() > 0 && _workingDir != ".") - { - // change to working directory - if (chdir(_workingDir.c_str()) != 0) - { - _logger->log_error("Execute Command can not chdir %s", _workingDir.c_str()); - yield(); - return; - } - } - _logger->log_info("Execute Command %s", _fullCommand.c_str()); - // split the command into array - char cstr[_fullCommand.length()+1]; - std::strcpy(cstr, _fullCommand.c_str()); - char *p = std::strtok (cstr, " "); - int argc = 0; - char *argv[64]; - while (p != 0 && argc < 64) - { - argv[argc] = p; - p = std::strtok(NULL, " "); - argc++; - } - argv[argc] = NULL; - int status, died; - if (!_processRunning) - { - _processRunning = true; - // if the process has not launched yet - // create the pipe - if (pipe(_pipefd) == -1) - { - _processRunning = false; - yield(); - return; - } - switch (_pid = fork()) - { - case -1: - _logger->log_error("Execute Process fork failed"); - _processRunning = false; - close(_pipefd[0]); - close(_pipefd[1]); - yield(); - break; - case 0 : // this is the code the child runs - close(1); // close stdout - dup(_pipefd[1]); // points pipefd at file descriptor - if (_redirectErrorStream) - // redirect stderr - dup2(_pipefd[1], 2); - close(_pipefd[0]); - execvp(argv[0], argv); - exit(1); - break; - default: // this is the code the parent runs - // the parent isn't going to write to the pipe - close(_pipefd[1]); - if (_batchDuration > 0) - { - while (1) - { - std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration)); - char buffer[4096]; - int numRead = read(_pipefd[0], buffer, sizeof(buffer)); - if (numRead <= 0) - break; - _logger->log_info("Execute Command Respond %d", numRead); - ExecuteProcess::WriteCallback callback(buffer, numRead); - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - continue; - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - session->commit(); - } - } - else - { - char buffer[4096]; - char *bufPtr = buffer; - int totalRead = 0; - FlowFileRecord *flowFile = NULL; - while (1) - { - int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead)); - if (numRead <= 0) - { - if (totalRead > 0) - { - _logger->log_info("Execute Command Respond %d", totalRead); - // child exits and close the pipe - ExecuteProcess::WriteCallback callback(buffer, totalRead); - if (!flowFile) - { - flowFile = session->create(); - if (!flowFile) - break; - session->write(flowFile, &callback); - } - else - { - session->append(flowFile, &callback); - } - session->transfer(flowFile, Success); - } - break; - } - else - { - if (numRead == (sizeof(buffer) - totalRead)) - { - // we reach the max buffer size - _logger->log_info("Execute Command Max Respond %d", sizeof(buffer)); - ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer)); - if (!flowFile) - { - flowFile = session->create(); - if (!flowFile) - continue; - session->write(flowFile, &callback); - } - else - { - session->append(flowFile, &callback); - } - // Rewind - totalRead = 0; - bufPtr = buffer; - } - else - { - totalRead += numRead; - bufPtr += numRead; - } - } - } - } - - died= wait(&status); - if (WIFEXITED(status)) - { - _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid); - } - else - { - _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid); - } - - close(_pipefd[0]); - _processRunning = false; - break; - } - } -} -