http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h 
b/libminifi/include/core/repository/VolatileProvenanceRepository.h
new file mode 100644
index 0000000..7397751
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_
+
+#include "VolatileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Volatile provenance repository.
+ */
+class VolatileProvenanceRepository : public VolatileRepository<std::string>
+{
+
+ public:
+  explicit VolatileProvenanceRepository(std::string repo_name = "", 
std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+  MAX_REPOSITORY_STORAGE_SIZE,
+                                        uint64_t purgePeriod = 
REPOSITORY_PURGE_PERIOD)
+      : VolatileRepository(repo_name.length() > 0 ? repo_name : 
core::getClassName<VolatileRepository>(), "", maxPartitionMillis, 
maxPartitionBytes, purgePeriod)
+
+  {
+    purge_required_ = false;
+  }
+
+  virtual void run() {
+    repo_full_ = false;
+  }
+ private:
+
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileRepository.h 
b/libminifi/include/core/repository/VolatileRepository.h
index 870a1f5..958d91a 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -22,9 +22,11 @@
 #include <chrono>
 #include <vector>
 #include <map>
+#include "core/SerializableComponent.h"
 #include "core/Core.h"
 #include "Connection.h"
 #include "utils/StringUtils.h"
+#include "AtomicRepoEntries.h"
 
 namespace org {
 namespace apache {
@@ -33,290 +35,89 @@ namespace minifi {
 namespace core {
 namespace repository {
 
-static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + 
sizeof(std::string) + sizeof(size_t);
-
-class RepoValue {
- public:
-
-  explicit RepoValue() {
-  }
-
-  explicit RepoValue(std::string key, uint8_t *ptr, size_t size)
-      : key_(key) {
-    buffer_.resize(size);
-    std::memcpy(buffer_.data(), ptr, size);
-    fast_size_ = key.size() + size;
-  }
-
-  explicit RepoValue(RepoValue &&other)
-noexcept      : key_(std::move(other.key_)),
-      buffer_(std::move(other.buffer_)),
-      fast_size_(other.fast_size_) {
-      }
-
-      ~RepoValue()
-      {
-      }
-
-      std::string &getKey() {
-        return key_;
-      }
-
-      /**
-       * Return the size of the memory within the key
-       * buffer, the size of timestamp, and the general
-       * system word size
-       */
-      uint64_t size() {
-        return fast_size_;
-      }
-
-      size_t bufferSize() {
-        return buffer_.size();
-      }
-
-      void emplace(std::string &str) {
-        str.insert(0, reinterpret_cast<const char*>(buffer_.data()), 
buffer_.size());
-      }
-
-      RepoValue &operator=(RepoValue &&other) noexcept {
-        key_ = std::move(other.key_);
-        buffer_ = std::move(other.buffer_);
-        other.buffer_.clear();
-        return *this;
-      }
-
-    private:
-      size_t fast_size_;
-      std::string key_;
-      std::vector<uint8_t> buffer_;
-    };
-
-    /**
-     * Purpose: Atomic Entry allows us to create a statically
-     * sized ring buffer, with the ability to create
-     **/
-class AtomicEntry {
-
- public:
-  AtomicEntry()
-      : write_pending_(false),
-        has_value_(false) {
-
-  }
-
-  bool setRepoValue(RepoValue &new_value, size_t &prev_size) {
-    // delete the underlying pointer
-    bool lock = false;
-    if (!write_pending_.compare_exchange_weak(lock, true) && !lock)
-      return false;
-    if (has_value_) {
-      prev_size = value_.size();
-    }
-    value_ = std::move(new_value);
-    has_value_ = true;
-    try_unlock();
-    return true;
-  }
-
-  bool getValue(RepoValue &value) {
-    try_lock();
-    if (!has_value_) {
-      try_unlock();
-      return false;
-    }
-    value = std::move(value_);
-    has_value_ = false;
-    try_unlock();
-    return true;
-  }
-
-  bool getValue(const std::string &key, RepoValue &value) {
-    try_lock();
-    if (!has_value_) {
-      try_unlock();
-      return false;
-    }
-    if (value_.getKey() != key) {
-      try_unlock();
-      return false;
-    }
-    value = std::move(value_);
-    has_value_ = false;
-    try_unlock();
-    return true;
-  }
-
- private:
-
-  inline void try_lock() {
-    bool lock = false;
-    while (!write_pending_.compare_exchange_weak(lock, true) && !lock) {
-      // attempt again
-    }
-  }
-
-  inline void try_unlock() {
-    bool lock = true;
-    while (!write_pending_.compare_exchange_weak(lock, false) && lock) {
-      // attempt again
-    }
-  }
-
-  std::atomic<bool> write_pending_;
-  std::atomic<bool> has_value_;
-  RepoValue value_;
-};
-
 /**
  * Flow File repository
  * Design: Extends Repository and implements the run function, using LevelDB 
as the primary substrate.
  */
-class VolatileRepository : public core::Repository, public 
std::enable_shared_from_this<VolatileRepository> {
+template<typename T>
+class VolatileRepository : public core::Repository, public 
std::enable_shared_from_this<VolatileRepository<T>> {
  public:
 
   static const char *volatile_repo_max_count;
+  static const char *volatile_repo_max_bytes;
   // Constructor
 
-  VolatileRepository(std::string repo_name = "", std::string dir = 
REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+  explicit VolatileRepository(std::string repo_name = "", std::string dir = 
REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
-                     uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+                              uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
       : Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<VolatileRepository>(), "", maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
         max_size_(maxPartitionBytes * 0.75),
         current_index_(0),
         max_count_(10000),
+        current_size_(0),
         logger_(logging::LoggerFactory<VolatileRepository>::getLogger())
 
   {
-
+    purge_required_ = false;
   }
 
   // Destructor
-  ~VolatileRepository() {
-    for (auto ent : value_vector_) {
-      delete ent;
-    }
-  }
+  virtual ~VolatileRepository();
 
   /**
    * Initialize thevolatile repsitory
    **/
-  virtual bool initialize(const std::shared_ptr<Configure> &configure) {
-    std::string value = "";
-
-    if (configure != nullptr) {
-      int64_t max_cnt = 0;
-      std::stringstream strstream;
-      strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << volatile_repo_max_count;
-      if (configure->get(strstream.str(), value)) {
-        if (core::Property::StringToInt(value, max_cnt)) {
-          max_count_ = max_cnt;
-        }
 
-      }
-    }
+  virtual bool initialize(const std::shared_ptr<Configure> &configure);
 
-    logger_->log_debug("Resizing value_vector_ for %s count is %d", getName(), 
max_count_);
-    value_vector_.reserve(max_count_);
-    for (int i = 0; i < max_count_; i++) {
-      value_vector_.emplace_back(new AtomicEntry());
-    }
-    return true;
-  }
-
-  virtual void run();
+  virtual void run() = 0;
 
   /**
    * Places a new object into the volatile memory area
    * @param key key to add to the repository
    * @param buf buffer 
    **/
-  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
-    RepoValue new_value(key, buf, bufLen);
-
-    const size_t size = new_value.size();
-    bool updated = false;
-    size_t reclaimed_size = 0;
-    do {
-
-      int private_index = current_index_.fetch_add(1);
-      // round robin through the beginning
-      if (private_index >= max_count_) {
-        uint16_t new_index = 0;
-        if (current_index_.compare_exchange_weak(new_index, 0)) {
-          private_index = 0;
-        } else {
-          continue;
-        }
-      }
-      logger_->log_info("Set repo value at %d out of %d", private_index, 
max_count_);
-      updated = value_vector_.at(private_index)->setRepoValue(new_value, 
reclaimed_size);
+  virtual bool Put(T key, const uint8_t *buf, size_t bufLen);
 
-      if (reclaimed_size > 0) {
-        current_size_ -= reclaimed_size;
-      }
-
-    } while (!updated);
-    current_size_ += size;
-
-    logger_->log_info("VolatileRepository -- put %s %d %d", key, 
current_size_.load(), current_index_.load());
-    return true;
-  }
   /**
-   *c
    * Deletes the key
    * @return status of the delete operation
    */
-  virtual bool Delete(std::string key) {
-
-    logger_->log_info("VolatileRepository -- delete %s", key);
-    for (auto ent : value_vector_) {
-      // let the destructor do the cleanup
-      RepoValue value;
-      if (ent->getValue(key, value)) {
-        current_size_ -= value.size();
-        return true;
-      }
+  virtual bool Delete(T key);
 
-    }
-    return false;
-  }
   /**
    * Sets the value from the provided key. Once the item is retrieved
    * it may not be retrieved again.
    * @return status of the get operation.
    */
-  virtual bool Get(std::string key, std::string &value) {
-    for (auto ent : value_vector_) {
-      // let the destructor do the cleanup
-      RepoValue repo_value;
-
-      if (ent->getValue(key, repo_value)) {
-        current_size_ -= value.size();
-        repo_value.emplace(value);
-        logger_->log_info("VolatileRepository -- get %s %d", key, 
current_size_.load());
-        return true;
-      }
+  virtual bool Get(const T &key, std::string &value);
+  /**
+   * Deserializes objects into store
+   * @param store vector in which we will store newly created objects.
+   * @param max_size size of objects deserialized
+   */
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> 
lambda);
 
-    }
-    return false;
-  }
+  /**
+   * Deserializes objects into a store that contains a fixed number of objects 
in which
+   * we will deserialize from this repo
+   * @param store precreated object vector
+   * @param max_size size of objects deserialized
+   */
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t &max_size);
 
-  void setConnectionMap(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap) {
-    this->connectionMap = connectionMap;
-  }
-  void loadComponent();
-
-  void start() {
-    if (this->purge_period_ <= 0)
-      return;
-    if (running_)
-      return;
-    thread_ = std::thread(&VolatileRepository::run, shared_from_this());
-    thread_.detach();
-    running_ = true;
-    logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
-  }
+  /**
+   * Set the connection map
+   * @param connectionMap map of all connections through this repo.
+   */
+  void setConnectionMap(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap);
+
+  /**
+   * Function to load this component.
+   */
+  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo);
+
+  virtual void start();
 
  protected:
 
@@ -331,22 +132,240 @@ class VolatileRepository : public core::Repository, 
public std::enable_shared_fr
     else
       return false;
   }
-  /**
-   * Purges the volatile repository.
-   */
-  void purge();
 
- private:
   std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
-
-  std::atomic<uint32_t> current_size_;
+  // current size of the volatile repo.
+  std::atomic<size_t> current_size_;
+  // current index.
   std::atomic<uint16_t> current_index_;
-  std::vector<AtomicEntry*> value_vector_;
+  // value vector that exists for non blocking iteration over
+  // objects that store data for this repo instance.
+  std::vector<AtomicEntry<T>*> value_vector_;
+
+  // max count we are allowed to store.
   uint32_t max_count_;
-  uint32_t max_size_;
+  // maximum estimated size
+  size_t max_size_;
+
+  bool purge_required_;
+
+  std::mutex purge_mutex_;
+  // purge list
+  std::vector<T> purge_list_;
+
+ private:
   std::shared_ptr<logging::Logger> logger_;
+
+};
+
+template<typename T>
+const char *VolatileRepository<T>::volatile_repo_max_count = "max.count";
+template<typename T>
+const char *VolatileRepository<T>::volatile_repo_max_bytes = "max.bytes";
+
+template<typename T>
+void VolatileRepository<T>::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
+}
+
+// Destructor
+template<typename T>
+VolatileRepository<T>::~VolatileRepository() {
+  for (auto ent : value_vector_) {
+    delete ent;
+  }
+}
+
+/**
+ * Initialize the volatile repsitory
+ **/
+template<typename T>
+bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> 
&configure) {
+  std::string value = "";
+
+  if (configure != nullptr) {
+    int64_t max_cnt = 0;
+    std::stringstream strstream;
+    strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << volatile_repo_max_count;
+    if (configure->get(strstream.str(), value)) {
+      if (core::Property::StringToInt(value, max_cnt)) {
+        max_count_ = max_cnt;
+      }
+    }
+
+    strstream.str("");
+    strstream.clear();
+    int64_t max_bytes = 0;
+    strstream << Configure::nifi_volatile_repository_options << getName() << 
"." << volatile_repo_max_bytes;
+    if (configure->get(strstream.str(), value)) {
+      if (core::Property::StringToInt(value, max_bytes)) {
+        if (max_bytes <= 0) {
+          max_size_ = std::numeric_limits<uint32_t>::max();
+        } else {
+          max_size_ = max_bytes;
+        }
+      }
+    }
+  }
+
+  logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), 
max_count_);
+  logger_->log_info("Using a maximum size of %u", max_size_);
+  value_vector_.reserve(max_count_);
+  for (int i = 0; i < max_count_; i++) {
+    value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_));
+  }
+  return true;
+}
+
+/**
+ * Places a new object into the volatile memory area
+ * @param key key to add to the repository
+ * @param buf buffer
+ **/
+template<typename T>
+bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
+  RepoValue<T> new_value(key, buf, bufLen);
+
+  const size_t size = new_value.size();
+  bool updated = false;
+  size_t reclaimed_size = 0;
+  RepoValue<T> old_value;
+  do {
+    int private_index = current_index_.fetch_add(1);
+    // round robin through the beginning
+    if (private_index >= max_count_) {
+      uint16_t new_index = 0;
+      if (current_index_.compare_exchange_weak(new_index, 0)) {
+        private_index = 0;
+      } else {
+        continue;
+      }
+    }
+    
+    updated = value_vector_.at(private_index)->setRepoValue(new_value, 
old_value, reclaimed_size);
+    logger_->log_debug("Set repo value at %d out of %d updated %d current_size 
%d, adding %d to  %d", private_index, 
max_count_,updated==true,reclaimed_size,size, current_size_.load());
+    if (updated && reclaimed_size > 0)
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      purge_list_.push_back(old_value.getKey());
+    }
+    if (reclaimed_size > 0) {
+      /**
+       * this is okay since current_size_ is really an estimate.
+       * we don't need precise counts.
+       */
+      if (current_size_ < reclaimed_size) {
+        current_size_ = 0;
+      } else {
+        current_size_ -= reclaimed_size;
+      }
+    }
+  } while (!updated);
+  current_size_ += size;
+
+  logger_->log_debug("VolatileRepository -- put %d %d", current_size_.load(), 
current_index_.load());
+  return true;
+}
+/**
+ * Deletes the key
+ * @return status of the delete operation
+ */
+template<typename T>
+bool VolatileRepository<T>::Delete(T key) {
+  for (auto ent : value_vector_) {
+    // let the destructor do the cleanup
+    RepoValue<T> value;
+    if (ent->getValue(key, value)) {
+      current_size_ -= value.size();
+      return true;
+    }
+  }
+  return false;
+}
+/**
+ * Sets the value from the provided key. Once the item is retrieved
+ * it may not be retrieved again.
+ * @return status of the get operation.
+ */
+template<typename T>
+bool VolatileRepository<T>::Get(const T &key, std::string &value) {
+
+  for (auto ent : value_vector_) {
+    // let the destructor do the cleanup
+    RepoValue<T> repo_value;
+    if (ent->getValue(key, repo_value)) {
+      current_size_ -= value.size();
+      repo_value.emplace(value);
+      return true;
+    }
+  }
+  return false;
+}
+
+template<typename T>
+bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+  size_t requested_batch = max_size;
+  max_size = 0;
+  for (auto ent : value_vector_) {
+    // let the destructor do the cleanup
+    RepoValue<T> repo_value;
+
+    if (ent->getValue(repo_value)) {
+      std::shared_ptr<core::SerializableComponent> newComponent = lambda();
+      // we've taken ownership of this repo value
+      newComponent->DeSerialize(repo_value.getBuffer(), 
repo_value.getBufferSize());
+
+      store.push_back(newComponent);
+
+      if (max_size++ >= requested_batch) {
+        break;
+      }
+    }
+  }
+  if (max_size > 0) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+template<typename T>
+bool 
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size) {
+  logger_->log_debug("VolatileRepository -- DeSerialize %d", 
current_size_.load());
+  max_size = 0;
+  for (auto ent : value_vector_) {
+    // let the destructor do the cleanup
+    RepoValue<T> repo_value;
+
+    if (ent->getValue(repo_value)) {
+      // we've taken ownership of this repo value
+      store.at(max_size)->DeSerialize(repo_value.getBuffer(), 
repo_value.getBufferSize());
+      if (max_size++ >= store.size()) {
+        break;
+      }
+    }
+  }
+  if (max_size > 0) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+template<typename T>
+void VolatileRepository<T>::setConnectionMap(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap) {
+  this->connectionMap = connectionMap;
+}
+
+template<typename T>
+void VolatileRepository<T>::start() {
+  if (this->purge_period_ <= 0)
+    return;
+  if (running_)
+    return;
+  running_ = true;
+  thread_ = std::thread(&VolatileRepository<T>::run, 
std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this());
+  logger_->log_info("%s Repository Monitor Thread Start", name_);
 }
-;
 
 } /* namespace repository */
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
index e03c794..17b060f 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -21,7 +21,7 @@
 #include "core/ProcessorConfig.h"
 #include "yaml-cpp/yaml.h"
 #include "processors/LoadProcessors.h"
-#include "../FlowConfiguration.h"
+#include "core/FlowConfiguration.h"
 #include "Site2SiteClientProtocol.h"
 #include <string>
 #include "io/validation.h"
@@ -46,13 +46,12 @@ namespace core {
 class YamlConfiguration : public FlowConfiguration {
 
  public:
-  explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<io::StreamFactory> stream_factory,
-                             std::shared_ptr<Configure> configuration,
-                             const std::string path = 
DEFAULT_FLOW_YAML_FILE_NAME)
-      : FlowConfiguration(repo, flow_file_repo, stream_factory, configuration, 
path),
+  explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
+                             std::shared_ptr<io::StreamFactory> 
stream_factory, std::shared_ptr<Configure> configuration, const std::string 
path = DEFAULT_FLOW_YAML_FILE_NAME)
+      : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, 
configuration, path),
         logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {
     stream_factory_ = stream_factory;
-    if (IsNullOrEmpty(config_path_)) {
+    if (IsNullOrEmpty (config_path_)) {
       config_path_ = DEFAULT_FLOW_YAML_FILE_NAME;
     }
   }
@@ -93,20 +92,20 @@ class YamlConfiguration : public FlowConfiguration {
   }
 
   /**
-    * Returns a shared pointer to a ProcessGroup object containing the
-    * flow configuration. The yamlConfigPayload argument must be
-    * a payload for the raw YAML configuration.
-    *
-    * @param yamlConfigPayload an input payload for the raw YAML configuration
-    *                           to be parsed and loaded into the flow
-    *                           configuration tree
-    * @return                 the root ProcessGroup node of the flow
-    *                           configuration tree
-    */
-   std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string 
&yamlConfigPayload) {
-     YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
-     return getRoot(&rootYamlNode);
-   }
+   * Returns a shared pointer to a ProcessGroup object containing the
+   * flow configuration. The yamlConfigPayload argument must be
+   * a payload for the raw YAML configuration.
+   *
+   * @param yamlConfigPayload an input payload for the raw YAML configuration
+   *                           to be parsed and loaded into the flow
+   *                           configuration tree
+   * @return                 the root ProcessGroup node of the flow
+   *                           configuration tree
+   */
+  std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string 
&yamlConfigPayload) {
+    YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload);
+    return getRoot(&rootYamlNode);
+  }
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/AtomicEntryStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/AtomicEntryStream.h 
b/libminifi/include/io/AtomicEntryStream.h
new file mode 100644
index 0000000..5f200f0
--- /dev/null
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -0,0 +1,205 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_
+#define LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_
+
+#include <mutex>
+#include <cstring>
+#include "BaseStream.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "Exception.h"
+#include "core/logging/LoggerConfiguration.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+template<typename T>
+class AtomicEntryStream : public BaseStream {
+ public:
+  AtomicEntryStream(const T key, core::repository::AtomicEntry<T> *entry)
+      : key_(key),
+        entry_(entry),
+        offset_(0),
+        length_(0),
+        logger_(logging::LoggerFactory<AtomicEntryStream()>::getLogger()) {
+    core::repository::RepoValue<T> *value;
+    if (entry_->getValue(key, &value)) {
+      length_ = value->getBufferSize();
+      entry_->decrementOwnership();
+      invalid_stream_ = false;
+    } else {
+      invalid_stream_ = true;
+    }
+  }
+  
+  virtual ~AtomicEntryStream();
+
+  virtual void closeStream() {
+
+  }
+
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  virtual const uint32_t getSize() const {
+    return length_;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+ protected:
+  size_t length_;
+  size_t offset_;
+  T key_;
+  core::repository::AtomicEntry<T> *entry_;
+  std::atomic<bool> invalid_stream_;
+  std::recursive_mutex entry_lock_;
+
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+template<typename T>
+AtomicEntryStream<T>::~AtomicEntryStream(){
+  logger_->log_debug("Decrementing");
+    entry_->decrementOwnership();
+}
+
+template<typename T>
+void AtomicEntryStream<T>::seek(uint64_t offset) {
+  std::lock_guard<std::recursive_mutex> lock(entry_lock_);
+  offset_ = offset;
+}
+
+template<typename T>
+int AtomicEntryStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) {
+  if (buf.capacity() < buflen || invalid_stream_)
+    return -1;
+  return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
+}
+
+// data stream overrides
+template<typename T>
+int AtomicEntryStream<T>::writeData(uint8_t *value, int size) {
+  if (nullptr != value && !invalid_stream_) {
+    std::lock_guard<std::recursive_mutex> lock(entry_lock_);
+    if (entry_->insert(key_, value, size)) {
+      offset_ += size;
+      if (offset_ > length_)
+          {
+        length_ = offset_;
+      }
+      return size;
+    }
+    else {
+      logger_->log_debug("Cannot insert %d bytes due to insufficient space in 
atomic entry", size);
+    }
+
+  }
+  return -1;
+
+}
+
+template<typename T>
+int AtomicEntryStream<T>::readData(std::vector<uint8_t> &buf, int buflen) {
+  if (invalid_stream_){
+    return -1;
+  }
+  if (buf.capacity() < buflen) {
+    buf.resize(buflen);
+  }
+  int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen);
+
+  if (ret < buflen) {
+    buf.resize(ret);
+  }
+  return ret;
+}
+
+template<typename T>
+int AtomicEntryStream<T>::readData(uint8_t *buf, int buflen) {
+  if (nullptr != buf && !invalid_stream_) {
+    std::lock_guard<std::recursive_mutex> lock(entry_lock_);
+    int len = buflen;
+    core::repository::RepoValue<T> *value;
+    if (entry_->getValue(key_, &value)) {
+      if (offset_ + len > value->getBufferSize()) {
+        len = value->getBufferSize() - offset_;
+        if (len <= 0) {
+         entry_->decrementOwnership();
+          return 0;
+        }
+      }
+      std::memcpy(buf, 
reinterpret_cast<uint8_t*>(const_cast<uint8_t*>(value->getBuffer()) + offset_), 
len);
+      offset_ += len;
+    entry_->decrementOwnership();
+      return len;
+    }
+
+  }
+  return -1;
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h 
b/libminifi/include/io/BaseStream.h
index cae8a43..cd982bb 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -30,6 +30,11 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
+/**
+ * Base Stream. Not intended to be thread safe as it is not intended to be 
shared
+ *
+ * Extensions may be thread safe and thus shareable, but that is up to the 
implementation.
+ */
 class BaseStream : public DataStream, public Serializable {
 
  public:
@@ -55,6 +60,14 @@ class BaseStream : public DataStream, public Serializable {
 
   int writeData(uint8_t *value, int size);
 
+  virtual void seek(uint32_t offset) {
+    if (composable_stream_ != this) {
+      composable_stream_->seek(offset);
+    } else {
+      DataStream::seek(offset);
+    }
+  }
+
   /**
    * write 2 bytes to stream
    * @param base_value non encoded value

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h 
b/libminifi/include/io/ClientSocket.h
index c7db7f1..cd8a4fc 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -84,7 +84,7 @@ class Socket : public BaseStream {
    * Static function to return the current machine's host name
    */
   static std::string getMyHostName() {
-    static char *HOSTNAME = init_hostname();
+    static std::string HOSTNAME = init_hostname();
     return HOSTNAME;
   }
 
@@ -239,12 +239,12 @@ class Socket : public BaseStream {
 
  private:
   std::shared_ptr<logging::Logger> logger_;
-  static char* init_hostname() {
+  static std::string init_hostname() {
     char hostname[1024];
     gethostname(hostname, 1024);
     Socket mySock(nullptr, hostname, 0);
     mySock.initialize();
-    return const_cast<char*>(mySock.getHostname().c_str());
+    return mySock.getHostname();
   }
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/DataStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/DataStream.h 
b/libminifi/include/io/DataStream.h
index 460930d..2ebc9a4 100644
--- a/libminifi/include/io/DataStream.h
+++ b/libminifi/include/io/DataStream.h
@@ -30,6 +30,8 @@ namespace io {
 /**
  * DataStream defines the mechanism through which
  * binary data will be written to a sink
+ *
+ * This object is not intended to be thread safe.
  */
 class DataStream {
  public:
@@ -58,6 +60,10 @@ class DataStream {
     return 0;
   }
 
+  virtual void seek(uint32_t offset) {
+    readBuffer += offset;
+  }
+
   virtual void closeStream() {
 
   }
@@ -111,7 +117,7 @@ class DataStream {
    * Retrieve size of data stream
    * @return size of data stream
    **/
-  const uint32_t getSize() const {
+  virtual const uint32_t getSize() const {
     return buffer.size();
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/FileStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/FileStream.h 
b/libminifi/include/io/FileStream.h
new file mode 100644
index 0000000..23a1f0b
--- /dev/null
+++ b/libminifi/include/io/FileStream.h
@@ -0,0 +1,136 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_
+#define LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_
+
+#include <iostream>
+#include <cstdint>
+#include <string>
+#include "EndianCheck.h"
+#include "BaseStream.h"
+#include "Serializable.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: File Stream Base stream extension. This is intended to be a thread 
safe access to
+ * read/write to the local file system.
+ *
+ * Design: Simply extends BaseStream and overrides readData/writeData to allow 
a sink to the
+ * fstream object.
+ */
+class FileStream : public io::BaseStream {
+ public:
+  /**
+   * File Stream constructor that accepts an fstream shared pointer.
+   * It must already be initialized for read and write.
+   */
+  explicit FileStream(const std::string &path, uint32_t offset,  bool 
write_enable = false);
+
+  /**
+   * File Stream constructor that accepts an fstream shared pointer.
+   * It must already be initialized for read and write.
+   */
+  explicit FileStream(const std::string &path);
+
+  virtual ~FileStream() {
+    closeStream();
+  }
+
+  virtual void closeStream();
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  const uint32_t getSize() const {
+    return length_;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    throw std::runtime_error("Stream does not support this operation");
+  }
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+  std::recursive_mutex file_lock_;
+  std::unique_ptr<std::fstream> file_stream_;
+  size_t offset_;
+  std::string path_;
+  size_t length_;
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ExecuteProcess.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ExecuteProcess.h 
b/libminifi/include/processors/ExecuteProcess.h
index 28dcf76..8cc7a25 100644
--- a/libminifi/include/processors/ExecuteProcess.h
+++ b/libminifi/include/processors/ExecuteProcess.h
@@ -31,6 +31,7 @@
 #include <iostream>
 #include <sys/types.h>
 #include <signal.h>
+#include "io/BaseStream.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -85,9 +86,12 @@ class ExecuteProcess : public core::Processor {
     }
     char *_data;
     uint64_t _dataSize;
-    void process(std::ofstream *stream) {
+    //void process(std::ofstream *stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
       if (_data && _dataSize > 0)
-        stream->write(_data, _dataSize);
+        ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
+      return ret;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/GenerateFlowFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GenerateFlowFile.h 
b/libminifi/include/processors/GenerateFlowFile.h
index abb5740..7551e88 100644
--- a/libminifi/include/processors/GenerateFlowFile.h
+++ b/libminifi/include/processors/GenerateFlowFile.h
@@ -68,9 +68,11 @@ class GenerateFlowFile : public core::Processor {
     }
     char *_data;
     uint64_t _dataSize;
-    void process(std::ofstream *stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
       if (_data && _dataSize > 0)
-        stream->write(_data, _dataSize);
+        ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
+      return ret;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/InvokeHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/InvokeHTTP.h 
b/libminifi/include/processors/InvokeHTTP.h
index d55a5be..03a1611 100644
--- a/libminifi/include/processors/InvokeHTTP.h
+++ b/libminifi/include/processors/InvokeHTTP.h
@@ -104,6 +104,12 @@ class InvokeHTTP : public core::Processor {
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
   void initialize();
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
+  /**
+   * Provides a reference to the URL.
+   */
+  const std::string &getUrl() {
+    return url_;
+  }
 
  protected:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenHTTP.h 
b/libminifi/include/processors/ListenHTTP.h
index c9e42bc..1b58dcd 100644
--- a/libminifi/include/processors/ListenHTTP.h
+++ b/libminifi/include/processors/ListenHTTP.h
@@ -92,7 +92,7 @@ class ListenHTTP : public core::Processor {
   class WriteCallback : public OutputStreamCallback {
    public:
     WriteCallback(struct mg_connection *conn, const struct mg_request_info 
*reqInfo);
-    void process(std::ofstream *stream);
+    int64_t process(std::shared_ptr<io::BaseStream> stream);
 
    private:
     // Logger

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ListenSyslog.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/ListenSyslog.h 
b/libminifi/include/processors/ListenSyslog.h
index ed54b44..25acac9 100644
--- a/libminifi/include/processors/ListenSyslog.h
+++ b/libminifi/include/processors/ListenSyslog.h
@@ -114,14 +114,16 @@ class ListenSyslog : public core::Processor {
   class WriteCallback : public OutputStreamCallback {
    public:
     WriteCallback(char *data, uint64_t size)
-        : _data(data),
+        : _data(reinterpret_cast<uint8_t*>(data)),
           _dataSize(size) {
     }
-    char *_data;
+    uint8_t *_data;
     uint64_t _dataSize;
-    void process(std::ofstream *stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
       if (_data && _dataSize > 0)
-        stream->write(_data, _dataSize);
+        ret = stream->write(_data, _dataSize);
+      return ret;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/LogAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/LogAttribute.h 
b/libminifi/include/processors/LogAttribute.h
index 88230f7..b9e333f 100644
--- a/libminifi/include/processors/LogAttribute.h
+++ b/libminifi/include/processors/LogAttribute.h
@@ -87,25 +87,27 @@ class LogAttribute : public core::Processor {
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t size) {
-      _bufferSize = size;
-      _buffer = new char[_bufferSize];
+    ReadCallback(uint64_t size)
+        : read_size_(0) {
+      buffer_size_ = size;
+      buffer_ = new uint8_t[buffer_size_];
     }
     ~ReadCallback() {
-      if (_buffer)
-        delete[] _buffer;
+      if (buffer_)
+        delete[] buffer_;
     }
-    void process(std::ifstream *stream) {
-
-      stream->read(_buffer, _bufferSize);
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t ret = 0;
+      ret = stream->read(buffer_, buffer_size_);
       if (!stream)
-        _readSize = stream->gcount();
+        read_size_ = stream->getSize();
       else
-        _readSize = _bufferSize;
+        read_size_ = buffer_size_;
+      return ret;
     }
-    char *_buffer;
-    uint64_t _bufferSize;
-    uint64_t _readSize;
+    uint8_t *buffer_;
+    uint64_t buffer_size_;
+    uint64_t read_size_;
   };
 
  public:

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/PutFile.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/PutFile.h 
b/libminifi/include/processors/PutFile.h
index f67e512..c7f2823 100644
--- a/libminifi/include/processors/PutFile.h
+++ b/libminifi/include/processors/PutFile.h
@@ -80,7 +80,8 @@ class PutFile : public core::Processor {
    public:
     ReadCallback(const std::string &tmpFile, const std::string &destFile);
     ~ReadCallback();
-    virtual void process(std::ifstream *stream);bool commit();
+    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+    bool commit();
 
    private:
     std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/properties/Configure.h
----------------------------------------------------------------------
diff --git a/libminifi/include/properties/Configure.h 
b/libminifi/include/properties/Configure.h
index 13da55a..341b89c 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -40,6 +40,7 @@ class Configure : public Properties {
   static const char *nifi_server_name;
   static const char *nifi_configuration_class_name;
   static const char *nifi_flow_repository_class_name;
+  static const char *nifi_content_repository_class_name;
   static const char *nifi_volatile_repository_options;
   static const char *nifi_provenance_repository_class_name;
   static const char *nifi_server_port;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/provenance/Provenance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/Provenance.h 
b/libminifi/include/provenance/Provenance.h
index 1479514..b9415dc 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -29,7 +29,8 @@
 #include <string>
 #include <thread>
 #include <vector>
-
+#include "core/Core.h"
+#include "core/SerializableComponent.h"
 #include "core/Repository.h"
 #include "core/Property.h"
 #include "properties/Configure.h"
@@ -50,7 +51,7 @@ namespace provenance {
 #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
 
 // Provenance Event Record
-class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializable {
+class ProvenanceEventRecord : public core::SerializableComponent {
  public:
   enum ProvenanceEventType {
 
@@ -163,7 +164,8 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
    */
   ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, 
std::string componentType);
 
-  ProvenanceEventRecord() {
+  ProvenanceEventRecord()
+      : 
core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) {
     _eventTime = getTimeMillis();
   }
 
@@ -172,7 +174,11 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
   }
   // Get the Event ID
   std::string getEventId() {
-    return _eventIdStr;
+    return uuidStr_;
+  }
+
+  void setEventId(const std::string &id) {
+    setUUIDStr(id);
   }
   // Get Attributes
   std::map<std::string, std::string> getAttributes() {
@@ -220,7 +226,7 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
   }
   // Get FlowFileUuid
   std::string getFlowFileUuid() {
-    return uuid_;
+    return flow_uuid_;
   }
   // Get content full path
   std::string getContentFullPath() {
@@ -333,7 +339,7 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
     _entryDate = flow->getEntryDate();
     _lineageStartDate = flow->getlineageStartDate();
     _lineageIdentifiers = flow->getlineageIdentifiers();
-    uuid_ = flow->getUUIDStr();
+    flow_uuid_ = flow->getUUIDStr();
     _attributes = flow->getAttributes();
     _size = flow->getSize();
     _offset = flow->getOffset();
@@ -344,15 +350,43 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
     }
   }
   // Serialize and Persistent to the repository
-  bool Serialize(const std::shared_ptr<core::Repository> &repo);
+  bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo);
   // DeSerialize
-  bool DeSerialize(const uint8_t *buffer, const int bufferSize);
+  bool DeSerialize(const uint8_t *buffer, const size_t bufferSize);
   // DeSerialize
   bool DeSerialize(org::apache::nifi::minifi::io::DataStream &stream) {
     return DeSerialize(stream.getBuffer(), stream.getSize());
   }
   // DeSerialize
-  bool DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string 
key);
+  bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &repo);
+
+  uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
+
+    int size = bufferSize > 72 ? 72 : bufferSize;
+    org::apache::nifi::minifi::io::DataStream outStream(buffer, size);
+
+    std::string uuid;
+    int ret = readUTF(uuid, &outStream);
+
+    if (ret <= 0) {
+      return 0;
+    }
+
+    uint32_t eventType;
+    ret = read(eventType, &outStream);
+    if (ret != 4) {
+      return 0;
+    }
+
+    uint64_t event_time;
+
+    ret = read(event_time, &outStream);
+    if (ret != 8) {
+      return 0;
+    }
+
+    return event_time;
+  }
 
  protected:
 
@@ -373,15 +407,13 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
   // Size in bytes of the data corresponding to this flow file
   uint64_t _size;
   // flow uuid
-  std::string uuid_;
+  std::string flow_uuid_;
   // Offset to the content
   uint64_t _offset;
   // Full path to the content
   std::string _contentFullPath;
   // Attributes key/values pairs for the flow record
   std::map<std::string, std::string> _attributes;
-  // provenance ID
-  uuid_t _eventId;
   // UUID string for all parents
   std::set<std::string> _lineageIdentifiers;
   // transitUri
@@ -396,8 +428,6 @@ class ProvenanceEventRecord : protected 
org::apache::nifi::minifi::io::Serializa
   std::string _details;
   // sourceQueueIdentifier
   std::string _sourceQueueIdentifier;
-  // event ID Str
-  std::string _eventIdStr;
   // relationship
   std::string _relationship;
   // alternateIdentifierUri;
@@ -437,6 +467,7 @@ class ProvenanceReporter {
   // Add event
   void add(ProvenanceEventRecord *event) {
     _events.insert(event);
+    logger_->log_debug("Prove reporter now %d", _events.size());
   }
   // Remove event
   void remove(ProvenanceEventRecord *event) {
@@ -496,10 +527,9 @@ class ProvenanceReporter {
 
  private:
 
+  std::shared_ptr<logging::Logger> logger_;
   // Incoming connection Iterator
   std::set<ProvenanceEventRecord *> _events;
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
   // provenance repository.
   std::shared_ptr<core::Repository> repo_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h 
b/libminifi/include/provenance/ProvenanceRepository.h
index dd2c5ec..ea78a3c 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -42,12 +42,11 @@ class ProvenanceRepository : public core::Repository, 
public std::enable_shared_
   /*!
    * Create a new provenance repository
    */
-  ProvenanceRepository(const std::string repo_name = "", std::string directory 
= PROVENANCE_DIRECTORY, int64_t maxPartitionMillis =
-  MAX_PROVENANCE_ENTRY_LIFE_TIME,
-                       int64_t maxPartitionBytes = 
MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
+  ProvenanceRepository(const std::string repo_name = "", std::string directory 
= PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = 
MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+  MAX_PROVENANCE_STORAGE_SIZE,
+                       uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD)
       : Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
         logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) {
-
     db_ = NULL;
   }
 
@@ -62,9 +61,8 @@ class ProvenanceRepository : public core::Repository, public 
std::enable_shared_
       return;
     if (running_)
       return;
-    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
-    thread_.detach();
     running_ = true;
+    thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
     logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
   }
 
@@ -98,7 +96,7 @@ class ProvenanceRepository : public core::Repository, public 
std::enable_shared_
     return true;
   }
   // Put
-  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
 
     if (repo_full_)
       return false;
@@ -122,7 +120,7 @@ class ProvenanceRepository : public core::Repository, 
public std::enable_shared_
       return false;
   }
   // Get
-  virtual bool Get(std::string key, std::string &value) {
+  virtual bool Get(const std::string &key, std::string &value) {
     leveldb::Status status;
     status = db_->Get(leveldb::ReadOptions(), key, &value);
     if (status.ok())
@@ -130,17 +128,53 @@ class ProvenanceRepository : public core::Repository, 
public std::enable_shared_
     else
       return false;
   }
-  // Persistent event
-  void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) {
-    
event->Serialize(std::static_pointer_cast<core::Repository>(shared_from_this()));
-  }
+
   // Remove event
   void removeEvent(ProvenanceEventRecord *event) {
     Delete(event->getEventId());
   }
+
+  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, 
size_t max_size) {
+    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
+      std::string key = it->key().ToString();
+      if (store.size() >= max_size)
+        break;
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) 
it->value().size())) {
+        
store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
+      }
+    }
+    delete it;
+    return true;
+  }
+
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, 
size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> 
lambda) {
+    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+    size_t requested_batch = max_size;
+    max_size = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+
+      if (max_size >= requested_batch)
+        break;
+      std::shared_ptr<core::SerializableComponent> eventRead = lambda();
+      std::string key = it->key().ToString();
+      if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) 
it->value().size())) {
+        max_size++;
+        records.push_back(eventRead);
+      }
+
+    }
+    delete it;
+
+    if (max_size > 0) {
+      return true;
+    } else {
+      return false;
+    }
+  }
   //! get record
   void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> 
&records, int maxSize) {
-    std::lock_guard<std::mutex> lock(mutex_);
     leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
     for (it->SeekToFirst(); it->Valid(); it->Next()) {
       std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
@@ -153,9 +187,29 @@ class ProvenanceRepository : public core::Repository, 
public std::enable_shared_
     }
     delete it;
   }
+
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t &max_size) {
+    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+    max_size = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
+      std::string key = it->key().ToString();
+
+      if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), 
(int) it->value().size())) {
+        max_size++;
+      }
+      if (store.size() >= max_size)
+        break;
+    }
+    delete it;
+    if (max_size > 0) {
+      return true;
+    } else {
+      return false;
+    }
+  }
   //! purge record
   void 
purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> 
&records) {
-    std::lock_guard<std::mutex> lock(mutex_);
     for (auto record : records) {
       Delete(record->getEventId());
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/utils/ByteInputCallBack.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ByteInputCallBack.h 
b/libminifi/include/utils/ByteInputCallBack.h
index a2b7838..86aae09 100644
--- a/libminifi/include/utils/ByteInputCallBack.h
+++ b/libminifi/include/utils/ByteInputCallBack.h
@@ -32,19 +32,27 @@ namespace utils {
  */
 class ByteInputCallBack : public InputStreamCallback {
  public:
-  ByteInputCallBack() {
+  ByteInputCallBack()
+      : ptr(nullptr) {
   }
 
   virtual ~ByteInputCallBack() {
 
   }
 
-  virtual void process(std::ifstream *stream) {
+  int64_t process(std::shared_ptr<io::BaseStream> stream) {
 
-    std::vector<char> nv = 
std::vector<char>(std::istreambuf_iterator<char>(*stream), 
std::istreambuf_iterator<char>());
-    vec = std::move(nv);
+    stream->seek(0);
 
-    ptr = &vec[0];
+    if (stream->getSize() > 0) {
+      vec.resize(stream->getSize());
+
+      stream->readData(vec, stream->getSize());
+    }
+
+    ptr = (char*) &vec[0];
+
+    return vec.size();
 
   }
 
@@ -58,7 +66,7 @@ class ByteInputCallBack : public InputStreamCallback {
 
  private:
   char *ptr;
-  std::vector<char> vec;
+  std::vector<uint8_t> vec;
 };
 
 } /* namespace utils */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/ConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ConfigurationListener.cpp 
b/libminifi/src/ConfigurationListener.cpp
index aaf50ce..858e455 100644
--- a/libminifi/src/ConfigurationListener.cpp
+++ b/libminifi/src/ConfigurationListener.cpp
@@ -35,14 +35,10 @@ void ConfigurationListener::start() {
   pull_interval_ = 60 * 1000;
   std::string value;
   // grab the value for configuration
-  if (configure_->get(Configure::nifi_configuration_listener_pull_interval,
-      value)) {
+  if (configure_->get(Configure::nifi_configuration_listener_pull_interval, 
value)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(value, pull_interval_, unit)
-        && core::Property::ConvertTimeUnitToMS(pull_interval_, unit,
-            pull_interval_)) {
-      logger_->log_info("Configuration Listener pull interval: [%d] ms",
-           pull_interval_);
+    if (core::Property::StringToTime(value, pull_interval_, unit) && 
core::Property::ConvertTimeUnitToMS(pull_interval_, unit, pull_interval_)) {
+      logger_->log_info("Configuration Listener pull interval: [%d] ms", 
pull_interval_);
     }
   }
 
@@ -62,7 +58,7 @@ void ConfigurationListener::stop() {
 }
 
 void ConfigurationListener::run() {
-  std::unique_lock<std::mutex> lk(mutex_);
+  std::unique_lock < std::mutex > lk(mutex_);
   std::condition_variable cv;
   int64_t interval = 0;
   while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return 
(running_ == false);})) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 8bbc5fc..acad1fd 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -31,7 +31,8 @@ const char *Configure::nifi_graceful_shutdown_seconds = 
"nifi.flowcontroller.gra
 const char *Configure::nifi_log_level = "nifi.log.level";
 const char *Configure::nifi_server_name = "nifi.server.name";
 const char *Configure::nifi_configuration_class_name = 
"nifi.flow.configuration.class.name";
-const char *Configure::nifi_flow_repository_class_name = 
"nifi.flow.repository.class.name";
+const char *Configure::nifi_flow_repository_class_name = 
"nifi.flowfile.repository.class.name";
+const char *Configure::nifi_content_repository_class_name = 
"nifi.content.repository.class.name";
 const char *Configure::nifi_volatile_repository_options = 
"nifi.volatile.repository.options.";
 const char *Configure::nifi_provenance_repository_class_name = 
"nifi.provenance.repository.class.name";
 const char *Configure::nifi_server_port = "nifi.server.port";
@@ -43,39 +44,22 @@ const char 
*Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfil
 const char *Configure::nifi_flowfile_repository_max_storage_time = 
"nifi.flowfile.repository.max.storage.time";
 const char *Configure::nifi_flowfile_repository_directory_default = 
"nifi.flowfile.repository.directory.default";
 const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure";
-const char *Configure::nifi_security_need_ClientAuth =
-    "nifi.security.need.ClientAuth";
-const char *Configure::nifi_security_client_certificate =
-    "nifi.security.client.certificate";
-const char *Configure::nifi_security_client_private_key =
-    "nifi.security.client.private.key";
-const char *Configure::nifi_security_client_pass_phrase =
-    "nifi.security.client.pass.phrase";
-const char *Configure::nifi_security_client_ca_certificate =
-    "nifi.security.client.ca.certificate";
-const char *Configure::nifi_configuration_listener_pull_interval =
-    "nifi.configuration.listener.pull.interval";
-const char *Configure::nifi_configuration_listener_http_url =
-    "nifi.configuration.listener.http.url";
-const char *Configure::nifi_configuration_listener_rest_url =
-    "nifi.configuration.listener.rest.url";
-const char *Configure::nifi_configuration_listener_type =
-    "nifi.configuration.listener.type";
-const char *Configure::nifi_https_need_ClientAuth =
-    "nifi.https.need.ClientAuth";
-const char *Configure::nifi_https_client_certificate =
-    "nifi.https.client.certificate";
-const char *Configure::nifi_https_client_private_key =
-    "nifi.https.client.private.key";
-const char *Configure::nifi_https_client_pass_phrase =
-    "nifi.https.client.pass.phrase";
-const char *Configure::nifi_https_client_ca_certificate =
-    "nifi.https.client.ca.certificate";
-const char *Configure::nifi_rest_api_user_name =
-    "nifi.rest.api.user.name";
-const char *Configure::nifi_rest_api_password =
-    "nifi.rest.api.password";
-
+const char *Configure::nifi_security_need_ClientAuth = 
"nifi.security.need.ClientAuth";
+const char *Configure::nifi_security_client_certificate = 
"nifi.security.client.certificate";
+const char *Configure::nifi_security_client_private_key = 
"nifi.security.client.private.key";
+const char *Configure::nifi_security_client_pass_phrase = 
"nifi.security.client.pass.phrase";
+const char *Configure::nifi_security_client_ca_certificate = 
"nifi.security.client.ca.certificate";
+const char *Configure::nifi_configuration_listener_pull_interval = 
"nifi.configuration.listener.pull.interval";
+const char *Configure::nifi_configuration_listener_http_url = 
"nifi.configuration.listener.http.url";
+const char *Configure::nifi_configuration_listener_rest_url = 
"nifi.configuration.listener.rest.url";
+const char *Configure::nifi_configuration_listener_type = 
"nifi.configuration.listener.type";
+const char *Configure::nifi_https_need_ClientAuth = 
"nifi.https.need.ClientAuth";
+const char *Configure::nifi_https_client_certificate = 
"nifi.https.client.certificate";
+const char *Configure::nifi_https_client_private_key = 
"nifi.https.client.private.key";
+const char *Configure::nifi_https_client_pass_phrase = 
"nifi.https.client.pass.phrase";
+const char *Configure::nifi_https_client_ca_certificate = 
"nifi.https.client.ca.certificate";
+const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name";
+const char *Configure::nifi_rest_api_password = "nifi.rest.api.password";
 
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 0901a30..1d937b4 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -39,9 +39,11 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-Connection::Connection(std::shared_ptr<core::Repository> flow_repository, 
std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
+Connection::Connection(const std::shared_ptr<core::Repository> 
&flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::string name, uuid_t uuid, uuid_t srcUUID,
+                       uuid_t destUUID)
     : core::Connectable(name, uuid),
       flow_repository_(flow_repository),
+      content_repo_(content_repo),
       logger_(logging::LoggerFactory<Connection>::getLogger()) {
 
   if (srcUUID)
@@ -89,12 +91,12 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
 
     queued_data_size_ += flow->getSize();
 
-    logger_->log_debug("Enqueue flow file UUID %s to connection %s", 
flow->getUUIDStr().c_str(), name_.c_str());
+    logger_->log_debug("Enqueue flow file UUID %s to connection %s %d", 
flow->getUUIDStr(), name_, queue_.size());
   }
 
   if (!flow->isStored()) {
     // Save to the flowfile repo
-    FlowFileRecord event(flow_repository_, flow, this->uuidStr_);
+    FlowFileRecord event(flow_repository_, content_repo_, flow, 
this->uuidStr_);
     if (event.Serialize()) {
       flow->setStoredToRepository(true);
     }
@@ -102,6 +104,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) {
 
   // Notify receiving processor that work may be available
   if (dest_connectable_) {
+    logger_->log_debug("Notifying %s", dest_connectable_->getName());
     dest_connectable_->notifyWork();
   }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowControlProtocol.cpp 
b/libminifi/src/FlowControlProtocol.cpp
index dbe27e8..74a1573 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -63,15 +63,14 @@ int FlowControlProtocol::connectServer(const char *host, 
uint16_t port) {
       close(sock);
       return 0;
     }
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-            reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
       logger_->log_error("setsockopt() SO_REUSEADDR failed");
       close(sock);
       return 0;
     }
   }
 
-  int sndsize = 256*1024;
+  int sndsize = 256 * 1024;
   if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, 
reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) {
     logger_->log_error("setsockopt() SO_SNDBUF failed");
     close(sock);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 2c84811..6358ed0 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -39,8 +39,8 @@
 #include "utils/StringUtils.h"
 #include "core/Core.h"
 #include "core/controller/ControllerServiceProvider.h"
-#include "core/repository/FlowFileRepository.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/repository/FlowFileRepository.h"
 
 namespace org {
 namespace apache {
@@ -52,7 +52,7 @@ std::shared_ptr<utils::IdGenerator> 
FlowController::id_generator_ = utils::IdGen
 #define DEFAULT_CONFIG_NAME "conf/flow.yml"
 
 FlowController::FlowController(std::shared_ptr<core::Repository> 
provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<Configure> configure,
-                               std::unique_ptr<core::FlowConfiguration> 
flow_configuration, const std::string name, bool headless_mode)
+                               std::unique_ptr<core::FlowConfiguration> 
flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, 
const std::string name, bool headless_mode)
     : 
core::controller::ControllerServiceProvider(core::getClassName<FlowController>()),
       root_(nullptr),
       max_timer_driven_threads_(0),
@@ -68,6 +68,7 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       controller_service_provider_(nullptr),
       flow_configuration_(std::move(flow_configuration)),
       configuration_(configure),
+      content_repo_(content_repo),
       logger_(logging::LoggerFactory<FlowController>::getLogger()) {
   if (provenance_repo == nullptr)
     throw std::runtime_error("Provenance Repo should not be null");
@@ -159,8 +160,7 @@ bool FlowController::applyConfiguration(std::string 
&configurePayload) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
     newRoot = 
std::move(flow_configuration_->getRootFromPayload(configurePayload));
-  }
-  catch (const YAML::Exception& e) {
+  } catch (const YAML::Exception& e) {
     logger_->log_error("Invalid configuration payload");
     return false;
   }
@@ -168,10 +168,9 @@ bool FlowController::applyConfiguration(std::string 
&configurePayload) {
   if (newRoot == nullptr)
     return false;
 
-  logger_->log_info("Starting to reload Flow Controller with flow control name 
%s, version %d",
-      newRoot->getName().c_str(), newRoot->getVersion());
+  logger_->log_info("Starting to reload Flow Controller with flow control name 
%s, version %d", newRoot->getName().c_str(), newRoot->getVersion());
 
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   stop(true);
   waitUnload(30000);
   this->root_ = std::move(newRoot);
@@ -181,7 +180,7 @@ bool FlowController::applyConfiguration(std::string 
&configurePayload) {
 }
 
 void FlowController::stop(bool force) {
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (running_) {
     // immediately indicate that we are not running
     running_ = false;
@@ -222,7 +221,7 @@ void FlowController::waitUnload(const uint64_t 
timeToWaitMs) {
 }
 
 void FlowController::unload() {
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (running_) {
     stop(true);
   }
@@ -237,7 +236,7 @@ void FlowController::unload() {
 }
 
 void FlowController::load() {
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (running_) {
     stop(true);
   }
@@ -246,29 +245,30 @@ void FlowController::load() {
     // grab the value for configuration
     if (this->http_configuration_listener_ == nullptr && 
configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) 
{
       if (listenerType == "http") {
-        this->http_configuration_listener_ =
-              std::unique_ptr<minifi::HttpConfigurationListener>(new 
minifi::HttpConfigurationListener(shared_from_this(), configuration_));
+        this->http_configuration_listener_ = std::unique_ptr < 
minifi::HttpConfigurationListener > (new 
minifi::HttpConfigurationListener(shared_from_this(), configuration_));
       }
     }
 
     logger_->log_info("Initializing timers");
     if (nullptr == timer_scheduler_) {
-      timer_scheduler_ = 
std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()),
 provenance_repo_, configuration_);
+      timer_scheduler_ = std::make_shared < TimerDrivenSchedulingAgent
+          > (std::static_pointer_cast < 
core::controller::ControllerServiceProvider > (shared_from_this()), 
provenance_repo_, flow_file_repo_, content_repo_, configuration_);
     }
     if (nullptr == event_scheduler_) {
-      event_scheduler_ = 
std::make_shared<EventDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()),
 provenance_repo_, configuration_);
+      event_scheduler_ = std::make_shared < EventDrivenSchedulingAgent
+          > (std::static_pointer_cast < 
core::controller::ControllerServiceProvider > (shared_from_this()), 
provenance_repo_, flow_file_repo_, content_repo_, configuration_);
     }
     logger_->log_info("Load Flow Controller from file %s", 
configuration_filename_.c_str());
 
-    this->root_ = 
std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_));
+    this->root_ = std::shared_ptr < core::ProcessGroup > 
(flow_configuration_->getRoot(configuration_filename_));
 
     logger_->log_info("Loaded root processor Group");
 
     controller_service_provider_ = 
flow_configuration_->getControllerServiceProvider();
 
-    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_);
-    
std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent(
-        std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_));
+    std::static_pointer_cast < 
core::controller::StandardControllerServiceProvider > 
(controller_service_provider_)->setRootGroup(root_);
+    std::static_pointer_cast < 
core::controller::StandardControllerServiceProvider
+        > 
(controller_service_provider_)->setSchedulingAgent(std::static_pointer_cast < 
minifi::SchedulingAgent > (event_scheduler_));
 
     logger_->log_info("Loaded controller service provider");
     // Load Flow File from Repo
@@ -279,7 +279,7 @@ void FlowController::load() {
 }
 
 void FlowController::reload(std::string yamlFile) {
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   logger_->log_info("Starting to reload Flow Controller with yaml %s", 
yamlFile.c_str());
   stop(true);
   unload();
@@ -305,18 +305,18 @@ void FlowController::loadFlowRepo() {
       this->root_->getConnections(connectionMap);
     }
     logger_->log_debug("Number of connections from connectionMap %d", 
connectionMap.size());
-    auto rep = 
std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_);
+    auto rep = std::dynamic_pointer_cast < 
core::repository::FlowFileRepository > (flow_file_repo_);
     if (nullptr != rep) {
       rep->setConnectionMap(connectionMap);
     }
-    flow_file_repo_->loadComponent();
+    flow_file_repo_->loadComponent(content_repo_);
   } else {
     logger_->log_debug("Flow file repository is not set");
   }
 }
 
 bool FlowController::start() {
-  std::lock_guard<std::recursive_mutex> flow_lock(mutex_);
+  std::lock_guard < std::recursive_mutex > flow_lock(mutex_);
   if (!initialized_) {
     logger_->log_error("Can not start Flow Controller because it has not been 
initialized");
     return false;
@@ -349,8 +349,7 @@ bool FlowController::start() {
  * @param id service identifier
  * @param firstTimeAdded first time this CS was added
  */
-std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::createControllerService(const std::string &type, const 
std::string &id,
-bool firstTimeAdded) {
+std::shared_ptr<core::controller::ControllerServiceNode> 
FlowController::createControllerService(const std::string &type, const 
std::string &id, bool firstTimeAdded) {
   return controller_service_provider_->createControllerService(type, id, 
firstTimeAdded);
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 12711a9..efd6fa7 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -40,8 +40,10 @@ namespace minifi {
 std::shared_ptr<logging::Logger> FlowFileRecord::logger_ = 
logging::LoggerFactory<FlowFileRecord>::getLogger();
 std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0);
 
-FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::map<std::string, std::string> attributes, 
std::shared_ptr<ResourceClaim> claim)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::map<std::string, std::string> attributes,
+                               std::shared_ptr<ResourceClaim> claim)
     : FlowFile(),
+      content_repo_(content_repo),
       flow_repository_(flow_repository) {
   id_ = local_flow_seq_number_.load();
   claim_ = claim;
@@ -64,9 +66,11 @@ 
FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
     claim_->increaseFlowFileRecordOwnedCount();
 }
 
-FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string 
&uuidConnection)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event,
+                               const std::string &uuidConnection)
     : FlowFile(),
       snapshot_(""),
+      content_repo_(content_repo),
       flow_repository_(flow_repository) {
   entry_date_ = event->getEntryDate();
   lineage_start_date_ = event->getlineageStartDate();
@@ -82,10 +86,11 @@ 
FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
   }
 }
 
-FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::shared_ptr<core::FlowFile> &event)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event)
     : FlowFile(),
       uuid_connection_(""),
       snapshot_(""),
+      content_repo_(content_repo),
       flow_repository_(flow_repository) {
 }
 
@@ -101,7 +106,7 @@ FlowFileRecord::~FlowFileRecord() {
     if (claim_->getFlowFileRecordOwnedCount() <= 0) {
       logger_->log_debug("Delete Resource Claim %s", 
claim_->getContentFullPath().c_str());
       if (!this->stored || !flow_repository_->Get(uuid_str_, value)) {
-        std::remove(claim_->getContentFullPath().c_str());
+        content_repo_->remove(claim_);
       }
     }
   }
@@ -319,6 +324,9 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, 
const int bufferSize) {
     return false;
   }
 
+  if (nullptr == claim_) {
+    claim_ = std::make_shared<ResourceClaim>(content_full_fath_, 
content_repo_, true);
+  }
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/HttpConfigurationListener.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/HttpConfigurationListener.cpp 
b/libminifi/src/HttpConfigurationListener.cpp
index 39da67b..c16ca75 100644
--- a/libminifi/src/HttpConfigurationListener.cpp
+++ b/libminifi/src/HttpConfigurationListener.cpp
@@ -63,17 +63,14 @@ bool 
HttpConfigurationListener::pullConfiguration(std::string &configuration) {
   }
 
   utils::HTTPRequestResponse content;
-  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION,
-      &utils::HTTPRequestResponse::recieve_write);
+  curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, 
&utils::HTTPRequestResponse::recieve_write);
 
-  curl_easy_setopt(http_session, CURLOPT_WRITEDATA,
-      static_cast<void*>(&content));
+  curl_easy_setopt(http_session, CURLOPT_WRITEDATA, 
static_cast<void*>(&content));
 
   CURLcode res = curl_easy_perform(http_session);
 
   if (res == CURLE_OK) {
-    logger_->log_debug("HttpConfigurationListener -- curl successful to %s",
-        fullUrl.c_str());
+    logger_->log_debug("HttpConfigurationListener -- curl successful to %s", 
fullUrl.c_str());
 
     std::string response_body(content.data.begin(), content.data.end());
     int64_t http_code = 0;
@@ -82,8 +79,7 @@ bool HttpConfigurationListener::pullConfiguration(std::string 
&configuration) {
     /* ask for the content-type */
     curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type);
 
-    bool isSuccess = ((int32_t) (http_code / 100)) == 2
-        && res != CURLE_ABORTED_BY_CALLBACK;
+    bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != 
CURLE_ABORTED_BY_CALLBACK;
     bool body_empty = IsNullOrEmpty(content.data);
 
     if (isSuccess && !body_empty) {
@@ -94,9 +90,7 @@ bool HttpConfigurationListener::pullConfiguration(std::string 
&configuration) {
       logger_->log_error("Cannot output body to content");
     }
   } else {
-    logger_->log_error(
-        "HttpConfigurationListener -- curl_easy_perform() failed %s\n",
-        curl_easy_strerror(res));
+    logger_->log_error("HttpConfigurationListener -- curl_easy_perform() 
failed %s\n", curl_easy_strerror(res));
   }
   curl_easy_cleanup(http_session);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index abebfbb..076cefc 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -34,7 +34,7 @@ Properties::Properties()
 
 // Get the config value
 bool Properties::get(std::string key, std::string &value) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
   auto it = properties_.find(key);
 
   if (it != properties_.end()) {
@@ -46,7 +46,7 @@ bool Properties::get(std::string key, std::string &value) {
 }
 
 int Properties::getInt(const std::string &key, int default_value) {
-  std::lock_guard<std::mutex> lock(mutex_);
+  std::lock_guard < std::mutex > lock(mutex_);
   auto it = properties_.find(key);
 
   if (it != properties_.end()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index d1862cd..3c88e8f 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -18,7 +18,7 @@
  * limitations under the License.
  */
 
-#include "../include/RemoteProcessorGroupPort.h"
+#include "RemoteProcessorGroupPort.h"
 
 #include <curl/curl.h>
 #include <curl/curlbuild.h>
@@ -30,18 +30,20 @@
 #include <deque>
 #include <iostream>
 #include <set>
+
 #include <string>
 #include <type_traits>
 #include <utility>
 #include "json/json.h"
 #include "json/writer.h"
 
-#include "../include/core/logging/Logger.h"
-#include "../include/core/ProcessContext.h"
-#include "../include/core/ProcessorNode.h"
-#include "../include/core/Property.h"
-#include "../include/core/Relationship.h"
-#include "../include/Site2SitePeer.h"
+#include "Exception.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessorNode.h"
+#include "core/Property.h"
+#include "core/Relationship.h"
+#include "Site2SitePeer.h"
 
 namespace org {
 namespace apache {
@@ -54,8 +56,7 @@ core::Property RemoteProcessorGroupPort::port("Port", "Remote 
Port", "");
 core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies 
remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
-std::unique_ptr<Site2SiteClientProtocol> 
RemoteProcessorGroupPort::getNextProtocol(
-bool create = true) {
+std::unique_ptr<Site2SiteClientProtocol> 
RemoteProcessorGroupPort::getNextProtocol(bool create = true) {
   std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
   if (!available_protocols_.try_dequeue(nextProtocol)) {
     if (create) {
@@ -170,31 +171,41 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr
     uuid_parse(value.c_str(), protocol_uuid_);
   }
 
-  std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol();
+  std::unique_ptr<Site2SiteClientProtocol> protocol_ = nullptr;
+  try {
+    protocol_ = getNextProtocol();
 
-  if (!protocol_) {
-    context->yield();
-    return;
-  }
+    if (!protocol_) {
+      context->yield();
+      return;
+    }
+    if (!protocol_->bootstrap()) {
+      // bootstrap the client protocol if needeed
+      context->yield();
+      std::shared_ptr<Processor> processor = std::static_pointer_cast < 
Processor > (context->getProcessorNode().getProcessor());
+      logger_->log_error("Site2Site bootstrap failed yield period %d peer ", 
processor->getYieldPeriodMsec());
+
+      return;
+    }
+
+    if (direction_ == RECEIVE) {
+      protocol_->receiveFlowFiles(context, session);
+    } else {
+      protocol_->transferFlowFiles(context, session);
+    }
 
-  if (!protocol_->bootstrap()) {
-    // bootstrap the client protocol if needeed
-    context->yield();
-    std::shared_ptr<Processor> processor = 
std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
-    logger_->log_error("Site2Site bootstrap failed yield period %d peer ", 
processor->getYieldPeriodMsec());
     returnProtocol(std::move(protocol_));
     return;
+  } catch (const minifi::Exception &ex2) {
+    context->yield();
+    session->rollback();
+  } catch (...) {
+    context->yield();
+    session->rollback();
   }
 
-  if (direction_ == RECEIVE) {
-    protocol_->receiveFlowFiles(context, session);
-  } else {
-    protocol_->transferFlowFiles(context, session);
-  }
-
-  returnProtocol(std::move(protocol_));
 
-  return;
+  throw std::exception();
 }
 
 void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index 1a9f2fe..e7d4557 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -17,14 +17,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "ResourceClaim.h"
 #include <uuid/uuid.h>
-
 #include <map>
 #include <queue>
 #include <string>
 #include <vector>
-
-#include "ResourceClaim.h"
+#include <memory>
+#include "core/StreamManager.h"
+#include "utils/Id.h"
 #include "core/logging/LoggerConfiguration.h"
 
 namespace org {
@@ -36,14 +37,22 @@ utils::NonRepeatingStringGenerator 
ResourceClaim::non_repeating_string_generator
 
 char *ResourceClaim::default_directory_path = 
const_cast<char*>(DEFAULT_CONTENT_DIRECTORY);
 
-ResourceClaim::ResourceClaim(const std::string contentDirectory)
+ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>>
 claim_manager, const std::string contentDirectory)
     : _flowFileRecordOwnedCount(0),
+      claim_manager_(claim_manager),
+      deleted_(false),
       logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
   // Create the full content path for the content
   _contentFullPath = contentDirectory + "/" + 
non_repeating_string_generator_.generate();
   logger_->log_debug("Resource Claim created %s", _contentFullPath);
 }
 
+ResourceClaim::ResourceClaim(const std::string path, 
std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted)
+    : claim_manager_(claim_manager),
+      deleted_(deleted) {
+  _contentFullPath = path;
+}
+
 } /* namespace minifi */
 } /* namespace nifi */
 } /* namespace apache */

Reply via email to