Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 8beb3772d -> 6f8269606


MINIFI-34 - Adjust CMake files to complete under Travis CI
        CMake utilize system libuuid instead of third party
        Add missing include directories
        Add dummy.cpp

Move commit ExecuteProcess to the CMake based file structure

This closes #14 and closes #18.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6f826960
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6f826960
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6f826960

Branch: refs/heads/master
Commit: 6f8269606de9a12cd0f50be4204545702f276f5d
Parents: b02af54
Author: Andre F de Miranda <trix...@users.noreply.github.com>
Authored: Fri Oct 7 15:40:20 2016 +1100
Committer: Aldrin Piri <ald...@apache.org>
Committed: Thu Oct 13 11:06:55 2016 -0400

----------------------------------------------------------------------
 .travis.yml                        |   7 +-
 CMakeLists.txt                     |  26 +++-
 inc/ExecuteProcess.h               | 112 ---------------
 include/spdlog/dummy.cpp           |  22 +++
 libminifi/CMakeLists.txt           |  23 +--
 libminifi/include/ExecuteProcess.h | 112 +++++++++++++++
 libminifi/src/ExecuteProcess.cpp   | 244 ++++++++++++++++++++++++++++++++
 main/CMakeLists.txt                |  10 +-
 src/ExecuteProcess.cpp             | 244 --------------------------------
 9 files changed, 425 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index eb2baa5..3866739 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,6 +14,8 @@
 # limitations under the License.
 sudo: required
 
+dist: trusty
+
 language: cpp
 
 script:
@@ -26,9 +28,10 @@ addons:
   apt:
     sources:
     - ubuntu-toolchain-r-test
-    - boost-latest
+#    - boost-latest
     packages:
     - gcc-4.8
     - g++-4.8
-    - libboost1.55-all-dev
+    - libboost-all-dev
+    - uuid-dev
     - libxml2-dev

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1cc95c2..082e994 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,20 +19,36 @@
 
 cmake_minimum_required(VERSION 2.6)
 
-set(PROJECT "nifi-minifi-cpp")
-set(VERSION "0.1.0")
+project(nifi-minifi-cpp)
+set(PROJECT_NAME "nifi-minifi-cpp")
+set(PROJECT_VERSION_MAJOR 0)
+set(PROJECT_VERSION_MINOR 1)
+set(PROJECT_VERSION_PATCH 0)
 
 #### Establish Project Configuration ####
 # Enable usage of the VERSION specifier
 # https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048
-cmake_policy(SET CMP0048 NEW)
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
 
-project(${PROJECT}
-        VERSION ${VERSION})
+include(CheckCXXCompilerFlag)
+CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
+CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X)
+if(COMPILER_SUPPORTS_CXX11)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+elseif(COMPILER_SUPPORTS_CXX0X)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x")
+else()
+        message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.")
+endif()
 
 set(CMAKE_CXX_STANDARD 11)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 
+# Search for threads
+find_package(Threads REQUIRED)
+
 # Provide custom modules for the project
 list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/inc/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/inc/ExecuteProcess.h b/inc/ExecuteProcess.h
deleted file mode 100644
index dce287a..0000000
--- a/inc/ExecuteProcess.h
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * @file ExecuteProcess.h
- * ExecuteProcess class declaration
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __EXECUTE_PROCESS_H__
-#define __EXECUTE_PROCESS_H__
-
-#include <stdio.h>
-#include <unistd.h>
-#include <string>
-#include <errno.h>
-#include <chrono>
-#include <thread>
-#include <unistd.h>
-#include <sys/wait.h>
-#include <iostream>
-#include <sys/types.h>
-#include <signal.h>
-#include "FlowFileRecord.h"
-#include "Processor.h"
-#include "ProcessSession.h"
-
-//! ExecuteProcess Class
-class ExecuteProcess : public Processor
-{
-public:
-       //! Constructor
-       /*!
-        * Create a new processor
-        */
-       ExecuteProcess(std::string name, uuid_t uuid = NULL)
-       : Processor(name, uuid)
-       {
-               _logger = Logger::getLogger();
-               _redirectErrorStream = false;
-               _batchDuration = 0;
-               _workingDir = ".";
-               _processRunning = false;
-               _pid = 0;
-       }
-       //! Destructor
-       virtual ~ExecuteProcess()
-       {
-               if (_processRunning && _pid > 0)
-                       kill(_pid, SIGTERM);
-       }
-       //! Processor Name
-       static const std::string ProcessorName;
-       //! Supported Properties
-       static Property Command;
-       static Property CommandArguments;
-       static Property WorkingDir;
-       static Property BatchDuration;
-       static Property RedirectErrorStream;
-       //! Supported Relationships
-       static Relationship Success;
-
-       //! Nest Callback Class for write stream
-       class WriteCallback : public OutputStreamCallback
-       {
-               public:
-               WriteCallback(char *data, uint64_t size)
-               : _data(data), _dataSize(size) {}
-               char *_data;
-               uint64_t _dataSize;
-               void process(std::ofstream *stream) {
-                       if (_data && _dataSize > 0)
-                               stream->write(_data, _dataSize);
-               }
-       };
-
-public:
-       //! OnTrigger method, implemented by NiFi ExecuteProcess
-       virtual void onTrigger(ProcessContext *context, ProcessSession *session);
-       //! Initialize, over write by NiFi ExecuteProcess
-       virtual void initialize(void);
-
-protected:
-
-private:
-       //! Logger
-       Logger *_logger;
-       //! Property
-       std::string _command;
-       std::string _commandArgument;
-       std::string _workingDir;
-       int64_t _batchDuration;
-       bool _redirectErrorStream;
-       //! Full command
-       std::string _fullCommand;
-       //! whether the process is running
-       bool _processRunning;
-       int _pipefd[2];
-       pid_t _pid;
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/include/spdlog/dummy.cpp
----------------------------------------------------------------------
diff --git a/include/spdlog/dummy.cpp b/include/spdlog/dummy.cpp
new file mode 100644
index 0000000..6b2d4ab
--- /dev/null
+++ b/include/spdlog/dummy.cpp
@@ -0,0 +1,22 @@
+/**
+ * @file dummy.cpp 
+ * MiNiFiMain implementation 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Dummy CPP file to work around Cmake limitation on header only libraries
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 571b73d..2d6b84b 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -19,26 +19,33 @@
 
 cmake_minimum_required (VERSION 2.6)
 
-set(PROJECT "apache-nifi-minifi-cpp")
-set(VERSION "0.1.0")
+project(nifi-libminifi)
+set(PROJECT_NAME "nifi-libminifi")
+set(PROJECT_VERSION_MAJOR 0)
+set(PROJECT_VERSION_MINOR 1)
+set(PROJECT_VERSION_PATCH 0)
+
 
 #### Establish Project Configuration ####
 # Enable usage of the VERSION specifier
 # https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048
-cmake_policy(SET CMP0048 NEW)
-
-project(${PROJECT}
-        VERSION ${VERSION})
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
 
 set(CMAKE_CXX_STANDARD 11)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 
 include_directories(../include)
+include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
 include_directories(include)
 
 file(GLOB SOURCES "src/*.cpp")
+file(GLOB SPD_SOURCES "../include/spdlog/*")
 
-add_library(spdlog INTERFACE)
+# Workaround the limitations of having a
+# header only library
+add_library(spdlog STATIC ${SPD_SOURCES})
 add_library(minifi STATIC ${SOURCES})
 
 # Include libxml2
@@ -48,4 +55,4 @@ if (LIBXML2_FOUND)
     target_link_libraries (minifi ${LIBXML2_LIBRARIES})
 else ()
     # Build from our local version
-endif (LIBXML2_FOUND)
\ No newline at end of file
+endif (LIBXML2_FOUND)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/include/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ExecuteProcess.h b/libminifi/include/ExecuteProcess.h
new file mode 100644
index 0000000..dce287a
--- /dev/null
+++ b/libminifi/include/ExecuteProcess.h
@@ -0,0 +1,112 @@
+/**
+ * @file ExecuteProcess.h
+ * ExecuteProcess class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXECUTE_PROCESS_H__
+#define __EXECUTE_PROCESS_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <iostream>
+#include <sys/types.h>
+#include <signal.h>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! ExecuteProcess Class
+class ExecuteProcess : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       ExecuteProcess(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _redirectErrorStream = false;
+               _batchDuration = 0;
+               _workingDir = ".";
+               _processRunning = false;
+               _pid = 0;
+       }
+       //! Destructor
+       virtual ~ExecuteProcess()
+       {
+               if (_processRunning && _pid > 0)
+                       kill(_pid, SIGTERM);
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property Command;
+       static Property CommandArguments;
+       static Property WorkingDir;
+       static Property BatchDuration;
+       static Property RedirectErrorStream;
+       //! Supported Relationships
+       static Relationship Success;
+
+       //! Nest Callback Class for write stream
+       class WriteCallback : public OutputStreamCallback
+       {
+               public:
+               WriteCallback(char *data, uint64_t size)
+               : _data(data), _dataSize(size) {}
+               char *_data;
+               uint64_t _dataSize;
+               void process(std::ofstream *stream) {
+                       if (_data && _dataSize > 0)
+                               stream->write(_data, _dataSize);
+               }
+       };
+
+public:
+       //! OnTrigger method, implemented by NiFi ExecuteProcess
+       virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+       //! Initialize, over write by NiFi ExecuteProcess
+       virtual void initialize(void);
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       //! Property
+       std::string _command;
+       std::string _commandArgument;
+       std::string _workingDir;
+       int64_t _batchDuration;
+       bool _redirectErrorStream;
+       //! Full command
+       std::string _fullCommand;
+       //! whether the process is running
+       bool _processRunning;
+       int _pipefd[2];
+       pid_t _pid;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/libminifi/src/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ExecuteProcess.cpp b/libminifi/src/ExecuteProcess.cpp
new file mode 100644
index 0000000..16d9457
--- /dev/null
+++ b/libminifi/src/ExecuteProcess.cpp
@@ -0,0 +1,244 @@
+/**
+ * @file ExecuteProcess.cpp
+ * ExecuteProcess class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TimeUtil.h"
+#include "ExecuteProcess.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+#include <cstring>
+
+const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
+Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", "");
+Property ExecuteProcess::CommandArguments("Command Arguments",
+               "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", "");
+Property ExecuteProcess::WorkingDir("Working Directory",
+               "The directory to use as the current working directory when executing the command", "");
+Property ExecuteProcess::BatchDuration("Batch Duration",
+               "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0");
+Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream",
+               "If true will redirect any error stream output of the process to the output stream.", "false");
+Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
+
+void ExecuteProcess::initialize()
+{
+       //! Set the supported properties
+       std::set<Property> properties;
+       properties.insert(Command);
+       properties.insert(CommandArguments);
+       properties.insert(WorkingDir);
+       properties.insert(BatchDuration);
+       properties.insert(RedirectErrorStream);
+       setSupportedProperties(properties);
+       //! Set the supported relationships
+       std::set<Relationship> relationships;
+       relationships.insert(Success);
+       setSupportedRelationships(relationships);
+}
+
+
+void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       if (context->getProperty(Command.getName(), value))
+       {
+               this->_command = value;
+       }
+       if (context->getProperty(CommandArguments.getName(), value))
+       {
+               this->_commandArgument = value;
+       }
+       if (context->getProperty(WorkingDir.getName(), value))
+       {
+               this->_workingDir = value;
+       }
+       if (context->getProperty(BatchDuration.getName(), value))
+       {
+               TimeUnit unit;
+               if (Property::StringToTime(value, _batchDuration, unit) &&
+                       Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration))
+               {
+
+               }
+       }
+       if (context->getProperty(RedirectErrorStream.getName(), value))
+       {
+               Property::StringToBool(value, _redirectErrorStream);
+       }
+       this->_fullCommand = _command + " " + _commandArgument;
+       if (_fullCommand.length() == 0)
+       {
+               yield();
+               return;
+       }
+       if (_workingDir.length() > 0 && _workingDir != ".")
+       {
+               // change to working directory
+               if (chdir(_workingDir.c_str()) != 0)
+               {
+                       _logger->log_error("Execute Command can not chdir %s", _workingDir.c_str());
+                       yield();
+                       return;
+               }
+       }
+       _logger->log_info("Execute Command %s", _fullCommand.c_str());
+       // split the command into array
+       char cstr[_fullCommand.length()+1];
+       std::strcpy(cstr, _fullCommand.c_str());
+       char *p = std::strtok (cstr, " ");
+       int argc = 0;
+       char *argv[64];
+       while (p != 0 && argc < 64)
+       {
+               argv[argc] = p;
+               p = std::strtok(NULL, " ");
+               argc++;
+       }
+       argv[argc] = NULL;
+       int status, died;
+       if (!_processRunning)
+       {
+               _processRunning = true;
+               // if the process has not launched yet
+               // create the pipe
+               if (pipe(_pipefd) == -1)
+               {
+                       _processRunning = false;
+                       yield();
+                       return;
+               }
+               switch (_pid = fork())
+               {
+               case -1:
+                       _logger->log_error("Execute Process fork failed");
+                       _processRunning = false;
+                       close(_pipefd[0]);
+                       close(_pipefd[1]);
+                       yield();
+                       break;
+               case 0 : // this is the code the child runs
+                       close(1);      // close stdout
+                       dup(_pipefd[1]); // points pipefd at file descriptor
+                       if (_redirectErrorStream)
+                               // redirect stderr
+                               dup2(_pipefd[1], 2);
+                       close(_pipefd[0]);
+                       execvp(argv[0], argv);
+                       exit(1);
+                       break;
+               default: // this is the code the parent runs
+                       // the parent isn't going to write to the pipe
+                       close(_pipefd[1]);
+                       if (_batchDuration > 0)
+                       {
+                               while (1)
+                               {
+                                       std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
+                                       char buffer[4096];
+                                       int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+                                       if (numRead <= 0)
+                                               break;
+                                       _logger->log_info("Execute Command Respond %d", numRead);
+                                       ExecuteProcess::WriteCallback callback(buffer, numRead);
+                                       FlowFileRecord *flowFile = session->create();
+                                       if (!flowFile)
+                                               continue;
+                                       session->write(flowFile, &callback);
+                                       session->transfer(flowFile, Success);
+                                       session->commit();
+                               }
+                       }
+                       else
+                       {
+                               char buffer[4096];
+                               char *bufPtr = buffer;
+                               int totalRead = 0;
+                               FlowFileRecord *flowFile = NULL;
+                               while (1)
+                               {
+                                       int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
+                                       if (numRead <= 0)
+                                       {
+                                               if (totalRead > 0)
+                                               {
+                                                       _logger->log_info("Execute Command Respond %d", totalRead);
+                                                       // child exits and close the pipe
+                                                       ExecuteProcess::WriteCallback callback(buffer, totalRead);
+                                                       if (!flowFile)
+                                                       {
+                                                               flowFile = session->create();
+                                                               if (!flowFile)
+                                                                       break;
+                                                               session->write(flowFile, &callback);
+                                                       }
+                                                       else
+                                                       {
+                                                               session->append(flowFile, &callback);
+                                                       }
+                                                       session->transfer(flowFile, Success);
+                                               }
+                                               break;
+                                       }
+                                       else
+                                       {
+                                               if (numRead == (sizeof(buffer) - totalRead))
+                                               {
+                                                       // we reach the max buffer size
+                                                       _logger->log_info("Execute Command Max Respond %d", sizeof(buffer));
+                                                       ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
+                                                       if (!flowFile)
+                                                       {
+                                                               flowFile = session->create();
+                                                               if (!flowFile)
+                                                                       continue;
+                                                               session->write(flowFile, &callback);
+                                                       }
+                                                       else
+                                                       {
+                                                               session->append(flowFile, &callback);
+                                                       }
+                                                       // Rewind
+                                                       totalRead = 0;
+                                                       bufPtr = buffer;
+                                               }
+                                               else
+                                               {
+                                                       totalRead += numRead;
+                                                       bufPtr += numRead;
+                                               }
+                                       }
+                               }
+                       }
+
+                       died= wait(&status);
+                       if (WIFEXITED(status))
+                       {
+                               _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+                       }
+                       else
+                       {
+                               _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
+                       }
+
+                       close(_pipefd[0]);
+                       _processRunning = false;
+                       break;
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/main/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index 2f470a5..e740955 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -19,9 +19,11 @@
 
 cmake_minimum_required(VERSION 2.6)
 
-cmake_policy(SET CMP0048 NEW)
+IF(POLICY CMP0048)
+  CMAKE_POLICY(SET CMP0048 OLD)
+ENDIF(POLICY CMP0048)
 
-include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)
+include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty)
 
 # Include libxml2
 find_package(LibXml2)
@@ -33,8 +35,8 @@ endif (LIBXML2_FOUND)
 
 add_executable(minifiexe MiNiFiMain.cpp)
 
-# Link against minifi and yaml-cpp
-target_link_libraries(minifiexe minifi yaml-cpp)
+# Link against minifi, yaml-cpp and uuid
+target_link_libraries(minifiexe minifi yaml-cpp uuid)
 set_target_properties(minifiexe
         PROPERTIES OUTPUT_NAME minifi)
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6f826960/src/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/src/ExecuteProcess.cpp b/src/ExecuteProcess.cpp
deleted file mode 100644
index 16d9457..0000000
--- a/src/ExecuteProcess.cpp
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * @file ExecuteProcess.cpp
- * ExecuteProcess class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "TimeUtil.h"
-#include "ExecuteProcess.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-#include <cstring>
-
-const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
-Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", "");
-Property ExecuteProcess::CommandArguments("Command Arguments",
-               "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", "");
-Property ExecuteProcess::WorkingDir("Working Directory",
-               "The directory to use as the current working directory when executing the command", "");
-Property ExecuteProcess::BatchDuration("Batch Duration",
-               "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", "0");
-Property ExecuteProcess::RedirectErrorStream("Redirect Error Stream",
-               "If true will redirect any error stream output of the process to the output stream.", "false");
-Relationship ExecuteProcess::Success("success", "All created FlowFiles are routed to this relationship.");
-
-void ExecuteProcess::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(Command);
-       properties.insert(CommandArguments);
-       properties.insert(WorkingDir);
-       properties.insert(BatchDuration);
-       properties.insert(RedirectErrorStream);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-
-void ExecuteProcess::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-       if (context->getProperty(Command.getName(), value))
-       {
-               this->_command = value;
-       }
-       if (context->getProperty(CommandArguments.getName(), value))
-       {
-               this->_commandArgument = value;
-       }
-       if (context->getProperty(WorkingDir.getName(), value))
-       {
-               this->_workingDir = value;
-       }
-       if (context->getProperty(BatchDuration.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _batchDuration, unit) &&
-                       Property::ConvertTimeUnitToMS(_batchDuration, unit, _batchDuration))
-               {
-
-               }
-       }
-       if (context->getProperty(RedirectErrorStream.getName(), value))
-       {
-               Property::StringToBool(value, _redirectErrorStream);
-       }
-       this->_fullCommand = _command + " " + _commandArgument;
-       if (_fullCommand.length() == 0)
-       {
-               yield();
-               return;
-       }
-       if (_workingDir.length() > 0 && _workingDir != ".")
-       {
-               // change to working directory
-               if (chdir(_workingDir.c_str()) != 0)
-               {
-                       _logger->log_error("Execute Command can not chdir %s", _workingDir.c_str());
-                       yield();
-                       return;
-               }
-       }
-       _logger->log_info("Execute Command %s", _fullCommand.c_str());
-       // split the command into array
-       char cstr[_fullCommand.length()+1];
-       std::strcpy(cstr, _fullCommand.c_str());
-       char *p = std::strtok (cstr, " ");
-       int argc = 0;
-       char *argv[64];
-       while (p != 0 && argc < 64)
-       {
-               argv[argc] = p;
-               p = std::strtok(NULL, " ");
-               argc++;
-       }
-       argv[argc] = NULL;
-       int status, died;
-       if (!_processRunning)
-       {
-               _processRunning = true;
-               // if the process has not launched yet
-               // create the pipe
-               if (pipe(_pipefd) == -1)
-               {
-                       _processRunning = false;
-                       yield();
-                       return;
-               }
-               switch (_pid = fork())
-               {
-               case -1:
-                       _logger->log_error("Execute Process fork failed");
-                       _processRunning = false;
-                       close(_pipefd[0]);
-                       close(_pipefd[1]);
-                       yield();
-                       break;
-               case 0 : // this is the code the child runs
-                       close(1);      // close stdout
-                       dup(_pipefd[1]); // points pipefd at file descriptor
-                       if (_redirectErrorStream)
-                               // redirect stderr
-                               dup2(_pipefd[1], 2);
-                       close(_pipefd[0]);
-                       execvp(argv[0], argv);
-                       exit(1);
-                       break;
-               default: // this is the code the parent runs
-                       // the parent isn't going to write to the pipe
-                       close(_pipefd[1]);
-                       if (_batchDuration > 0)
-                       {
-                               while (1)
-                               {
-                                       std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
-                                       char buffer[4096];
-                                       int numRead = read(_pipefd[0], buffer, sizeof(buffer));
-                                       if (numRead <= 0)
-                                               break;
-                                       _logger->log_info("Execute Command Respond %d", numRead);
-                                       ExecuteProcess::WriteCallback callback(buffer, numRead);
-                                       FlowFileRecord *flowFile = session->create();
-                                       if (!flowFile)
-                                               continue;
-                                       session->write(flowFile, &callback);
-                                       session->transfer(flowFile, Success);
-                                       session->commit();
-                               }
-                       }
-                       else
-                       {
-                               char buffer[4096];
-                               char *bufPtr = buffer;
-                               int totalRead = 0;
-                               FlowFileRecord *flowFile = NULL;
-                               while (1)
-                               {
-                                       int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
-                                       if (numRead <= 0)
-                                       {
-                                               if (totalRead > 0)
-                                               {
-                                                       _logger->log_info("Execute Command Respond %d", totalRead);
-                                                       // child exits and close the pipe
-                                                       ExecuteProcess::WriteCallback callback(buffer, totalRead);
-                                                       if (!flowFile)
-                                                       {
-                                                               flowFile = session->create();
-                                                               if (!flowFile)
-                                                                       break;
-                                                               session->write(flowFile, &callback);
-                                                       }
-                                                       else
-                                                       {
-                                                               session->append(flowFile, &callback);
-                                                       }
-                                                       session->transfer(flowFile, Success);
-                                               }
-                                               break;
-                                       }
-                                       else
-                                       {
-                                               if (numRead == (sizeof(buffer) - totalRead))
-                                               {
-                                                       // we reach the max buffer size
-                                                       _logger->log_info("Execute Command Max Respond %d", sizeof(buffer));
-                                                       ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
-                                                       if (!flowFile)
-                                                       {
-                                                               flowFile = session->create();
-                                                               if (!flowFile)
-                                                                       continue;
-                                                               session->write(flowFile, &callback);
-                                                       }
-                                                       else
-                                                       {
-                                                               session->append(flowFile, &callback);
-                                                       }
-                                                       // Rewind
-                                                       totalRead = 0;
-                                                       bufPtr = buffer;
-                                               }
-                                               else
-                                               {
-                                                       totalRead += numRead;
-                                                       bufPtr += numRead;
-                                               }
-                                       }
-                               }
-                       }
-
-                       died= wait(&status);
-                       if (WIFEXITED(status))
-                       {
-                               _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WEXITSTATUS(status), _pid);
-                       }
-                       else
-                       {
-                               _logger->log_info("Execute Command Complete %s status %d pid %d", _fullCommand.c_str(), WTERMSIG(status), _pid);
-                       }
-
-                       close(_pipefd[0]);
-                       _processRunning = false;
-                       break;
-               }
-       }
-}
-

Reply via email to