Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master a8de19653 -> 3676468fc


MINIFI-236: Make GetFile, PutFile, TailFile, and ExecuteProcess thread safe

This closes #71.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/3676468f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/3676468f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/3676468f

Branch: refs/heads/master
Commit: 3676468fc0746ca9a31c92fa980fb412f3c010b2
Parents: a8de196
Author: Marc Parisi <[email protected]>
Authored: Tue Mar 28 15:50:48 2017 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Wed Apr 5 16:38:03 2017 +0200

----------------------------------------------------------------------
 .travis.yml                                    |   2 +-
 CMakeLists.txt                                 |  15 +
 libminifi/include/core/ConfigurableComponent.h |  16 +-
 libminifi/include/core/FlowConfiguration.h     |   2 +-
 libminifi/include/core/ProcessSession.h        |  17 +-
 libminifi/include/core/Processor.h             |   2 -
 libminifi/include/processors/GetFile.h         |  77 +++---
 libminifi/include/processors/PutFile.h         |  22 +-
 libminifi/include/processors/TailFile.h        |  19 +-
 libminifi/include/utils/ThreadPool.h           | 287 ++++++++++++++++++++
 libminifi/src/core/ConfigurableComponent.cpp   |  10 +-
 libminifi/src/core/FlowConfiguration.cpp       |   6 +-
 libminifi/src/core/ProcessSession.cpp          |  94 ++++++-
 libminifi/src/processors/GetFile.cpp           | 100 +++----
 libminifi/src/processors/PutFile.cpp           |  53 ++--
 libminifi/src/processors/TailFile.cpp          |  32 ++-
 libminifi/test/TestExecuteProcess.cpp          | 131 +++++++++
 libminifi/test/unit/ProcessorTests.cpp         | 144 ++++++++--
 18 files changed, 852 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 31cb731..faa291b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -47,4 +47,4 @@ matrix:
         - package='openssl'; [[ $(brew ls --versions ${package}) ]] && { brew 
outdated ${package} || brew upgrade ${package}; } || brew install ${package}
 
 script:
-  - mkdir ./build && cd ./build && cmake .. && make && ./tests
+  - mkdir ./build && cd ./build && cmake .. && make && make test

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5d7875e..b84706d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -136,6 +136,21 @@ enable_testing(test)
     target_include_directories(tests PRIVATE BEFORE 
"libminifi/include/provenance")
     target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} 
${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library 
civetweb-cpp)
     add_test(NAME LibMinifiTests COMMAND tests)
+    
+    file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS 
"libminifi/test/TestExecuteProcess.cpp")
+    add_executable(testExecuteProcess ${LIBMINIFI_TEST_EXECUTE_PROCESS} 
${SPD_SOURCES})
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"thirdparty/yaml-cpp-yaml-cpp-0.5.3/include")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
${LEVELDB_INCLUDE_DIRS})
+    target_include_directories(testExecuteProcess PRIVATE BEFORE "include")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/core")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/core/repository")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/io")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/utils")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/processors")
+    target_include_directories(testExecuteProcess PRIVATE BEFORE 
"libminifi/include/provenance")
+    target_link_libraries(testExecuteProcess ${CMAKE_THREAD_LIBS_INIT} 
${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp 
c-library civetweb-cpp)
+    add_test(NAME ExecuteProcess COMMAND testExecuteProcess)
 
 # Create a custom build target called "docker" that will invoke DockerBuild.sh 
and create the NiFi-MiNiFi-CPP Docker image
 add_custom_target(

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/core/ConfigurableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurableComponent.h 
b/libminifi/include/core/ConfigurableComponent.h
index c0cc623..d46216b 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -19,8 +19,14 @@
 #ifndef LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
 #define LIBMINIFI_INCLUDE_CORE_CONFIGURABLECOMPONENT_H_
 
+#include <mutex>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <set>
+
+#include "logging/Logger.h"
 #include "Property.h"
-#include "core/logging/Logger.h"
 
 namespace org {
 namespace apache {
@@ -35,10 +41,8 @@ namespace core {
 class ConfigurableComponent {
  public:
 
-
   ConfigurableComponent() = delete;
 
-
   explicit ConfigurableComponent(std::shared_ptr<logging::Logger> logger);
 
   explicit ConfigurableComponent(const ConfigurableComponent &&other);
@@ -81,7 +85,6 @@ class ConfigurableComponent {
 
  protected:
 
-
   /**
    * Returns true if the instance can be edited.
    * @return true/false
@@ -89,10 +92,13 @@ class ConfigurableComponent {
   virtual bool canEdit()= 0;
 
   std::mutex configuration_mutex_;
-  std::shared_ptr<logging::Logger> logger_;
+
   // Supported properties
   std::map<std::string, Property> properties_;
 
+ private:
+  std::shared_ptr<logging::Logger> my_logger_;
+
 };
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index c7eedd2..e95e684 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -25,10 +25,10 @@
 #include "processors/GetFile.h"
 #include "processors/PutFile.h"
 #include "processors/TailFile.h"
+#include "processors/ListenHTTP.h"
 #include "processors/ListenSyslog.h"
 #include "processors/GenerateFlowFile.h"
 #include "processors/RealTimeDataCollector.h"
-#include "processors/ListenHTTP.h"
 #include "processors/LogAttribute.h"
 #include "processors/ExecuteProcess.h"
 #include "processors/AppendHostInfo.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index b516817..a80769e 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -77,7 +77,13 @@ class ProcessSession {
 // Create a new UUID FlowFile with no content resource claim and without parent
   std::shared_ptr<core::FlowFile> create();
 // Create a new UUID FlowFile with no content resource claim and inherit all 
attributes from parent
-  std::shared_ptr<core::FlowFile> create(std::shared_ptr<core::FlowFile> 
&parent);
+  std::shared_ptr<core::FlowFile> create(
+      std::shared_ptr<core::FlowFile> &&parent);
+
+  /**
+   * Lvalue convenience overload of create().
+   * A named reference is an lvalue, so calling create(parent) directly would
+   * select this very overload again and recurse without end. A temporary copy
+   * of the shared pointer is passed instead so that the rvalue-reference
+   * overload declared above is the one that gets invoked.
+   * @param parent flow file whose attributes the new flow file inherits
+   * @return result of the rvalue-reference overload of create()
+   */
+  std::shared_ptr<core::FlowFile> create(
+        std::shared_ptr<core::FlowFile> &parent){
+    return create(std::shared_ptr<core::FlowFile>(parent));
+  }
 // Clone a new UUID FlowFile from parent both for content resource claim and 
attributes
   std::shared_ptr<core::FlowFile> clone(
       std::shared_ptr<core::FlowFile> &parent);
@@ -121,7 +127,14 @@ class ProcessSession {
 // Penalize the flow
   void penalize(std::shared_ptr<core::FlowFile> &flow);
   void penalize(std::shared_ptr<core::FlowFile> &&flow);
-// Import the existed file into the flow
+
+  /**
+   * Imports a file from the data stream
+   * @param stream incoming data stream that contains the data to store into a 
file
+   * @param flow flow file
+   */
+  void importFrom(io::DataStream &stream, std::shared_ptr<core::FlowFile> 
&&flow);
+  // import from the data source.
   void import(std::string source, std::shared_ptr<core::FlowFile> &flow,
               bool keepSource = true, uint64_t offset = 0);
   void import(std::string source, std::shared_ptr<core::FlowFile> &&flow,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/core/Processor.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index fd0411f..4a71816 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -251,8 +251,6 @@ private:
 
   // Check all incoming connections for work
   bool isWorkAvailable();
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Processor(const Processor &parent);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/processors/GetFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetFile.h 
b/libminifi/include/processors/GetFile.h
index f1f0694..cc3beaa 100644
--- a/libminifi/include/processors/GetFile.h
+++ b/libminifi/include/processors/GetFile.h
@@ -1,6 +1,4 @@
 /**
- * @file GetFile.h
- * GetFile class declaration
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -20,6 +18,7 @@
 #ifndef __GET_FILE_H__
 #define __GET_FILE_H__
 
+#include <atomic>
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -31,6 +30,20 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+  // Bundle of GetFile settings. The defaults are the values that the GetFile
+  // constructor previously assigned to its individual member variables.
+  struct GetFileRequest {
+    std::string directory{"."};
+    bool recursive{true};
+    bool keepSourceFile{false};
+    int64_t minAge{0};
+    int64_t maxAge{0};
+    int64_t minSize{0};
+    int64_t maxSize{0};
+    bool ignoreHiddenFile{true};
+    int64_t pollInterval{0};
+    int64_t batchSize{10};
+    std::string fileFilter{"[^\\.].*"};
+  };
+
+
 // GetFile Class
 class GetFile : public core::Processor {
  public:
@@ -38,21 +51,9 @@ class GetFile : public core::Processor {
   /*!
    * Create a new processor
    */
-  GetFile(std::string name, uuid_t uuid = NULL)
+  explicit GetFile(std::string name, uuid_t uuid = NULL)
       : Processor(name, uuid) {
-    logger_ = logging::Logger::getLogger();
-    _directory = ".";
-    _recursive = true;
-    _keepSourceFile = false;
-    _minAge = 0;
-    _maxAge = 0;
-    _minSize = 0;
-    _maxSize = 0;
-    _ignoreHiddenFile = true;
-    _pollInterval = 0;
-    _batchSize = 10;
-    _lastDirectoryListingTime = getTimeMillis();
-    _fileFilter = "[^\\.].*";
+
   }
   // Destructor
   virtual ~GetFile() {
@@ -79,16 +80,28 @@ class GetFile : public core::Processor {
   virtual void onTrigger(
       core::ProcessContext *context,
       core::ProcessSession *session);
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(
+        core::ProcessContext *context,
+        core::ProcessSessionFactory *sessionFactory);
   // Initialize, over write by NiFi GetFile
   virtual void initialize(void);
-  // perform directory listing
-  void performListing(std::string dir);
+  /**
+   * Performs a listing on the directory.
+   * @param dir directory to list
+   * @param request get file request.
+   */
+  void performListing(std::string dir,const GetFileRequest &request);
 
  protected:
 
  private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
+
   // Queue for store directory list
   std::queue<std::string> _dirList;
   // Get Listing size
@@ -101,23 +114,19 @@ class GetFile : public core::Processor {
   // Put full path file name into directory listing
   void putListing(std::string fileName);
   // Poll directory listing for files
-  void pollListing(std::queue<std::string> &list, int maxSize);
+  void pollListing(std::queue<std::string> &list,const GetFileRequest 
&request);
   // Check whether file can be added to the directory listing
-  bool acceptFile(std::string fullName, std::string name);
+  bool acceptFile(std::string fullName, std::string name, const GetFileRequest 
&request);
+  // Get file request object.
+  GetFileRequest request_;
   // Mutex for protection of the directory listing
+
   std::mutex mutex_;
-  std::string _directory;
-  bool _recursive;
-  bool _keepSourceFile;
-  int64_t _minAge;
-  int64_t _maxAge;
-  int64_t _minSize;
-  int64_t _maxSize;
-  bool _ignoreHiddenFile;
-  int64_t _pollInterval;
-  int64_t _batchSize;
-  uint64_t _lastDirectoryListingTime;
-  std::string _fileFilter;
+
+  // Last listing time for the root directory (if recursive, we will consider
+  // the root as the top-level time).
+  std::atomic<uint64_t> last_listing_time_;
+
 };
 
 } /* namespace processors */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h 
b/libminifi/include/processors/PutFile.h
index 7653fac..c0effaf 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -45,7 +45,6 @@ class PutFile : public core::Processor {
    */
   PutFile(std::string name, uuid_t uuid = NULL)
       : core::Processor(name, uuid) {
-    logger_ = logging::Logger::getLogger();
   }
   // Destructor
   virtual ~PutFile() {
@@ -59,10 +58,18 @@ class PutFile : public core::Processor {
   static core::Relationship Success;
   static core::Relationship Failure;
 
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context,
+                  core::ProcessSessionFactory *sessionFactory);
+
   // OnTrigger method, implemented by NiFi PutFile
-  virtual void onTrigger(
-      core::ProcessContext *context,
-      core::ProcessSession *session);
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session);
   // Initialize, over write by NiFi PutFile
   virtual void initialize(void);
 
@@ -84,8 +91,11 @@ class PutFile : public core::Processor {
  protected:
 
  private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
+
+  // directory
+  std::string directory_;
+  // conflict resolution type.
+  std::string conflict_resolution_;
 
   bool putFile(core::ProcessSession *session,
                std::shared_ptr<FlowFileRecord> flowFile,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/processors/TailFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/TailFile.h 
b/libminifi/include/processors/TailFile.h
index 5be76e4..c7b7b46 100644
--- a/libminifi/include/processors/TailFile.h
+++ b/libminifi/include/processors/TailFile.h
@@ -38,9 +38,8 @@ class TailFile : public core::Processor {
   /*!
    * Create a new processor
    */
-  TailFile(std::string name, uuid_t uuid = NULL)
+  explicit TailFile(std::string name, uuid_t uuid = NULL)
       : core::Processor(name, uuid) {
-    logger_ = logging::Logger::getLogger();
     _stateRecovered = false;
   }
   // Destructor
@@ -57,9 +56,8 @@ class TailFile : public core::Processor {
 
  public:
   // OnTrigger method, implemented by NiFi TailFile
-  virtual void onTrigger(
-      core::ProcessContext *context,
-      core::ProcessSession *session);
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session);
   // Initialize, over write by NiFi TailFile
   virtual void initialize(void);
   // recoverState
@@ -70,11 +68,7 @@ class TailFile : public core::Processor {
  protected:
 
  private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
-  std::string _fileLocation;
-  // Property Specified Tailed File Name
-  std::string _fileName;
+  std::mutex tail_file_mutex_;
   // File to save state
   std::string _stateFile;
   // State related to the tailed file
@@ -86,7 +80,10 @@ class TailFile : public core::Processor {
   std::string trimLeft(const std::string& s);
   std::string trimRight(const std::string& s);
   void parseStateFileLine(char *buf);
-  void checkRollOver();
+  /**
+   * Check roll over for the provided file.
+   */
+  void checkRollOver(const std::string &, const std::string&);
 
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h 
b/libminifi/include/utils/ThreadPool.h
new file mode 100644
index 0000000..7508900
--- /dev/null
+++ b/libminifi/include/utils/ThreadPool.h
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H
+#define LIBMINIFI_INCLUDE_THREAD_POOL_H
+
+#include <atomic>
+#include <condition_variable>
+#include <exception>
+#include <functional>
+#include <future>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <utility>
+#include <vector>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * Worker task
+ * purpose: Wraps a functor returning T together with a shared promise that
+ * receives the functor's outcome, so a future can be handed to the submitter.
+ */
+template< typename T>
+class Worker{
+public:
+  /**
+   * Creates a worker around the given functor and allocates its promise.
+   * @param task functor to run; it is copied into the worker.
+   */
+  explicit Worker(const std::function<T()> &task)
+      : task(task),
+        promise(std::make_shared<std::promise<T>>())
+  {
+  }
+
+  /**
+   * Move constructor for worker tasks. The functor is moved; the promise
+   * pointer is copied, so both objects refer to the same promise afterwards.
+   */
+  Worker(Worker &&other) : task (std::move(other.task)),
+                           promise(other.promise)
+  {
+  }
+
+
+  /**
+   * Runs the task and takes the output from the functor,
+   * setting the result into the promise. If the functor throws, the
+   * exception is stored in the promise instead of escaping from run(), so
+   * the holder of the future observes the failure.
+   */
+  void run()
+  {
+    try {
+      T result = task();
+      promise->set_value(std::move(result));
+    } catch (...) {
+      promise->set_exception(std::current_exception());
+    }
+  }
+
+  // Workers are move-only.
+  Worker(const Worker&) = delete;
+  Worker& operator = (const Worker&) = delete;
+
+  Worker<T>& operator = (Worker<T>&&) ;
+
+  /**
+   * @return the shared promise that run() fulfils.
+   */
+  std::shared_ptr<std::promise<T>> getPromise();
+
+private:
+   // functor executed by run()
+   std::function<T()> task;
+   // promise that receives the functor's result or exception
+   std::shared_ptr<std::promise<T>> promise;
+};
+
+/**
+ * Move assignment: shares the other worker's promise and takes over its
+ * functor.
+ */
+template< typename T>
+Worker<T>& Worker<T>::operator = (Worker<T>&& rhs)
+{
+  promise = rhs.promise;
+  task = std::move(rhs.task);
+  return *this;
+}
+
+
+/**
+ * Accessor for the promise associated with this worker.
+ */
+template<typename T>
+std::shared_ptr<std::promise<T>> Worker<T>::getPromise()
+{
+  return promise;
+}
+
+/**
+ * Thread pool
+ * Purpose: Provides a thread pool with basic functionality similar to
+ * ThreadPoolExecutor
+ * Design: Locked control over a manager thread that controls the worker
+ * threads
+ */
+template<typename T>
+class ThreadPool
+{
+ public:
+  /**
+   * Creates a pool that is not yet running; call start() to launch it.
+   * Members are initialized in their declaration order.
+   * @param max_worker_threads number of worker threads the manager starts
+   * @param daemon_threads when true the worker threads are detached
+   */
+  ThreadPool(int max_worker_threads, bool daemon_threads=false)
+      : daemon_threads_(daemon_threads),
+        max_worker_threads_(max_worker_threads),
+        current_workers_(0),
+        running_(false) {
+  }
+
+  virtual ~ThreadPool(){
+    shutdown();
+  }
+
+  /**
+   * Execute accepts a worker task and returns
+   * a future
+   * @param task this thread pool will assume ownership of
+   * the worker task
+   * @return future with the impending result.
+   */
+  std::future<T> execute(Worker<T> &&task);
+  /**
+   * Starts the Thread Pool
+   */
+  void start();
+  /**
+   * Shutdown the thread pool and clear any
+   * currently running activities
+   */
+  void shutdown();
+  /**
+   * Set the max concurrent tasks. A running pool is shut down first, the
+   * new limit is stored, and the pool is then started.
+   * @param max new number of worker threads
+   */
+  void setMaxConcurrentTasks(uint16_t max)
+  {
+    // recursive mutex: shutdown() and start() lock manager_mutex_ again
+    std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
+    if (running_)
+    {
+      shutdown();
+    }
+    max_worker_threads_= max;
+    if (!running_)
+      start();
+  }
+
+ protected:
+
+  /**
+   * Drain wakes the workers until every one of them has left its loop.
+   * All waiters are notified at once and the caller yields between rounds
+   * instead of spinning on single notifications.
+   */
+  void drain()
+  {
+    while(current_workers_ > 0)
+    {
+      tasks_available_.notify_all();
+      std::this_thread::yield();
+    }
+  }
+  // determines if threads are detached
+  bool daemon_threads_;
+  // max worker threads
+  int max_worker_threads_;
+  // current worker tasks.
+  std::atomic<int> current_workers_;
+  // thread queue
+  std::vector<std::thread> thread_queue_;
+  // manager thread
+  std::thread manager_thread_;
+  // atomic running boolean
+  std::atomic<bool> running_;
+  // worker queue of worker objects
+  std::queue<Worker<T>> worker_queue_;
+  // notification for available work
+  std::condition_variable tasks_available_;
+  // manager mutex
+  std::recursive_mutex manager_mutex_;
+  // work queue mutex
+  std::mutex worker_queue_mutex_;
+
+  /**
+   * Call for the manager to start worker threads
+   */
+  void startWorkers();
+
+  /**
+   * Runs worker tasks
+   */
+  void run_tasks();
+};
+
+/**
+ * Queues the worker and returns the future tied to its promise.
+ * A waiting worker thread is notified for every submitted task; notifying
+ * only when the queue had been empty could leave idle workers asleep while
+ * several tasks are pending.
+ * @param task worker to enqueue; it is moved into the queue
+ * @return future for the worker's result
+ */
+template<typename T>
+std::future<T> ThreadPool<T>::execute(Worker<T> &&task){
+
+  std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+  // obtain the future before the worker is moved into the queue
+  std::future<T> future = task.getPromise()->get_future();
+  worker_queue_.push(std::move(task));
+  tasks_available_.notify_one();
+  return future;
+}
+
+/**
+ * Body of the manager thread: launches max_worker_threads_ workers running
+ * run_tasks(), then either detaches them (daemon mode) or joins them.
+ */
+template< typename T>
+void ThreadPool<T>::startWorkers()
+{
+  for (int started = 0; started < max_worker_threads_; ++started)
+  {
+    thread_queue_.emplace_back(&ThreadPool::run_tasks, this);
+    ++current_workers_;
+  }
+
+  for (auto &worker : thread_queue_)
+  {
+    if (daemon_threads_)
+    {
+      worker.detach();
+    }
+    else if (worker.joinable())
+    {
+      worker.join();
+    }
+  }
+}
+/**
+ * Worker thread loop: waits for queued workers and runs them until the pool
+ * stops running, then decrements current_workers_.
+ * The queue mutex is released before a task is executed so that other
+ * worker threads and execute() are not blocked for the task's duration.
+ */
+template< typename T>
+void  ThreadPool<T>::run_tasks()
+{
+  while (running_.load())
+  {
+    std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+    if (worker_queue_.empty())
+    {
+      tasks_available_.wait(lock);
+    }
+
+    if (!running_.load())
+      break;
+
+    // woken without work (spurious wakeup or another worker took the task)
+    if (worker_queue_.empty())
+      continue;
+    Worker<T> task = std::move(worker_queue_.front());
+    worker_queue_.pop();
+    // run the task outside of the critical section
+    lock.unlock();
+    task.run();
+  }
+  current_workers_--;
+
+}
+/**
+ * Marks the pool as running and launches the manager thread, which in turn
+ * starts the workers. Does nothing when the pool is already running.
+ */
+template< typename T>
+void ThreadPool<T>::start()
+{
+  std::lock_guard<std::recursive_mutex> manager_guard(manager_mutex_);
+  if (running_.load())
+  {
+    return;
+  }
+  running_.store(true);
+  manager_thread_ = std::thread(&ThreadPool::startWorkers, this);
+}
+
+/**
+ * Stops a running pool: clears the running flag, drains the workers, joins
+ * the manager thread and finally discards the thread list and any workers
+ * still queued. Does nothing when the pool is not running.
+ */
+template< typename T>
+void ThreadPool<T>::shutdown()
+{
+  std::lock_guard<std::recursive_mutex> manager_guard(manager_mutex_);
+  if (!running_.load())
+  {
+    return;
+  }
+
+  running_.store(false);
+
+  drain();
+  if (manager_thread_.joinable())
+  {
+    manager_thread_.join();
+  }
+
+  std::unique_lock<std::mutex> queue_guard(worker_queue_mutex_);
+  thread_queue_.clear();
+  current_workers_ = 0;
+  while (!worker_queue_.empty())
+  {
+    worker_queue_.pop();
+  }
+}
+
+
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp 
b/libminifi/src/core/ConfigurableComponent.cpp
index e5703d1..67a43dd 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -28,14 +28,14 @@ namespace minifi {
 namespace core {
 
 ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> 
logger)
-    : logger_(logger) {
+    : my_logger_(logger) {
 
 }
 
 ConfigurableComponent::ConfigurableComponent(
     const ConfigurableComponent &&other)
     : properties_(std::move(other.properties_)),
-      logger_(std::move(other.logger_)) {
+      my_logger_(std::move(other.my_logger_)) {
 
 }
 ConfigurableComponent::~ConfigurableComponent() {
@@ -57,7 +57,7 @@ bool ConfigurableComponent::getProperty(const std::string 
name,
   if (it != properties_.end()) {
     Property item = it->second;
     value = item.getValue();
-    logger_->log_info("Processor %s property name %s value %s", name.c_str(),
+    my_logger_->log_info("Processor %s property name %s value %s", 
name.c_str(),
                       item.getName().c_str(), value.c_str());
     return true;
   } else {
@@ -79,7 +79,7 @@ bool ConfigurableComponent::setProperty(const std::string 
name,
     Property item = it->second;
     item.setValue(value);
     properties_[item.getName()] = item;
-    logger_->log_info("Component %s property name %s value %s", name.c_str(),
+    my_logger_->log_info("Component %s property name %s value %s", 
name.c_str(),
                       item.getName().c_str(), value.c_str());
     return true;
   } else {
@@ -101,7 +101,7 @@ bool ConfigurableComponent::setProperty(Property &prop, 
std::string value) {
     Property item = it->second;
     item.setValue(value);
     properties_[item.getName()] = item;
-    logger_->log_info("property name %s value %s", prop.getName().c_str(),
+    my_logger_->log_info("property name %s value %s", prop.getName().c_str(),
                       item.getName().c_str(), value.c_str());
     return true;
   } else {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index c6472cc..f2dda0d 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -64,9 +64,9 @@ std::shared_ptr<core::Processor> 
FlowConfiguration::createProcessor(
     processor = std::make_shared<
         org::apache::nifi::minifi::processors::ListenSyslog>(name, uuid);
   } else if (name
-      == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) {
-    processor = std::make_shared<
-        org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid);
+        == org::apache::nifi::minifi::processors::ListenHTTP::ProcessorName) {
+      processor = std::make_shared<
+          org::apache::nifi::minifi::processors::ListenHTTP>(name, uuid);
   } else if (name
       == org::apache::nifi::minifi::processors::ExecuteProcess::ProcessorName) 
{
     processor = std::make_shared<

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index e6fa7c4..09c3fa3 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -50,7 +50,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() {
   return record;
 }
 
-std::shared_ptr<core::FlowFile> 
ProcessSession::create(std::shared_ptr<core::FlowFile> &parent) {
+std::shared_ptr<core::FlowFile> 
ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) {
   std::map<std::string, std::string> empty;
   std::shared_ptr<core::FlowFile> record = 
std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),
       empty);
@@ -530,6 +530,97 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> 
&&flow,
   }
 }
 
+/**
+  * Imports the content of a data stream into a newly created resource claim
+  * and attaches that claim to the flow file.
+  * @param stream incoming data stream that contains the data to store into a
+  * file
+  * @param flow flow file that receives the new claim, size and offset
+  * Throws Exception(FILE_OPERATION_EXCEPTION) when the claim file cannot be
+  * opened or the output stream is not in a good state after writing; any
+  * exception caught here is logged and rethrown.
+  */
+void ProcessSession::importFrom(io::DataStream &stream,
+                            std::shared_ptr<core::FlowFile> &&flow) {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>();
+
+  // copy in chunks of at most one page
+  int max_read = getpagesize();
+  std::vector<uint8_t> charBuffer;
+  charBuffer.resize(max_read);
+
+  try {
+    std::ofstream fs;
+    uint64_t startTime = getTimeMillis();
+    // truncate/create the claim's content file for binary output
+    fs.open(claim->getContentFullPath().c_str(),
+            std::fstream::out | std::fstream::binary | std::fstream::trunc);
+
+
+    if (fs.is_open() ) {
+
+      size_t position = 0;
+      const size_t max_size = stream.getSize();
+      size_t read_size = max_read;
+      while(position < max_size)
+      {
+        // the final chunk may be shorter than a page
+        if ((max_size - position) > max_read)
+        {
+          read_size = max_read;
+        }
+        else
+        {
+          read_size = max_size - position;
+        }
+        // NOTE(review): clear() sets the vector's size to zero while the
+        // write below reads read_size bytes from data(); this presumably
+        // relies on readData() filling/re-growing the buffer - confirm.
+        // Any status returned by readData() is not inspected here.
+        charBuffer.clear();
+        stream.readData(charBuffer,read_size);
+
+        fs.write((const char*)charBuffer.data(),read_size);
+        position+=read_size;
+      }
+      // All stream data has been written to the claim file at this point
+
+      if (fs.good() && fs.tellp() >= 0) {
+        flow->setSize(fs.tellp());
+        flow->setOffset(0);
+        if (flow->getResourceClaim() != nullptr) {
+          // Remove the old claim
+          flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+          flow->clearResourceClaim();
+        }
+        flow->setResourceClaim(claim);
+        claim->increaseFlowFileRecordOwnedCount();
+
+        logger_->log_debug(
+            "Import offset %d length %d into content %s for FlowFile UUID %s",
+            flow->getOffset(), flow->getSize(),
+            flow->getResourceClaim()->getContentFullPath().c_str(),
+            flow->getUUIDStr().c_str());
+
+        fs.close();
+        // record the content modification together with the elapsed time
+        std::string details = process_context_->getProcessorNode().getName()
+            + " modify flow record content " + flow->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flow, details, endTime - startTime);
+      } else {
+        fs.close();
+        throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+      }
+    } else {
+      throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+    }
+
+  } catch (std::exception &exception) {
+    // detach the new claim again if it was already attached to the flow file
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
+  } catch (...) {
+    // same cleanup for exceptions not derived from std::exception
+    if (flow && flow->getResourceClaim() == claim) {
+      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
+      flow->clearResourceClaim();
+    }
+    logger_->log_debug("Caught Exception during process session write");
+    throw;
+  }
+}
+
 void ProcessSession::import(std::string source,
                             std::shared_ptr<core::FlowFile> &flow,
                             bool keepSource, uint64_t offset) {
@@ -639,6 +730,7 @@ void ProcessSession::import(std::string source,
           fs.write(buf, input.gcount());
       }
 
+
       if (fs.good() && fs.tellp() >= 0) {
         flow->setSize(fs.tellp());
         flow->setOffset(0);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp
index cf05657..652caf7 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -1,6 +1,4 @@
 /**
- * @file GetFile.cpp
- * GetFile class implementation
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -87,8 +85,8 @@ core::Property GetFile::FileFilter(
     "File Filter",
     "Only files whose names match the given regular expression will be picked 
up",
     "[^\\.].*");
-core::Relationship GetFile::Success(
-    "success", "All files are routed to success");
+core::Relationship GetFile::Success("success",
+                                    "All files are routed to success");
 
 void GetFile::initialize() {
   // Set the supported properties
@@ -111,77 +109,78 @@ void GetFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void GetFile::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
+void GetFile::onSchedule(core::ProcessContext *context,
+                         core::ProcessSessionFactory *sessionFactory) {
   std::string value;
 
   logger_->log_info("onTrigger GetFile");
   if (context->getProperty(Directory.getName(), value)) {
-    _directory = value;
+    request_.directory = value;
   }
   if (context->getProperty(BatchSize.getName(), value)) {
-    core::Property::StringToInt(value, _batchSize);
+    core::Property::StringToInt(value, request_.batchSize);
   }
   if (context->getProperty(IgnoreHiddenFile.getName(), value)) {
     org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, _ignoreHiddenFile);
+        value, request_.ignoreHiddenFile);
   }
   if (context->getProperty(KeepSourceFile.getName(), value)) {
     org::apache::nifi::minifi::utils::StringUtils::StringToBool(
-        value, _keepSourceFile);
+        value, request_.keepSourceFile);
   }
 
   logger_->log_info("onTrigger GetFile");
   if (context->getProperty(MaxAge.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, _maxAge,
-                                                                unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _maxAge, unit, _maxAge)) {
+    if (core::Property::StringToTime(value, request_.maxAge, unit)
+        && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit,
+                                               request_.maxAge)) {
 
     }
   }
   if (context->getProperty(MinAge.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, _minAge,
-                                                                unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _minAge, unit, _minAge)) {
+    if (core::Property::StringToTime(value, request_.minAge, unit)
+        && core::Property::ConvertTimeUnitToMS(request_.minAge, unit,
+                                               request_.minAge)) {
 
     }
   }
   if (context->getProperty(MaxSize.getName(), value)) {
-    core::Property::StringToInt(value, _maxSize);
+    core::Property::StringToInt(value, request_.maxSize);
   }
   if (context->getProperty(MinSize.getName(), value)) {
-    core::Property::StringToInt(value, _minSize);
+    core::Property::StringToInt(value, request_.minSize);
   }
   if (context->getProperty(PollInterval.getName(), value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value,
-                                                                _pollInterval,
-                                                                unit)
-        && core::Property::ConvertTimeUnitToMS(
-            _pollInterval, unit, _pollInterval)) {
+    if (core::Property::StringToTime(value, request_.pollInterval, unit)
+        && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit,
+                                               request_.pollInterval)) {
 
     }
   }
   if (context->getProperty(Recurse.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
-                                                                _recursive);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+        value, request_.recursive);
   }
 
   if (context->getProperty(FileFilter.getName(), value)) {
-    _fileFilter = value;
+    request_.fileFilter = value;
   }
+}
+
+void GetFile::onTrigger(core::ProcessContext *context,
+                        core::ProcessSession *session) {
 
   // Perform directory list
   logger_->log_info("Is listing empty %i", isListingEmpty());
   if (isListingEmpty()) {
-    if (_pollInterval == 0
-        || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) {
-      performListing(_directory);
+
+    if (request_.pollInterval == 0
+        || (getTimeMillis() - last_listing_time_) > request_.pollInterval) {
+      performListing(request_.directory, request_);
+      last_listing_time_.store(getTimeMillis());
     }
   }
   logger_->log_info("Is listing empty %i", isListingEmpty());
@@ -189,7 +188,7 @@ void GetFile::onTrigger(
   if (!isListingEmpty()) {
     try {
       std::queue<std::string> list;
-      pollListing(list, _batchSize);
+      pollListing(list, request_);
       while (!list.empty()) {
 
         std::string fileName = list.front();
@@ -205,7 +204,7 @@ void GetFile::onTrigger(
         flowFile->updateKeyedAttribute(FILENAME, name);
         flowFile->updateKeyedAttribute(PATH, path);
         flowFile->addKeyedAttribute(ABSOLUTE_PATH, fileName);
-        session->import(fileName, flowFile, _keepSourceFile);
+        session->import(fileName, flowFile, request_.keepSourceFile);
         session->transfer(flowFile, Success);
       }
     } catch (std::exception &exception) {
@@ -230,10 +229,12 @@ void GetFile::putListing(std::string fileName) {
   _dirList.push(fileName);
 }
 
-void GetFile::pollListing(std::queue<std::string> &list, int maxSize) {
+void GetFile::pollListing(std::queue<std::string> &list,
+                          const GetFileRequest &request) {
   std::lock_guard<std::mutex> lock(mutex_);
 
-  while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize)) {
+  while (!_dirList.empty()  // cap at the batch size; 0 means no limit
+      && (request.batchSize == 0 || list.size() < request.batchSize)) {
     std::string fileName = _dirList.front();
     _dirList.pop();
     list.push(fileName);
@@ -242,37 +243,38 @@ void GetFile::pollListing(std::queue<std::string> &list, int maxSize) {
   return;
 }
 
-bool GetFile::acceptFile(std::string fullName, std::string name) {
+bool GetFile::acceptFile(std::string fullName, std::string name,
+                         const GetFileRequest &request) {
   struct stat statbuf;
 
   if (stat(fullName.c_str(), &statbuf) == 0) {
-    if (_minSize > 0 && statbuf.st_size < _minSize)
+    if (request.minSize > 0 && statbuf.st_size < request.minSize)
       return false;
 
-    if (_maxSize > 0 && statbuf.st_size > _maxSize)
+    if (request.maxSize > 0 && statbuf.st_size > request.maxSize)
       return false;
 
     uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
     uint64_t fileAge = getTimeMillis() - modifiedTime;
-    if (_minAge > 0 && fileAge < _minAge)
+    if (request.minAge > 0 && fileAge < request.minAge)
       return false;
-    if (_maxAge > 0 && fileAge > _maxAge)
+    if (request.maxAge > 0 && fileAge > request.maxAge)
       return false;
 
-    if (_ignoreHiddenFile && fullName.c_str()[0] == '.')
+    if (request.ignoreHiddenFile && fullName.c_str()[0] == '.')
       return false;
 
     if (access(fullName.c_str(), R_OK) != 0)
       return false;
 
-    if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
+    if (request.keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
       return false;
 
 #ifdef __GNUC__
 #if (__GNUC__ >= 4)
 #if (__GNUC_MINOR__ < 9)
     regex_t regex;
-    int ret = regcomp(&regex, _fileFilter.c_str(), 0);
+    int ret = regcomp(&regex, request.fileFilter.c_str(), 0);
     if (ret)
       return false;
     ret = regexec(&regex, name.c_str(), (size_t) 0, NULL, 0);
@@ -281,7 +283,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name) {
       return false;
 #else
     try {
-      std::regex re(_fileFilter);
+      std::regex re(request.fileFilter);  // filter is passed in via request
 
       if (!std::regex_match(name, re)) {
         return false;
@@ -301,7 +303,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name) {
   return false;
 }
 
-void GetFile::performListing(std::string dir) {
+void GetFile::performListing(std::string dir, const GetFileRequest &request) {
   logger_->log_info("Performing file listing against %s", dir.c_str());
   DIR *d;
   d = opendir(dir.c_str());
@@ -317,14 +319,14 @@ void GetFile::performListing(std::string dir) {
     std::string d_name = entry->d_name;
     if ((entry->d_type & DT_DIR)) {
       // if this is a directory
-      if (_recursive && strcmp(d_name.c_str(), "..") != 0
+      if (request.recursive && strcmp(d_name.c_str(), "..") != 0
           && strcmp(d_name.c_str(), ".") != 0) {
         std::string path = dir + "/" + d_name;
-        performListing(path);
+        performListing(path, request);
       }
     } else {
       std::string fileName = dir + "/" + d_name;
-      if (acceptFile(fileName, d_name)) {
+      if (acceptFile(fileName, d_name, request)) {
         // check whether we can take this file
         putListing(fileName);
       }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp
index 85cf09b..51fbb6f 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -24,13 +24,13 @@
 #include <fstream>
 #include <uuid/uuid.h>
 
+#include "io/validation.h"
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
 #include "processors/PutFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -43,15 +43,16 @@ const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
 
 const std::string PutFile::ProcessorName("PutFile");
 
-core::Property PutFile::Directory(
-    "Output Directory", "The output directory to which to put files", ".");
+core::Property PutFile::Directory("Output Directory",
+                                  "The output directory to which to put files",
+                                  ".");
 core::Property PutFile::ConflictResolution(
     "Conflict Resolution Strategy",
     "Indicates what should happen when a file with the same name already 
exists in the output directory",
     CONFLICT_RESOLUTION_STRATEGY_FAIL);
 
-core::Relationship PutFile::Success(
-    "success", "All files are routed to success");
+core::Relationship PutFile::Success("success",
+                                    "All files are routed to success");
 core::Relationship PutFile::Failure(
     "failure",
     "Failed files (conflict, write failure, etc.) are transferred to failure");
@@ -69,25 +70,30 @@ void PutFile::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void PutFile::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  std::string directory;
+void PutFile::onSchedule(core::ProcessContext *context,
+                         core::ProcessSessionFactory *sessionFactory) {
 
-  if (!context->getProperty(Directory.getName(), directory)) {
+  if (!context->getProperty(Directory.getName(), directory_)) {
     logger_->log_error("Directory attribute is missing or invalid");
-    return;
   }
 
-  std::string conflictResolution;
-
-  if (!context->getProperty(ConflictResolution.getName(), conflictResolution)) 
{
+  if (!context->getProperty(ConflictResolution.getName(),
+                            conflict_resolution_)) {
     logger_->log_error(
         "Conflict Resolution Strategy attribute is missing or invalid");
+  }
+
+}
+void PutFile::onTrigger(core::ProcessContext *context,
+                        core::ProcessSession *session) {
+
+  if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) {
+    context->yield();
     return;
   }
 
-  std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+      FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
   if (!flowFile) {
@@ -103,17 +109,17 @@ void PutFile::onTrigger(
   uuid_generate(tmpFileUuid);
   uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr);
   std::stringstream tmpFileSs;
-  tmpFileSs << directory << "/." << filename << "." << tmpFileUuidStr;
+  tmpFileSs << directory_ << "/." << filename << "." << tmpFileUuidStr;
   std::string tmpFile = tmpFileSs.str();
   logger_->log_info("PutFile using temporary file %s", tmpFile.c_str());
 
   // Determine dest full file paths
   std::stringstream destFileSs;
-  destFileSs << directory << "/" << filename;
+  destFileSs << directory_ << "/" << filename;
   std::string destFile = destFileSs.str();
 
   logger_->log_info("PutFile writing file %s into directory %s",
-                    filename.c_str(), directory.c_str());
+                    filename.c_str(), directory_.c_str());
 
   // If file exists, apply conflict resolution strategy
   struct stat statResult;
@@ -121,11 +127,11 @@ void PutFile::onTrigger(
   if (stat(destFile.c_str(), &statResult) == 0) {
     logger_->log_info(
         "Destination file %s exists; applying Conflict Resolution Strategy: 
%s",
-        destFile.c_str(), conflictResolution.c_str());
+        destFile.c_str(), conflict_resolution_.c_str());
 
-    if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
+    if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
       putFile(session, flowFile, tmpFile, destFile);
-    } else if (conflictResolution == CONFLICT_RESOLUTION_STRATEGY_IGNORE) {
+    } else if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_IGNORE) {
       session->transfer(flowFile, Success);
     } else {
       session->transfer(flowFile, Failure);
@@ -136,8 +142,8 @@ void PutFile::onTrigger(
 }
 
 bool PutFile::putFile(core::ProcessSession *session,
-                      std::shared_ptr<FlowFileRecord> flowFile, const 
std::string &tmpFile,
-                      const std::string &destFile) {
+                      std::shared_ptr<FlowFileRecord> flowFile,
+                      const std::string &tmpFile, const std::string &destFile) 
{
 
   ReadCallback cb(tmpFile, destFile);
   session->read(flowFile, &cb);
@@ -205,7 +211,6 @@ PutFile::ReadCallback::~ReadCallback() {
   unlink(_tmpFile.c_str());
 }
 
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp
index 859daa6..bcdd8fd 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -145,10 +145,10 @@ static bool sortTailMatchedFileItem(TailMatchedFileItem i,
                                     TailMatchedFileItem j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver() {
+void TailFile::checkRollOver(const std::string &fileLocation, const 
std::string &fileName) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+  std::string fullPath = fileLocation + "/" + _currentTailFileName;
 
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if (statbuf.st_size > this->_currentTailFilePosition)
@@ -157,12 +157,12 @@ void TailFile::checkRollOver() {
 
     uint64_t modifiedTimeCurrentTailFile =
         ((uint64_t) (statbuf.st_mtime) * 1000);
-    std::string pattern = _fileName;
-    std::size_t found = _fileName.find_last_of(".");
+    std::string pattern = fileName;
+    std::size_t found = fileName.find_last_of(".");
     if (found != std::string::npos)
-      pattern = _fileName.substr(0, found);
+      pattern = fileName.substr(0, found);
     DIR *d;
-    d = opendir(this->_fileLocation.c_str());
+    d = opendir(fileLocation.c_str());
     if (!d)
       return;
     while (1) {
@@ -173,7 +173,7 @@ void TailFile::checkRollOver() {
       std::string d_name = entry->d_name;
       if (!(entry->d_type & DT_DIR)) {
         std::string fileName = d_name;
-        std::string fileFullName = this->_fileLocation + "/" + d_name;
+        std::string fileFullName = fileLocation + "/" + d_name;
         if (fileFullName.find(pattern) != std::string::npos
             && stat(fileFullName.c_str(), &statbuf) == 0) {
           if (((uint64_t) (statbuf.st_mtime) * 1000)
@@ -215,24 +215,28 @@ void TailFile::checkRollOver() {
 void TailFile::onTrigger(
     core::ProcessContext *context,
     core::ProcessSession *session) {
+
+  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
   std::string value;
+  std::string fileLocation="";
+  std::string fileName="";
   if (context->getProperty(FileName.getName(), value)) {
     std::size_t found = value.find_last_of("/\\");
-    this->_fileLocation = value.substr(0, found);
-    this->_fileName = value.substr(found + 1);
+    fileLocation = value.substr(0, found);
+    fileName = value.substr(found + 1);
   }
   if (context->getProperty(StateFile.getName(), value)) {
     _stateFile = value + "." + getUUIDStr();
   }
   if (!this->_stateRecovered) {
     _stateRecovered = true;
-    this->_currentTailFileName = _fileName;
+    this->_currentTailFileName = fileName;
     this->_currentTailFilePosition = 0;
     // recover the state if we have not done so
     this->recoverState();
   }
-  checkRollOver();
-  std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+  checkRollOver(fileLocation,fileName);
+  std::string fullPath = fileLocation + "/" + _currentTailFileName;
   struct stat statbuf;
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if (statbuf.st_size <= this->_currentTailFilePosition)
@@ -241,13 +245,13 @@ void TailFile::onTrigger(
       context->yield();
       return;
     }
-    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());;
+    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
     if (!flowFile)
       return;
     std::size_t found = _currentTailFileName.find_last_of(".");
     std::string baseName = _currentTailFileName.substr(0, found);
     std::string extension = _currentTailFileName.substr(found + 1);
-    flowFile->updateKeyedAttribute(PATH, _fileLocation);
+    flowFile->updateKeyedAttribute(PATH, fileLocation);
     flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
     session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
     session->transfer(flowFile, Success);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/test/TestExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestExecuteProcess.cpp b/libminifi/test/TestExecuteProcess.cpp
new file mode 100644
index 0000000..6aa51fc
--- /dev/null
+++ b/libminifi/test/TestExecuteProcess.cpp
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include "FlowController.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "processors/GetFile.h"
+#include "core/core.h"
+#include "core/FlowFile.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+
+int main(int argc, char  **argv)
+{
+  // Smoke test: trigger ExecuteProcess once from a separate worker thread.
+  std::ostringstream oss;
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+      logging::BaseLogger>(
+      new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+                                                                         0));
+  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+  logger->updateLogger(std::move(outputLogger));
+
+  // Replace the appender with a NullAppender before the processor is built.
+  outputLogger = std::unique_ptr<logging::BaseLogger>(
+        new org::apache::nifi::minifi::core::logging::NullAppender());
+    logger->updateLogger(std::move(outputLogger));
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+      org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
+  processor->setMaxConcurrentTasks(1);
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo);
+
+  uuid_t processoruuid;
+  // Fetch the UUID outside assert(): NDEBUG builds drop the assert expression.
+  const bool has_uuid = processor->getUUID(processoruuid);
+  assert(has_uuid);
+  (void) has_uuid;
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<
+      minifi::Connection>(test_repo, "executeProcessConnection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+  connection->setDestination(processor);
+
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(processoruuid);
+
+  processor->addConnection(connection);
+  assert(processor->getName() == "executeProcess");
+
+  std::shared_ptr<core::FlowFile> record;
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+
+  processor->initialize();
+
+  std::atomic<bool> is_ready(false);
+
+  std::vector<std::thread> processor_workers;
+
+  core::ProcessorNode node2(processor);
+  std::shared_ptr<core::ProcessContext> contextset = std::make_shared<
+      core::ProcessContext>(node2, test_repo);
+  core::ProcessSessionFactory factory(contextset.get());
+  processor->onSchedule(contextset.get(), &factory);
+
+  for (int i = 0; i < 1; i++) {
+    // The worker spins on is_ready, then calls onTrigger with its own context.
+    processor_workers.push_back(
+        std::thread(
+            [processor,test_repo,&is_ready]()
+            {
+              core::ProcessorNode node(processor);
+              std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, test_repo);
+              context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command,"sleep 0.5");
+              std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get());
+              while(!is_ready.load(std::memory_order_relaxed)) {
+
+              }
+
+              processor->onTrigger(context.get(), session.get());
+
+            }));
+  }
+
+  is_ready.store(true, std::memory_order_relaxed);
+
+  std::for_each(processor_workers.begin(), processor_workers.end(),
+                [](std::thread &t)
+                {
+                  t.join();
+                });
+
+    outputLogger = std::unique_ptr<logging::BaseLogger>(
+      new org::apache::nifi::minifi::core::logging::NullAppender());
+  logger->updateLogger(std::move(outputLogger));
+
+  std::shared_ptr<org::apache::nifi::minifi::processors::ExecuteProcess> execp =
+      std::static_pointer_cast<
+          org::apache::nifi::minifi::processors::ExecuteProcess>(processor);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/3676468f/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp
index c040e4d..91a55f7 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -41,17 +41,16 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
 
   testController.enableDebug();
 
-  
-
   std::shared_ptr<core::Processor> processor = std::make_shared<
       org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
 
-  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
-  
-  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
-  std::shared_ptr<minifi::FlowController> controller =  
std::make_shared<TestFlowController>(test_repo, test_repo);
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
 
-      
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo);
 
   char format[] = "/tmp/gt.XXXXXX";
   char *dir = testController.createTempDirectory(format);
@@ -59,9 +58,8 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   uuid_t processoruuid;
   REQUIRE(true == processor->getUUID(processoruuid));
 
-
   std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(test_repo,"getfileCreate2Connection");
+      minifi::Connection>(test_repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
@@ -77,11 +75,12 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   core::ProcessorNode node(processor);
 
   core::ProcessContext context(node, test_repo);
+  core::ProcessSessionFactory factory(&context);
   
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
                       dir);
   core::ProcessSession session(&context);
 
-
+  processor->onSchedule(&context, &factory);
   REQUIRE(processor->getName() == "getfileCreate2");
 
   std::shared_ptr<core::FlowFile> record;
@@ -107,7 +106,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   unlink(ss.str().c_str());
   reporter = session.getProvenanceReporter();
 
-  REQUIRE( processor->getName() == "getfileCreate2");
+  REQUIRE(processor->getName() == "getfileCreate2");
 
   records = reporter->getEvents();
 
@@ -140,7 +139,104 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
     if (!found)
       throw std::runtime_error("Did not find record");
 
+  }
+
+}
+
+TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") {
+
+  TestController testController;
+
+  testController.enableDebug();
+
+  std::shared_ptr<core::Processor> processor = std::make_shared<
+      org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Repository> test_repo =
+      std::make_shared<TestRepository>();
+
+  std::shared_ptr<TestRepository> repo =
+      std::static_pointer_cast<TestRepository>(test_repo);
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<
+      TestFlowController>(test_repo, test_repo);
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  uuid_t processoruuid;
+  REQUIRE(true == processor->getUUID(processoruuid));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<
+      minifi::Connection>(test_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(processor);
+  connection->setDestination(processor);
+
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(processoruuid);
+
+  processor->addConnection(connection);
+  REQUIRE(dir != NULL);
+
+  core::ProcessorNode node(processor);
+  core::ProcessContext context(node, test_repo);
+  core::ProcessSessionFactory factory(&context);
+  
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory,
+                      dir);
+  // replicate 10 threads
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  processor->onSchedule(&context, &factory);
+
+  for (int i = 0; i < 10; i++) {
+
+    core::ProcessSession session(&context);
+    REQUIRE(processor->getName() == "getfileCreate2");
+
+    std::shared_ptr<core::FlowFile> record;
+
+    processor->onTrigger(&context, &session);
+
+    provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
+    std::set<provenance::ProvenanceEventRecord*> records =
+        reporter->getEvents();
+    record = session.get();
+    REQUIRE(record == nullptr);
+    REQUIRE(records.size() == 0);
+
+    std::fstream file;
+    std::stringstream ss;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
 
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onTrigger(&context, &session);
+    unlink(ss.str().c_str());
+    rmdir(dir);
+    reporter = session.getProvenanceReporter();
+
+    REQUIRE(processor->getName() == "getfileCreate2");
+
+    records = reporter->getEvents();
+
+    for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+      REQUIRE(provEventRecord->getComponentType() == processor->getName());
+    }
+    session.commit();
+    std::shared_ptr<core::FlowFile> ffr = session.get();
+
+    REQUIRE(2 == repo->getRepoMap().size());
+
+    for (auto entry : repo->getRepoMap()) {
+      provenance::ProvenanceEventRecord newRecord;
+      newRecord.DeSerialize((uint8_t*) entry.second.data(),
+                            entry.second.length());
+
+    }
   }
 
 }
@@ -158,9 +254,7 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
 
   testController.enableDebug();
 
-
-  std::shared_ptr<core::Repository> repo = std::make_shared<
-      TestRepository>();
+  std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
 
   std::shared_ptr<core::Processor> processor = std::make_shared<
       org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
@@ -177,25 +271,22 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   uuid_t logattribute_uuid;
   REQUIRE(true == logAttribute->getUUID(logattribute_uuid));
 
-
   std::shared_ptr<minifi::Connection> connection = std::make_shared<
-      minifi::Connection>(repo,"getfileCreate2Connection");
+      minifi::Connection>(repo, "getfileCreate2Connection");
   connection->setRelationship(core::Relationship("success", "description"));
 
   std::shared_ptr<minifi::Connection> connection2 = std::make_shared<
-      minifi::Connection>(repo,"logattribute");
+      minifi::Connection>(repo, "logattribute");
   connection2->setRelationship(core::Relationship("success", "description"));
 
   // link the connections so that we can test results at the end for this
   connection->setSource(processor);
 
-
   // link the connections so that we can test results at the end for this
   connection->setDestination(logAttribute);
 
   connection2->setSource(logAttribute);
 
-
   connection2->setSourceUUID(logattribute_uuid);
   connection->setSourceUUID(processoruuid);
   connection->setDestinationUUID(logattribute_uuid);
@@ -219,10 +310,15 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
 
   std::shared_ptr<core::FlowFile> record;
   processor->setScheduledState(core::ScheduledState::RUNNING);
+
+  core::ProcessSessionFactory factory(&context);
+  processor->onSchedule(&context, &factory);
   processor->onTrigger(&context, &session);
 
   logAttribute->incrementActiveTasks();
   logAttribute->setScheduledState(core::ScheduledState::RUNNING);
+  core::ProcessSessionFactory factory2(&context2);
+  logAttribute->onSchedule(&context2, &factory2);
   logAttribute->onTrigger(&context2, &session2);
 
   provenance::ProvenanceReporter *reporter = session.getProvenanceReporter();
@@ -270,3 +366,13 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   logger->updateLogger(std::move(outputLogger));
 
 }
+
+// Returns the byte size of the file at path `add`; tellg() reports -1 if the stream is in a failed state.
+int fileSize(const char *add) {
+  std::ifstream input(add, std::ios_base::binary);
+  input.seekg(0, std::ios_base::end);
+  const std::streamoff endOffset = input.tellg();
+  // No explicit close(): the ifstream destructor releases the file on return.
+  return static_cast<int>(endOffset);
+}
+

Reply via email to