MINIFI-249: Update prov repo to better abstract deser.

- Deserialization and serialization are better abstracted
into SerializableComponent allowing us to use all repos with
the same [de]serialization interfaces.
- Update Test resources to use local http server when possible
- Allow for different volatile configurations
- Update removals and link FlowFileRepo to content repo

MINIFI-330: convert const char* to std::string

This closes #110.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/fe634853
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/fe634853
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/fe634853

Branch: refs/heads/master
Commit: fe63485342fc0e84250887871826b9f17b1f9a94
Parents: 20622f6
Author: Marc Parisi <[email protected]>
Authored: Wed May 24 14:39:08 2017 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Thu Jul 27 12:43:47 2017 -0400

----------------------------------------------------------------------
 cmake/BuildTests.cmake                          |   4 +-
 libminifi/include/Connection.h                  |   6 +-
 libminifi/include/EventDrivenSchedulingAgent.h  |   5 +-
 libminifi/include/FlowController.h              |  23 +-
 libminifi/include/FlowFileRecord.h              |  31 +-
 libminifi/include/ResourceClaim.h               |  35 +-
 libminifi/include/SchedulingAgent.h             |  12 +-
 libminifi/include/Site2SiteClientProtocol.h     |  35 +-
 libminifi/include/ThreadedSchedulingAgent.h     |   7 +-
 libminifi/include/TimerDrivenSchedulingAgent.h  |   7 +-
 libminifi/include/core/ConfigurationFactory.h   |  11 +-
 libminifi/include/core/ContentRepository.h      |  60 ++
 libminifi/include/core/Core.h                   |   3 +
 libminifi/include/core/FlowConfiguration.h      |  16 +-
 libminifi/include/core/ProcessContext.h         |  27 +-
 libminifi/include/core/ProcessGroup.h           |   3 +-
 libminifi/include/core/ProcessSession.h         |   2 +-
 libminifi/include/core/Repository.h             |  92 ++-
 libminifi/include/core/RepositoryFactory.h      |  16 +-
 libminifi/include/core/SerializableComponent.h  |  88 +++
 libminifi/include/core/StreamManager.h          |  81 +++
 .../include/core/logging/LoggerConfiguration.h  |   3 +-
 .../SiteToSiteProvenanceReportingTask.h         |   4 +-
 .../include/core/repository/AtomicRepoEntries.h | 501 ++++++++++++++++
 .../core/repository/FileSystemRepository.h      |  72 +++
 .../core/repository/FlowFileRepository.h        |  25 +-
 .../core/repository/VolatileContentRepository.h | 138 +++++
 .../repository/VolatileFlowFileRepository.h     |  82 +++
 .../repository/VolatileProvenanceRepository.h   |  60 ++
 .../core/repository/VolatileRepository.h        | 517 +++++++++--------
 libminifi/include/core/yaml/YamlConfiguration.h |  39 +-
 libminifi/include/io/AtomicEntryStream.h        | 205 +++++++
 libminifi/include/io/BaseStream.h               |  13 +
 libminifi/include/io/ClientSocket.h             |   6 +-
 libminifi/include/io/DataStream.h               |   8 +-
 libminifi/include/io/FileStream.h               | 136 +++++
 libminifi/include/processors/ExecuteProcess.h   |   8 +-
 libminifi/include/processors/GenerateFlowFile.h |   6 +-
 libminifi/include/processors/InvokeHTTP.h       |   6 +
 libminifi/include/processors/ListenHTTP.h       |   2 +-
 libminifi/include/processors/ListenSyslog.h     |  10 +-
 libminifi/include/processors/LogAttribute.h     |  28 +-
 libminifi/include/processors/PutFile.h          |   3 +-
 libminifi/include/properties/Configure.h        |   1 +
 libminifi/include/provenance/Provenance.h       |  62 +-
 .../include/provenance/ProvenanceRepository.h   |  82 ++-
 libminifi/include/utils/ByteInputCallBack.h     |  20 +-
 libminifi/src/ConfigurationListener.cpp         |  12 +-
 libminifi/src/Configure.cpp                     |  52 +-
 libminifi/src/Connection.cpp                    |   9 +-
 libminifi/src/FlowControlProtocol.cpp           |   5 +-
 libminifi/src/FlowController.cpp                |  47 +-
 libminifi/src/FlowFileRecord.cpp                |  16 +-
 libminifi/src/HttpConfigurationListener.cpp     |  16 +-
 libminifi/src/Properties.cpp                    |   4 +-
 libminifi/src/RemoteProcessorGroupPort.cpp      |  65 ++-
 libminifi/src/ResourceClaim.cpp                 |  17 +-
 libminifi/src/SchedulingAgent.cpp               |  12 +-
 libminifi/src/Site2SiteClientProtocol.cpp       |  19 +-
 libminifi/src/ThreadedSchedulingAgent.cpp       |   8 +-
 libminifi/src/controllers/SSLContextService.cpp |  14 +-
 libminifi/src/core/ClassLoader.cpp              |   6 +-
 libminifi/src/core/ConfigurableComponent.cpp    |  13 +-
 libminifi/src/core/ConfigurationFactory.cpp     |  12 +-
 libminifi/src/core/Connectable.cpp              |  12 +-
 libminifi/src/core/Core.cpp                     |   5 +
 libminifi/src/core/FlowConfiguration.cpp        |  12 +-
 libminifi/src/core/FlowFile.cpp                 |   2 +-
 libminifi/src/core/ProcessGroup.cpp             |  29 +-
 libminifi/src/core/ProcessSession.cpp           | 571 ++++++++++---------
 libminifi/src/core/ProcessSessionFactory.cpp    |   2 +-
 libminifi/src/core/Processor.cpp                |  20 +-
 libminifi/src/core/Repository.cpp               |   6 +-
 libminifi/src/core/RepositoryFactory.cpp        |  47 +-
 .../StandardControllerServiceNode.cpp           |   6 +-
 .../src/core/logging/LoggerConfiguration.cpp    |  16 +-
 .../SiteToSiteProvenanceReportingTask.cpp       |  31 +-
 .../core/repository/FileSystemRepository.cpp    |  54 ++
 .../src/core/repository/FlowFileRepository.cpp  |  54 +-
 .../repository/VolatileContentRepository.cpp    | 183 ++++++
 .../src/core/repository/VolatileRepository.cpp  |  29 +-
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  66 ++-
 libminifi/src/io/AtomicEntryStream.cpp          |  34 ++
 libminifi/src/io/ClientSocket.cpp               |  14 +-
 libminifi/src/io/FileStream.cpp                 | 160 ++++++
 libminifi/src/io/StreamFactory.cpp              |   6 +-
 libminifi/src/processors/ExecuteProcess.cpp     |   6 +-
 libminifi/src/processors/GenerateFlowFile.cpp   |   4 +-
 libminifi/src/processors/GetFile.cpp            |   8 +-
 libminifi/src/processors/InvokeHTTP.cpp         |  33 +-
 libminifi/src/processors/ListenHTTP.cpp         |  10 +-
 libminifi/src/processors/ListenSyslog.cpp       |   2 +-
 libminifi/src/processors/LogAttribute.cpp       |   9 +-
 libminifi/src/processors/PutFile.cpp            |  45 +-
 libminifi/src/processors/TailFile.cpp           |  26 +-
 libminifi/src/provenance/Provenance.cpp         |  50 +-
 .../src/provenance/ProvenanceRepository.cpp     |  18 +-
 libminifi/test/CPPLINT.cfg                      |   1 +
 libminifi/test/TestBase.cpp                     | 211 +++++++
 libminifi/test/TestBase.h                       | 102 +++-
 libminifi/test/TestServer.h                     | 137 +++++
 .../ControllerServiceIntegrationTests.cpp       |  11 +-
 .../HttpConfigurationListenerTest.cpp           |  67 +--
 .../test/integration/HttpGetIntegrationTest.cpp |  67 ++-
 .../integration/HttpPostIntegrationTest.cpp     |  34 +-
 .../integration/ProvenanceReportingTest.cpp     |  11 +-
 .../test/integration/Site2SiteRestTest.cpp      |  61 +-
 .../test/integration/TestExecuteProcess.cpp     |  20 +-
 libminifi/test/resources/TestHTTPGet.yml        |   2 +-
 libminifi/test/resources/TestHTTPGetSecure.yml  |   2 +-
 libminifi/test/resources/TestHTTPPost.yml       |   2 +-
 libminifi/test/resources/cn.ckey.pem            |   1 -
 libminifi/test/resources/cn.crt.pem             |   1 -
 libminifi/test/resources/nifi-cert.pem          |  27 +
 libminifi/test/unit/FileStreamTests.cpp         | 210 +++++++
 libminifi/test/unit/InvokeHTTPTests.cpp         | 173 ++----
 libminifi/test/unit/ProcessorTests.cpp          | 289 ++++------
 libminifi/test/unit/ProvenanceTestHelper.h      |  70 ++-
 libminifi/test/unit/ProvenanceTests.cpp         |  33 +-
 libminifi/test/unit/RepoTests.cpp               |  14 +-
 libminifi/test/unit/TailFileTests.cpp           | 187 +++---
 libminifi/test/unit/YamlConfigurationTests.cpp  |   6 +-
 main/MiNiFiMain.cpp                             |  59 +-
 123 files changed, 4638 insertions(+), 1658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/cmake/BuildTests.cmake
----------------------------------------------------------------------
diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake
index 59f1d59..9500792 100644
--- a/cmake/BuildTests.cmake
+++ b/cmake/BuildTests.cmake
@@ -67,7 +67,7 @@ GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/")
 SET(UNIT_TEST_COUNT 0)
 FOREACH(testfile ${UNIT_TESTS})
        get_filename_component(testfilename "${testfile}" NAME_WE)
-       add_executable("${testfilename}" "${TEST_DIR}/unit/${testfile}" 
${SPD_SOURCES})
+       add_executable("${testfilename}" "${TEST_DIR}/unit/${testfile}" 
${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp")
        createTests("${testfilename}")
        MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1")
        add_test(NAME "${testfilename}" COMMAND "${testfilename}" 
WORKING_DIRECTORY ${TEST_DIR})
@@ -77,7 +77,7 @@ message("-- Finished building ${UNIT_TEST_COUNT} unit test 
file(s)...")
 SET(INT_TEST_COUNT 0)
 FOREACH(testfile ${INTEGRATION_TESTS})
        get_filename_component(testfilename "${testfile}" NAME_WE)
-       add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" 
${SPD_SOURCES})
+       add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" 
${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp")
        createTests("${testfilename}")
        #message("Adding ${testfilename} from ${testfile}")
        MATH(EXPR INT_TEST_COUNT "${INT_TEST_COUNT}+1")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/Connection.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index be51fce..ff32baf 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -47,7 +47,9 @@ class Connection : public core::Connectable, public 
std::enable_shared_from_this
   /*
    * Create a new processor
    */
-  explicit Connection(std::shared_ptr<core::Repository> flow_repository, 
std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = 
NULL);
+  explicit Connection(const std::shared_ptr<core::Repository> 
&flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::string name, uuid_t uuid = NULL,
+                      uuid_t srcUUID = NULL,
+                      uuid_t destUUID = NULL);
   // Destructor
   virtual ~Connection() {
   }
@@ -168,6 +170,8 @@ class Connection : public core::Connectable, public 
std::enable_shared_from_this
   std::atomic<uint64_t> expired_duration_;
   // flow file repository
   std::shared_ptr<core::Repository> flow_repository_;
+  // content repository reference.
+  std::shared_ptr<core::ContentRepository> content_repo_;
 
  private:
   // Mutex for protection

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/EventDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h 
b/libminifi/include/EventDrivenSchedulingAgent.h
index 6a63dc5..c838b11 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -38,8 +38,9 @@ class EventDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
   /*!
    * Create a new event driven scheduling agent.
    */
-  
EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<Configure> configuration)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, 
configuration) {
+  
EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo,
+                             std::shared_ptr<core::Repository> flow_repo, 
std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration) {
   }
   // Destructor
   virtual ~EventDrivenSchedulingAgent() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index c2fef2a..d9a0452 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -70,10 +70,23 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
   /**
    * Flow controller constructor
    */
-  FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> 
configure,
-                 std::unique_ptr<core::FlowConfiguration> flow_configuration,
-                 const std::string name = DEFAULT_ROOT_GROUP_NAME,
-                 bool headless_mode = false);
+  explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> 
configure,
+                          std::unique_ptr<core::FlowConfiguration> 
flow_configuration,
+                          std::shared_ptr<core::ContentRepository> 
content_repo, const std::string name, bool headless_mode);
+
+  explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> 
configure,
+                          std::unique_ptr<core::FlowConfiguration> 
flow_configuration,
+                          std::shared_ptr<core::ContentRepository> 
content_repo)
+      : FlowController(provenance_repo, flow_file_repo, configure, 
std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false)
+  {
+  }
+
+  explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> 
configure,
+                          std::unique_ptr<core::FlowConfiguration> 
flow_configuration)
+      : FlowController(provenance_repo, flow_file_repo, configure, 
std::move(flow_configuration), 
std::make_shared<core::repository::FileSystemRepository>(), 
DEFAULT_ROOT_GROUP_NAME, false)
+  {
+    content_repo_->initialize(configure);
+  }
 
   // Destructor
   virtual ~FlowController();
@@ -301,6 +314,8 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
   // FlowFile Repo
   std::shared_ptr<core::Repository> flow_file_repo_;
 
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
   // Flow Engines
   // Flow Timer Scheduler
   std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h 
b/libminifi/include/FlowFileRecord.h
index 3d6057d..d6e5f2e 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -30,7 +30,8 @@
 #include <sstream>
 #include <fstream>
 #include <set>
-
+#include "core/ContentRepository.h"
+#include "io/BaseStream.h"
 #include "io/Serializable.h"
 #include "core/FlowFile.h"
 #include "utils/TimeUtil.h"
@@ -81,11 +82,21 @@ inline const char *FlowAttributeKey(FlowAttribute 
attribute) {
 // throw exception for error
 class InputStreamCallback {
  public:
-  virtual void process(std::ifstream *stream) = 0;
+  virtual ~InputStreamCallback() {
+
+  }
+  //virtual void process(std::ifstream *stream) = 0;
+
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
 };
 class OutputStreamCallback {
  public:
-  virtual void process(std::ofstream *stream) = 0;
+  virtual ~OutputStreamCallback() {
+
+  }
+  //virtual void process(std::ofstream *stream) = 0;
+  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
+
 };
 
 class FlowFileRecord : public core::FlowFile, public io::Serializable {
@@ -94,14 +105,17 @@ class FlowFileRecord : public core::FlowFile, public 
io::Serializable {
   /*
    * Create a new flow record
    */
-  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> 
claim = nullptr);
+  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
const std::shared_ptr<core::ContentRepository> &content_repo, 
std::map<std::string, std::string> attributes,
+                          std::shared_ptr<ResourceClaim> claim = nullptr);
 
-  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::FlowFile> &event);
+  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event);
 
-  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection);
+  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event,
+                          const std::string &uuidConnection);
 
-  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository)
+  explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, 
const std::shared_ptr<core::ContentRepository> &content_repo)
       : FlowFile(),
+        content_repo_(content_repo),
         flow_repository_(flow_repository),
         snapshot_("") {
 
@@ -168,6 +182,9 @@ class FlowFileRecord : public core::FlowFile, public 
io::Serializable {
   // repository reference.
   std::shared_ptr<core::Repository> flow_repository_;
 
+  // content repo reference.
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
   // Snapshot flow record for session rollback
   bool snapshot_;
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ResourceClaim.h 
b/libminifi/include/ResourceClaim.h
index 49faed1..19a67fa 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -25,9 +25,11 @@
 #include <vector>
 #include <queue>
 #include <map>
+#include <memory>
 #include <mutex>
 #include <atomic>
 #include "core/Core.h"
+#include "core/StreamManager.h"
 #include "properties/Configure.h"
 #include "utils/Id.h"
 
@@ -40,7 +42,7 @@ namespace minifi {
 #define DEFAULT_CONTENT_DIRECTORY "./content_repository"
 
 // ResourceClaim Class
-class ResourceClaim {
+class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
 
  public:
 
@@ -49,7 +51,9 @@ class ResourceClaim {
   /*!
    * Create a new resource claim
    */
-  ResourceClaim(const std::string contentDirectory = default_directory_path);
+  ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> 
claim_manager, const std::string contentDirectory = default_directory_path);
+
+  ResourceClaim(const std::string path, 
std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted 
= false);
   // Destructor
   virtual ~ResourceClaim() {
   }
@@ -59,7 +63,11 @@ class ResourceClaim {
   }
   // decreaseFlowFileRecordOwenedCount
   void decreaseFlowFileRecordOwnedCount() {
-    --_flowFileRecordOwnedCount;
+
+    if (_flowFileRecordOwnedCount > 0) {
+      _flowFileRecordOwnedCount--;
+    }
+
   }
   // getFlowFileRecordOwenedCount
   uint64_t getFlowFileRecordOwnedCount() {
@@ -74,14 +82,35 @@ class ResourceClaim {
     _contentFullPath = path;
   }
 
+  void deleteClaim() {
+    if (!deleted_)
+    {
+      deleted_ = true;
+    }
+
+  }
+
+  friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& 
claim) {
+    stream << claim._contentFullPath;
+    return stream;
+  }
+
+  friend std::ostream& operator<<(std::ostream& stream, const 
std::shared_ptr<ResourceClaim>& claim) {
+    stream << claim->_contentFullPath;
+    return stream;
+  }
  protected:
+  std::atomic<bool> deleted_;
   // Full path to the content
   std::string _contentFullPath;
 
   // How many FlowFileRecord Own this cliam
   std::atomic<uint64_t> _flowFileRecordOwnedCount;
 
+  std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_;
+
  private:
+
   // Logger
   std::shared_ptr<logging::Logger> logger_;
   // Prevent default copy constructor and assignment operation

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/SchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index 22f79db..1ff3fac 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -52,14 +52,19 @@ class SchedulingAgent {
   /*!
    * Create a new scheduling agent.
    */
-  SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<Configure> configuration)
+  SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> 
controller_service_provider, std::shared_ptr<core::Repository> repo,
+                 std::shared_ptr<core::Repository> flow_repo,
+                  std::shared_ptr<core::ContentRepository> content_repo,
+                  std::shared_ptr<Configure> configuration)
       : configure_(configuration),
         admin_yield_duration_(0),
         bored_yield_duration_(0),
+        content_repo_(content_repo),
         controller_service_provider_(controller_service_provider),
         logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()) {
     running_ = false;
     repo_ = repo;
+    flow_repo_ = flow_repo;
     utils::ThreadPool<bool> pool = 
utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 
8), true);
     component_lifecycle_thread_pool_ = std::move(pool);
     component_lifecycle_thread_pool_.start();
@@ -77,7 +82,6 @@ class SchedulingAgent {
   // start
   void start() {
     running_ = true;
-
   }
   // stop
   void stop() {
@@ -108,6 +112,10 @@ class SchedulingAgent {
   std::shared_ptr<Configure> configure_;
 
   std::shared_ptr<core::Repository> repo_;
+  
+  std::shared_ptr<core::Repository> flow_repo_;
+
+  std::shared_ptr<core::ContentRepository> content_repo_;
   // thread pool for components.
   utils::ThreadPool<bool> component_lifecycle_thread_pool_;
   // controller service provider reference

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/Site2SiteClientProtocol.h
----------------------------------------------------------------------
diff --git a/libminifi/include/Site2SiteClientProtocol.h 
b/libminifi/include/Site2SiteClientProtocol.h
index 8d89004..dcb551a 100644
--- a/libminifi/include/Site2SiteClientProtocol.h
+++ b/libminifi/include/Site2SiteClientProtocol.h
@@ -549,7 +549,8 @@ class Site2SiteClientProtocol {
         : _packet(packet) {
     }
     DataPacket *_packet;
-    void process(std::ofstream *stream) {
+    //void process(std::ofstream *stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
       uint8_t buffer[8192];
       int len = _packet->_size;
       while (len > 0) {
@@ -557,11 +558,12 @@ class Site2SiteClientProtocol {
         int ret = _packet->_transaction->getStream().readData(buffer, size);
         if (ret != size) {
           _packet->_protocol->logger_->log_error("Site2Site Receive Flow Size 
%d Failed %d", size, ret);
-          break;
+          return -1;
         }
-        stream->write((const char *) buffer, size);
+        stream->write(buffer, size);
         len -= size;
       }
+      return len;
     }
   };
   // Nest Callback Class for read stream
@@ -571,22 +573,29 @@ class Site2SiteClientProtocol {
         : _packet(packet) {
     }
     DataPacket *_packet;
-    void process(std::ifstream *stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
       _packet->_size = 0;
-      uint8_t buffer[8192];
+      uint8_t buffer[8192] = { 0 };
       int readSize;
-      while (stream->good()) {
-        if (!stream->read((char *) buffer, 8192))
-          readSize = stream->gcount();
-        else
-          readSize = 8192;
+      size_t size = 0;
+      do {
+        readSize = stream->read(buffer, 8192);
+
+        if (readSize == 0) {
+          break;
+        }
+        if (readSize < 0) {
+          return -1;
+        }
         int ret = _packet->_transaction->getStream().writeData(buffer, 
readSize);
         if (ret != readSize) {
           _packet->_protocol->logger_->log_error("Site2Site Send Flow Size %d 
Failed %d", readSize, ret);
-          break;
+          return -1;
         }
-        _packet->_size += readSize;
-      }
+        size += readSize;
+      } while (size < stream->getSize());
+      _packet->_size = size;
+      return size;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/ThreadedSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/ThreadedSchedulingAgent.h 
b/libminifi/include/ThreadedSchedulingAgent.h
index 50ab6c9..b4db4bf 100644
--- a/libminifi/include/ThreadedSchedulingAgent.h
+++ b/libminifi/include/ThreadedSchedulingAgent.h
@@ -42,8 +42,11 @@ class ThreadedSchedulingAgent : public SchedulingAgent {
   /*!
    * Create a new threaded scheduling agent.
    */
-  
ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<Configure> configuration)
-      : SchedulingAgent(controller_service_provider, repo, configuration),
+  
ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo,
+                          std::shared_ptr<core::Repository> flow_repo,
+                          std::shared_ptr<core::ContentRepository> 
content_repo,
+                          std::shared_ptr<Configure> configuration)
+      : SchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configuration),
         logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) {
   }
   // Destructor

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/TimerDrivenSchedulingAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h 
b/libminifi/include/TimerDrivenSchedulingAgent.h
index 597dc76..816bcec 100644
--- a/libminifi/include/TimerDrivenSchedulingAgent.h
+++ b/libminifi/include/TimerDrivenSchedulingAgent.h
@@ -37,8 +37,11 @@ class TimerDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
   /*!
    * Create a new processor
    */
-  
TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo, 
std::shared_ptr<Configure> configure)
-      : ThreadedSchedulingAgent(controller_service_provider, repo, configure) {
+  
TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider>
 controller_service_provider, std::shared_ptr<core::Repository> repo,
+                             std::shared_ptr<core::Repository> flow_repo,
+                             std::shared_ptr<core::ContentRepository> 
content_repo,
+                             std::shared_ptr<Configure> configure)
+      : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, 
content_repo, configure) {
   }
   //  Destructor
   virtual ~TimerDrivenSchedulingAgent() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ConfigurationFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurationFactory.h 
b/libminifi/include/core/ConfigurationFactory.h
index b58c170..61af8cd 100644
--- a/libminifi/include/core/ConfigurationFactory.h
+++ b/libminifi/include/core/ConfigurationFactory.h
@@ -30,6 +30,7 @@ namespace core {
 
 template<typename T>
 typename std::enable_if<!class_operations<T>::value, T*>::type 
instantiate(const std::shared_ptr<core::Repository> &repo, const 
std::shared_ptr<core::Repository> &flow_file_repo,
+                                                                           
const std::shared_ptr<core::ContentRepository> &content_repo,
                                                                            
std::shared_ptr<Configure> configuration,
                                                                            
const std::string path) {
   throw std::runtime_error("Cannot instantiate class");
@@ -37,16 +38,20 @@ typename std::enable_if<!class_operations<T>::value, 
T*>::type instantiate(const
 
 template<typename T>
 typename std::enable_if<class_operations<T>::value, T*>::type 
instantiate(const std::shared_ptr<core::Repository> &repo, const 
std::shared_ptr<core::Repository> &flow_file_repo,
+                                                                          
const std::shared_ptr<core::ContentRepository> &content_repo,
                                                                           
const std::shared_ptr<io::StreamFactory> &stream_factory,
-                                                                          
std::shared_ptr<Configure> configuration, const std::string path) {
-  return new T(repo, flow_file_repo, stream_factory, configuration, path);
+                                                                          
std::shared_ptr<Configure> configuration,
+                                                                          
const std::string path) {
+  return new T(repo, flow_file_repo, content_repo, stream_factory, 
configuration, path);
 }
 
 /**
  * Configuration factory is used to create a new FlowConfiguration
  * object.
  */
-std::unique_ptr<core::FlowConfiguration> 
createFlowConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> 
configure,
+std::unique_ptr<core::FlowConfiguration> 
createFlowConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo,
+                                                                 
std::shared_ptr<core::ContentRepository> content_repo,
+                                                                 
std::shared_ptr<Configure> configure,
                                                                  
std::shared_ptr<io::StreamFactory> stream_factory,
                                                                  const 
std::string configuration_class_name, const std::string path = "",
                                                                  bool 
fail_safe = false);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ContentRepository.h 
b/libminifi/include/core/ContentRepository.h
new file mode 100644
index 0000000..b544ca0
--- /dev/null
+++ b/libminifi/include/core/ContentRepository.h
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_
+
+#include "properties/Configure.h"
+#include "ResourceClaim.h"
+#include "io/DataStream.h"
+#include "io/BaseStream.h"
+#include "StreamManager.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Content repository definition that extends StreamManager.
+ */
+class ContentRepository : public StreamManager<minifi::ResourceClaim> {
+ public:
+  virtual ~ContentRepository() {
+
+  }
+
+  /**
+   * initialize this content repository using the provided configuration.
+   */
+  virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
+
+  /**
+   * Stops this repository.
+   */
+  virtual void stop() = 0;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/Core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index c32eb59..1dc79e7 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -19,6 +19,7 @@
 #define LIBMINIFI_INCLUDE_CORE_CORE_H_
 
 #include <cstdlib>
+#include <iostream>
 #include <memory>
 #include <string>
 #include <uuid/uuid.h>
@@ -132,6 +133,8 @@ class CoreComponent {
    */
   void setUUID(uuid_t uuid);
 
+  void setUUIDStr(const std::string uuidStr);
+
   /**
    * Returns the UUID through the provided object.
    * @param uuid uuid struct to which we will copy the memory

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index 3429166..43d2bc0 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -58,11 +58,12 @@ class FlowConfiguration : public CoreComponent {
    * Constructor that will be used for configuring
    * the flow controller.
    */
-  explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<io::StreamFactory> stream_factory,
-                             std::shared_ptr<Configure> configuration,
-                             const std::string path)
+  explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, 
std::shared_ptr<core::Repository> flow_file_repo, 
std::shared_ptr<core::ContentRepository> content_repo,
+                             std::shared_ptr<io::StreamFactory> stream_factory,
+                             std::shared_ptr<Configure> configuration, const 
std::string path)
       : CoreComponent(core::getClassName<FlowConfiguration>()),
         flow_file_repo_(flow_file_repo),
+        content_repo_(content_repo),
         config_path_(path),
         stream_factory_(stream_factory),
         configuration_(configuration),
@@ -76,8 +77,9 @@ class FlowConfiguration : public CoreComponent {
   // Create Processor (Node/Input/Output Port) based on the name
   std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t 
uuid);
   // Create Root Processor Group
-  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name,
-                                                             uuid_t uuid, int 
version);
+
+  std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, 
uuid_t uuid, int version);
+
   std::shared_ptr<core::controller::ControllerServiceNode> 
createControllerService(const std::string &class_name, const std::string &name, 
uuid_t uuid);
 
   // Create Remote Processor Group
@@ -100,7 +102,7 @@ class FlowConfiguration : public CoreComponent {
   }
 
   virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(
-      std::string &yamlConfigPayload) {
+                                                                 std::string 
&yamlConfigPayload) {
     return nullptr;
   }
 
@@ -127,6 +129,8 @@ class FlowConfiguration : public CoreComponent {
   std::string config_path_;
   // flow file repo
   std::shared_ptr<core::Repository> flow_file_repo_;
+  // content repository.
+  std::shared_ptr<core::ContentRepository> content_repo_;
   // stream factory
   std::shared_ptr<io::StreamFactory> stream_factory_;
   std::shared_ptr<Configure> configuration_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h 
b/libminifi/include/core/ProcessContext.h
index 48e0108..f6aaf5e 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -25,8 +25,10 @@
 #include <mutex>
 #include <atomic>
 #include <algorithm>
-
+#include <memory>
 #include "Property.h"
+#include "core/ContentRepository.h"
+#include "core/repository/FileSystemRepository.h"
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceLookup.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -46,10 +48,13 @@ class ProcessContext : public 
controller::ControllerServiceLookup {
   /*!
    * Create a new process context associated with the processor/controller 
service/state manager
    */
-  ProcessContext(ProcessorNode &processor, 
std::shared_ptr<controller::ControllerServiceProvider> 
&controller_service_provider, std::shared_ptr<core::Repository> repo)
+  ProcessContext(ProcessorNode &processor, 
std::shared_ptr<controller::ControllerServiceProvider> 
&controller_service_provider, std::shared_ptr<core::Repository> repo,
+                std::shared_ptr<core::Repository> flow_repo,
+                 std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>())
       : processor_node_(processor),
         controller_service_provider_(controller_service_provider),
-        logger_(logging::LoggerFactory<ProcessContext>::getLogger()) {
+        logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
+        content_repo_(content_repo), flow_repo_(flow_repo) {
     repo_ = repo;
   }
   // Destructor
@@ -92,6 +97,18 @@ class ProcessContext : public 
controller::ControllerServiceLookup {
     return repo_;
   }
 
+  /**
+   * Returns a reference to the content repository for the running instance.
+   * @return content repository shared pointer.
+   */
+  std::shared_ptr<core::ContentRepository> getContentRepository() {
+    return content_repo_;
+  }
+  
+  std::shared_ptr<core::Repository> getFlowFileRepository() {
+    return flow_repo_;
+  }
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   ProcessContext(const ProcessContext &parent) = delete;
@@ -145,6 +162,10 @@ class ProcessContext : public 
controller::ControllerServiceLookup {
   std::shared_ptr<controller::ControllerServiceProvider> 
controller_service_provider_;
   // repository shared pointer.
   std::shared_ptr<core::Repository> repo_;
+  std::shared_ptr<core::Repository> flow_repo_;
+
+  // repository shared pointer.
+  std::shared_ptr<core::ContentRepository> content_repo_;
   // Processor
   ProcessorNode processor_node_;
   // Logger

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessGroup.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessGroup.h 
b/libminifi/include/core/ProcessGroup.h
index 410480a..a0e51e3 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -56,8 +56,7 @@ class ProcessGroup {
   /*!
    * Create a new process group
    */
-  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, 
int version = 0,
-               ProcessGroup *parent = NULL);
+  ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, 
int version = 0, ProcessGroup *parent = NULL);
   // Destructor
   virtual ~ProcessGroup();
   // Set Processor Name

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessSession.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index ad79d12..d853e9b 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -50,7 +50,7 @@ class ProcessSession {
   ProcessSession(ProcessContext *processContext = NULL)
       : process_context_(processContext),
         logger_(logging::LoggerFactory<ProcessSession>::getLogger()) {
-    logger_->log_trace("ProcessSession created for %s", 
process_context_->getProcessorNode().getName().c_str());
+    logger_->log_trace("ProcessSession created for %s", 
process_context_->getProcessorNode().getName());
     auto repo = processContext->getProvenanceRepository();
     provenance_report_ = new provenance::ProvenanceReporter(repo, 
process_context_->getProcessorNode().getUUIDStr(), 
process_context_->getProcessorNode().getName());
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index 5f7e6c2..f1b47ae 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -31,7 +31,8 @@
 #include <string>
 #include <thread>
 #include <vector>
-
+#include "core/ContentRepository.h"
+#include "core/SerializableComponent.h"
 #include "properties/Configure.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Property.h"
@@ -52,15 +53,15 @@ namespace core {
 #define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
 #define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
 
-class Repository : public CoreComponent {
+class Repository : public core::SerializableComponent {
  public:
   /*
    * Constructor for the repository
    */
   Repository(std::string repo_name = "Repository", std::string directory = 
REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
-  MAX_REPOSITORY_STORAGE_SIZE,
+                 MAX_REPOSITORY_STORAGE_SIZE,
              uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : CoreComponent(repo_name),
+      : core::SerializableComponent(repo_name),
         thread_(),
         logger_(logging::LoggerFactory<Repository>::getLogger()) {
     directory_ = directory;
@@ -81,7 +82,7 @@ class Repository : public CoreComponent {
     return true;
   }
   // Put
-  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     return true;
   }
   // Delete
@@ -89,7 +90,14 @@ class Repository : public CoreComponent {
     return true;
   }
 
-  virtual bool Get(std::string key, std::string &value) {
+  virtual bool 
Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) 
{
+    bool found = true;
+    for (auto storedValue : storedValues) {
+      found &= Delete(storedValue->getName());
+    }
+    return found;
+  }
+  virtual bool Get(const std::string &key, std::string &value) {
     return false;
   }
 
@@ -109,10 +117,82 @@ class Repository : public CoreComponent {
   virtual bool isRunning() {
     return running_;
   }
+
+  /**
+   * Specialization that allows us to serialize max_size objects into store.
+   * the lambdaConstructor will create objects to put into store
+   * @param store vector in which we can store serialized object
+   * @param max_size reference that stores the max number of objects to 
retrieve and serialize.
+   * upon return max_size will represent the number of serialized objects.
+   * @return status of this operation
+   *
+   * Base implementation returns true;
+   */
+  virtual bool 
Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t max_size) {
+    return true;
+  }
+
+  /**
+   * Specialization that allows us to deserialize max_size objects into store.
+   * @param store vector in which we can store deserialized object
+   * @param max_size reference that stores the max number of objects to 
retrieve and deserialize.
+   * upon return max_size will represent the number of deserialized objects.
+   * @return status of this operation
+   *
+   * Base implementation returns true;
+   */
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t &max_size) {
+    return true;
+  }
+
+  /**
+   * Specialization that allows us to deserialize max_size objects into store.
+   * the lambdaConstructor will create objects to put into store
+   * @param store vector in which we can store deserialized object
+   * @param max_size reference that stores the max number of objects to 
retrieve and deserialize.
+   * upon return max_size will represent the number of deserialized objects.
+   * @param lambdaConstructor reference that will create the objects for store
+   * @return status of this operation
+   *
+   * Base implementation returns true;
+   */
+  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, 
size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> 
lambdaConstructor) {
+    return true;
+  }
+
+  /**
+   * Base implementation returns true;
+   */
+  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> 
&store) {
+    return true;
+  }
+
+  /**
+   * Base implementation returns true;
+   */
+  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> 
&store) {
+    return true;
+  }
+
+  /**
+   * Base implementation returns true;
+   */
+  virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
+    return true;
+  }
+
+  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const 
size_t bufferSize) {
+    return Put(key, buffer, bufferSize);
+  }
+
   uint64_t incrementSize(const char *fpath, const struct stat *sb, int 
typeflag) {
     return (repo_size_ += sb->st_size);
   }
 
+  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) {
+
+  }
+
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
   Repository(const Repository &parent) = delete;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/RepositoryFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/RepositoryFactory.h 
b/libminifi/include/core/RepositoryFactory.h
index 9fafb57..b123a6d 100644
--- a/libminifi/include/core/RepositoryFactory.h
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -19,8 +19,8 @@
 #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
 
+#include "core/ContentRepository.h"
 #include "core/Repository.h"
-#include "core/repository/VolatileRepository.h"
 #include "Core.h"
 
 namespace org {
@@ -30,8 +30,22 @@ namespace minifi {
 
 namespace core {
 
+/**
+ * Create a repository represented by the configuration class name
+ * @param configuration_class_name configuration class name
+ * @param fail_safe determines whether or not to make the default class if 
configuration_class_name is invalid
+ * @param repo_name name of the repository
+ */
 std::shared_ptr<core::Repository> createRepository(const std::string 
configuration_class_name, bool fail_safe = false, const std::string repo_name = 
"");
 
+/**
+ * Create a context repository
+ * @param configuration_class_name configuration class name
+ * @param fail_safe determines whether or not to make the default class if 
configuration_class_name is invalid
+ * @param repo_name name of the repository
+ */
+std::shared_ptr<core::ContentRepository> createContentRepository(const 
std::string configuration_class_name, bool fail_safe = false, const std::string 
repo_name = "");
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/SerializableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/SerializableComponent.h 
b/libminifi/include/core/SerializableComponent.h
new file mode 100644
index 0000000..f7f9feb
--- /dev/null
+++ b/libminifi/include/core/SerializableComponent.h
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
+#define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
+
+#include "io/Serializable.h"
+#include "core/Core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Represents a component that is serializable and an extension point of core 
Component
+ */
+class SerializableComponent : public core::CoreComponent, public 
minifi::io::Serializable {
+
+ public:
+
+  SerializableComponent(const std::string name, uuid_t uuid = nullptr)
+      : core::CoreComponent(name, uuid) {
+
+  }
+
+  virtual ~SerializableComponent() {
+
+  }
+
+  /**
+   * Serialize this object into the the store
+   * @param store object in which we are serializing data into
+   * @return status of this serialization.
+   */
+  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> 
&store) = 0;
+
+  /**
+   * Deserialization from the parameter store into the current object
+   * @param store from which we are deserializing the current object
+   * @return status of this deserialization.
+   */
+  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> 
&store) = 0;
+
+  /**
+   * Deserializes the current object using buffer
+   * @param buffer buffer from which we can deserialize the currenet object
+   * @param bufferSize length of buffer from which we can deserialize the 
current object.
+   * @return status of the deserialization.
+   */
+  virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) = 0;
+
+  /**
+   * Serialization of this object into buffer
+   * @param key string that represents this objects identifier
+   * @param buffer buffer that contains the serialized object
+   * @param bufferSize length of buffer
+   * @return status of serialization
+   */
+  virtual bool Serialize(const std::string &key, const uint8_t *buffer, const 
size_t bufferSize) {
+    return false;
+  }
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/StreamManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/StreamManager.h 
b/libminifi/include/core/StreamManager.h
new file mode 100644
index 0000000..468526d
--- /dev/null
+++ b/libminifi/include/core/StreamManager.h
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_
+#define LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_
+
+#include "properties/Configure.h"
+#include "ResourceClaim.h"
+#include "io/DataStream.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Purpose: Provides a base for all stream based managers. The goal here is to 
provide
+ * a small set of interfaces that provide a small set of operations to provide 
state 
+ * management for streams.
+ */
+template<typename T>
+class StreamManager {
+ public:
+  virtual ~StreamManager() {
+
+  }
+
+  /**
+   * Create a write stream using the streamId as a reference.
+   * @param streamId stream identifier
+   * @return stream pointer.
+   */
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> 
&streamId) = 0;
+
+  /**
+   * Create a read stream using the streamId as a reference.
+   * @param streamId stream identifier
+   * @return stream pointer.
+   */
+  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<T> 
&streamId) = 0;
+
+  /**
+   * Closes the stream
+   * @param streamId stream identifier
+   * @return result of operation.
+   */
+  virtual bool close(const std::shared_ptr<T> &streamId) = 0;
+
+  /**
+   * Removes the stream from this stream manager. The end result
+   * is dependent on the stream manager implementation.
+   * @param streamId stream identifier
+   * @return result of operation.
+   */
+  virtual bool remove(const std::shared_ptr<T> &streamId) = 0;
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/logging/LoggerConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h 
b/libminifi/include/core/logging/LoggerConfiguration.h
index aa4a1d0..787fec5 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -105,8 +105,7 @@ class LoggerConfiguration {
    protected:
   static std::shared_ptr<internal::LoggerNamespace> 
initialize_namespaces(const std::shared_ptr<LoggerProperties> 
&logger_properties);
   static std::shared_ptr<spdlog::logger> get_logger(std::shared_ptr<Logger> 
logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const 
std::string &name,
-                                                    
std::shared_ptr<spdlog::formatter> formatter,
-                                                    bool remove_if_present = 
false);
+                                                    
std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false);
    private:
   static std::shared_ptr<internal::LoggerNamespace> create_default_root();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
----------------------------------------------------------------------
diff --git 
a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h 
b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
index e1d80e8..2bd4099 100644
--- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
+++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h
@@ -54,7 +54,6 @@ class SiteToSiteProvenanceReportingTask : public 
minifi::RemoteProcessorGroupPor
   }
   //! Destructor
   virtual ~SiteToSiteProvenanceReportingTask() {
-
   }
   //! Report Task Name
   static constexpr char const* ReportTaskName = 
"SiteToSiteProvenanceReportingTask";
@@ -62,7 +61,8 @@ class SiteToSiteProvenanceReportingTask : public 
minifi::RemoteProcessorGroupPor
 
  public:
   //! Get provenance json report
-  void getJsonReport(core::ProcessContext *context, core::ProcessSession 
*session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> 
&records, std::string &report);
+  void getJsonReport(core::ProcessContext *context, core::ProcessSession 
*session, std::vector<std::shared_ptr<core::SerializableComponent>> &records, 
std::string &report);
+
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
   //! OnTrigger method, implemented by NiFi SiteToSiteProvenanceReportingTask
   virtual void onTrigger(core::ProcessContext *context, core::ProcessSession 
*session);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/AtomicRepoEntries.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/AtomicRepoEntries.h 
b/libminifi/include/core/repository/AtomicRepoEntries.h
new file mode 100644
index 0000000..c681060
--- /dev/null
+++ b/libminifi/include/core/repository/AtomicRepoEntries.h
@@ -0,0 +1,501 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ref_count_hip.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
+
+#include  <cstddef>
+#include <cstring>
+#include <iostream>
+#include <chrono>
+#include <functional>
+#include <atomic>
+#include <vector>
+#include <map>
+#include <iterator>
+
+static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + 
sizeof(std::string) + sizeof(size_t);
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Purpose: Repo value represents an item that will support a move operation 
within an AtomicEntry
+ *
+ * Justification: Since AtomicEntry is a static entry that does not move or 
change, the underlying
+ * RepoValue can be changed to support atomic operations.
+ */
+template<typename T>
+class RepoValue {
+ public:
+
+  explicit RepoValue() {
+  }
+
+  /**
+   * Constructor that populates the item allowing for a custom key comparator.
+   * @param key key for this repo value.
+   * @param ptr buffer
+   * @param size size buffer
+   * @param comparator custom comparator.
+   */
+  explicit RepoValue(T key, const uint8_t *ptr, size_t size, 
std::function<bool(T, T)> comparator = nullptr)
+      : key_(key),
+        comparator_(comparator) {
+    if (nullptr == ptr) {
+      size = 0;
+    }
+    buffer_.resize(size);
+    if (size > 0) {
+      std::memcpy(buffer_.data(), ptr, size);
+    }
+  }
+
+  /**
+   * RepoValue that moves the other object into this.
+   */
+  explicit RepoValue(RepoValue<T> &&other)
+noexcept      : key_(std::move(other.key_)),
+      buffer_(std::move(other.buffer_)),
+      comparator_(std::move(other.comparator_)) {
+      }
+
+      ~RepoValue()
+      {
+      }
+
+      T &getKey() {
+        return key_;
+      }
+
+      /**
+       * Sets the key, relacing the custom comparator if needed.
+       */
+      void setKey(const T key, std::function<bool(T,T)> comparator = nullptr) {
+        key_ = key;
+        comparator_ = comparator;
+      }
+
+      /**
+       * Determines if the key is the same using the custom comparator
+       * @param other object to compare against
+       * @return result of the comparison
+       */
+      inline bool isEqual(RepoValue<T> *other)
+      {
+        return comparator_ == nullptr ? key_ == other->key_ : 
comparator_(key_,other->key_);
+      }
+
+      /**
+       * Determines if the key is the same using the custom comparator
+       * @param other object to compare against
+       * @return result of the comparison
+       */
+      inline bool isKey(T other)
+      {
+        return comparator_ == nullptr ? key_ == other : 
comparator_(key_,other);
+      }
+
+      /**
+       * Clears the buffer.
+       */
+      void clearBuffer() {
+        buffer_.resize(0);
+        buffer_.clear();
+      }
+
+      /**
+       * Return the size of the memory within the key
+       * buffer, the size of timestamp, and the general
+       * system word size
+       */
+      uint64_t size() {
+        return buffer_.size();
+      }
+
+      size_t getBufferSize() {
+        return buffer_.size();
+      }
+
+      const uint8_t *getBuffer()
+      {
+        return buffer_.data();
+      }
+
+      /**
+       * Places the contents of buffer into str
+       * @param strnig into which we are placing the memory contained in 
buffer.
+       */
+      void emplace(std::string &str) {
+        str.insert(0, reinterpret_cast<const char*>(buffer_.data()), 
buffer_.size());
+      }
+
+      /**
+       * Appends ptr to the end of buffer.
+       * @param ptr pointer containing data to add to buffer_
+       */
+      void append(uint8_t *ptr, size_t size)
+      {
+        buffer_.insert(buffer_.end(), ptr, ptr + size);
+      }
+
+      RepoValue<T> &operator=(RepoValue<T> &&other) noexcept {
+        key_ = std::move(other.key_);
+        buffer_ = std::move(other.buffer_);
+        return *this;
+      }
+
+    private:
+      T key_;
+      std::function<bool(T,T)> comparator_;
+      std::vector<uint8_t> buffer_;
+    };
+
+    /**
+     * Purpose: Atomic Entry allows us to create a statically
+     * sized ring buffer, with the ability to create
+     *
+     **/
+template<typename T>
+class AtomicEntry {
+
+ public:
+  /**
+   * Constructor that accepts a max size and an atomic counter for the total
+   * size allowd by this and other atomic entries.
+   */
+  explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size)
+      : write_pending_(false),
+        has_value_(false),
+        accumulated_repo_size_(total_size),
+        max_repo_size_(max_size),
+        ref_count_(0),
+        free_required(false) {
+
+  }
+
+  /**
+   * Sets the repo value, moving the old value into old_value.
+   * @param new_value new value to move into value_.
+   * @param old_value the previous value of value_ will be moved into old_value
+   * @param prev_size size reclaimed.
+   * @return result of this set. If true old_value will be populated.
+   */
+  bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t 
&prev_size) {
+    // delete the underlying pointer
+    bool lock = false;
+    if (!write_pending_.compare_exchange_weak(lock, true))
+    {
+      return false;
+    }
+    if (has_value_) {
+      prev_size = value_.size();
+    }
+    old_value = std::move(value_);
+    value_ = std::move(new_value);
+    has_value_ = true;
+    try_unlock();
+    return true;
+  }
+  
+  
+  AtomicEntry<T> *takeOwnership()
+  {
+      bool lock = false;
+      if (!write_pending_.compare_exchange_weak(lock, true) )
+       return nullptr;
+      
+      ref_count_++;
+      
+      try_unlock();
+      
+      return this;
+  }
+  /**
+   * A test and set operation, which is used to allow a function to test
+   * if an item can be released and a function used for reclaiming memory 
associated
+   * with said object.
+   * A custom comparator can be provided to augment the key being added into 
value_
+   */
+  bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = 
nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> 
comparator = nullptr) {
+    bool lock = false;
+
+    if (!write_pending_.compare_exchange_weak(lock, true) )
+      return false;
+
+    if (has_value_) {
+      // we either don't have a release test or we cannot release this
+      // entity
+      if (releaseTest != nullptr && reclaimer != nullptr && 
releaseTest(value_.getKey()))
+                                                                        {
+        reclaimer(value_.getKey());
+      }
+      else if (free_required && ref_count_ == 0)
+      {
+       size_t bufferSize = value_.getBufferSize();
+       value_.clearBuffer();
+       has_value_ = false;
+       if (accumulated_repo_size_ != nullptr) {
+         *accumulated_repo_size_ -= bufferSize;
+       }
+       free_required = false;  
+      }
+      else {
+        try_unlock();
+        return false;
+      }
+
+    }
+    ref_count_=1;
+    value_.setKey(str, comparator);
+    has_value_ = true;
+    try_unlock();
+    return true;
+  }
+
+  /**
+   * Moved the value into the argument
+   * @param value the previous value will be moved into this parameter
+   * @return  success of get operation based on whether or not this atomic 
entry has a value.
+   */
+  bool getValue(RepoValue<T> &value) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    value = std::move(value_);
+    has_value_ = false;
+    try_unlock();
+    return true;
+  }
+
+  /**
+   * Moved the value into the argument
+   * @param value the previous value will be moved into this parameter
+   * @return  success of get operation based on whether or not this atomic 
entry has a value.
+   */
+  bool getValue(const T &key, RepoValue<T> &value) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    if (!value_.isKey(key)) {
+      try_unlock();
+      return false;
+    }
+    value = std::move(value_);
+    has_value_ = false;
+    try_unlock();
+    return true;
+  }
+  
+  void decrementOwnership(){
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return;
+    }
+    if (ref_count_ > 0){
+      ref_count_--;
+    }
+    if (ref_count_ == 0 && free_required)
+    {
+      size_t bufferSize = value_.getBufferSize();
+      value_.clearBuffer();
+      has_value_ = false;
+      if (accumulated_repo_size_ != nullptr) {
+       *accumulated_repo_size_ -= bufferSize;
+      }
+      free_required = false;
+    }
+    else{
+    }
+    try_unlock();
+  }
+
+  /**
+   * Moved the value into the argument
+   * @param value the previous value will be moved into this parameter
+   * @return  success of get operation based on whether or not this atomic 
entry has a value.
+   */
+  bool getValue(const T &key, RepoValue<T> **value) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    if (!value_.isKey(key)) {
+      try_unlock();
+      return false;
+    }
+    ref_count_++;
+    *value = &value_;
+    try_unlock();
+    return true;
+  }
+
+  /**
+   * Operation that will be used to test and free if a release is required 
without
+   * setting a new object.
+   * @param releaseTest function that will be used to free the RepoValue key 
from
+   * this atomic entry.
+   * @param freedValue informs the caller if an item was freed.
+   */
+  T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) {
+    try_lock();
+    T ref;
+    if (!has_value_) {
+      try_unlock();
+      return ref;
+    }
+
+    if (releaseTest(value_.getKey())) {
+      size_t bufferSize = value_.getBufferSize();
+      value_.clearBuffer();
+      ref = value_.getKey();
+      has_value_ = false;
+      if (accumulated_repo_size_ != nullptr) {
+        *accumulated_repo_size_ -= bufferSize;
+      }
+
+    }
+    try_unlock();
+    return ref;
+  }
+  
+  size_t getLength()
+  {
+    size_t size = 0;
+     try_lock();
+     size = value_.getBufferSize();
+     try_unlock();
+     return size;
+     
+  }
+
+  /**
+   * sets has_value to false; however, does not call
+   * any external entity to further free RepoValue
+   */
+  bool freeValue(const T &key) {
+    try_lock();
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+    if (!value_.isKey(key)) {
+      try_unlock();
+      return false;
+    }
+    if (ref_count_ > 0)
+    {
+       free_required = true;
+       try_unlock();
+       return true;
+    }
+    size_t bufferSize = value_.getBufferSize();
+    value_.clearBuffer();
+    has_value_ = false;
+    if (accumulated_repo_size_ != nullptr) {
+      *accumulated_repo_size_ -= bufferSize;
+    }
+    free_required = false;
+    try_unlock();
+    return true;
+  }
+
+  /**
+   * Appends buffer onto this atomic entry if key matches
+   * the current RepoValue's key.
+   */
+  bool insert(const T key, uint8_t *buffer, size_t size) {
+    try_lock();
+
+    if (!has_value_) {
+      try_unlock();
+      return false;
+    }
+
+    if (!value_.isKey(key)) {
+      try_unlock();
+      return false;
+    }
+
+    if ((accumulated_repo_size_ != nullptr && max_repo_size_ != nullptr) && 
(*accumulated_repo_size_ + size > *max_repo_size_)) {
+      // can't support this write
+      try_unlock();
+      return false;
+    }
+
+    value_.append(buffer, size);
+    (*accumulated_repo_size_) += size;
+    try_unlock();
+    return true;
+  }
+
+ private:
+
+  /**
+   * Spin lock to unlock the current atomic entry.
+   */
+  inline void try_lock() {
+    bool lock = false;
+    while (!write_pending_.compare_exchange_weak(lock, 
true,std::memory_order_acquire)) {
+      lock = false;
+      // attempt again
+    }
+  }
+
+  /**
+   * Spin lock to unlock the current atomic entry.
+   */
+  inline void try_unlock() {
+    bool lock = true;
+    while (!write_pending_.compare_exchange_weak(lock, 
false,std::memory_order_acquire)) {
+      lock = true;
+      // attempt again
+    }
+  }
+
+  // atomic size pointer.
+  std::atomic<size_t> *accumulated_repo_size_;
+  // max size
+  size_t *max_repo_size_;
+  // determines if a write is pending.
+  std::atomic<bool> write_pending_;
+  // used to determine if a value is present in this atomic entry.
+  std::atomic<bool> has_value_;
+  std::atomic<uint16_t> ref_count_;
+  std::atomic<bool> free_required;
+  // repo value.
+  RepoValue<T> value_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/FileSystemRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FileSystemRepository.h 
b/libminifi/include/core/repository/FileSystemRepository.h
new file mode 100644
index 0000000..84bf01e
--- /dev/null
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_
+
+#include "core/Core.h"
+#include "../ContentRepository.h"
+#include "properties/Configure.h"
+#include "core/logging/LoggerConfiguration.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * FileSystemRepository is a content repository that stores data onto the 
local file system.
+ */
+class FileSystemRepository : public core::ContentRepository, public 
core::CoreComponent {
+ public:
+  FileSystemRepository(std::string name = getClassName<FileSystemRepository>())
+      : core::CoreComponent(name),
+        logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
+
+  }
+  virtual ~FileSystemRepository() {
+
+  }
+
+  virtual bool initialize(const std::shared_ptr<minifi::Configure> 
&configuration);
+
+  virtual void stop();
+
+  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  virtual std::shared_ptr<io::BaseStream> read(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+    return remove(claim);
+  }
+
+  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h 
b/libminifi/include/core/repository/FlowFileRepository.h
index 2e19286..28b9c05 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -48,12 +48,10 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
   // Constructor
 
   FlowFileRepository(const std::string repo_name = "", std::string directory = 
FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
-                     int64_t maxPartitionBytes = 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
-                     uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
+                     int64_t maxPartitionBytes = 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = 
FLOWFILE_REPOSITORY_PURGE_PERIOD)
       : Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
-        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger())
-
-  {
+        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()),
+        content_repo_(nullptr) {
     db_ = NULL;
   }
 
@@ -95,11 +93,12 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
 
   virtual void run();
 
-  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
 
     // persistent to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
+    repo_size_+=bufLen;
     status = db_->Put(leveldb::WriteOptions(), key, value);
     if (status.ok())
       return true;
@@ -115,7 +114,9 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
     leveldb::Status status;
     status = db_->Delete(leveldb::WriteOptions(), key);
     if (status.ok())
+    {
       return true;
+    }
     else
       return false;
   }
@@ -123,7 +124,7 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
    * Sets the value from the provided key
    * @return status of the get operation.
    */
-  virtual bool Get(std::string key, std::string &value) {
+  virtual bool Get(const std::string &key, std::string &value) {
     leveldb::Status status;
     status = db_->Get(leveldb::ReadOptions(), key, &value);
     if (status.ok())
@@ -135,21 +136,25 @@ class FlowFileRepository : public core::Repository, 
public std::enable_shared_fr
   void setConnectionMap(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap) {
     this->connectionMap = connectionMap;
   }
-  void loadComponent();
+  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo);
 
   void start() {
     if (this->purge_period_ <= 0)
+    {
       return;
+    }
     if (running_)
+    {
       return;
-    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
-    thread_.detach();
+    }
     running_ = true;
+    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
     logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
   }
 
  private:
   std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+  std::shared_ptr<core::ContentRepository> content_repo_;
   leveldb::DB* db_;
   std::shared_ptr<logging::Logger> logger_;
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h 
b/libminifi/include/core/repository/VolatileContentRepository.h
new file mode 100644
index 0000000..306a812
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_
+
+#include "core/Core.h"
+#include "AtomicRepoEntries.h"
+#include "io/AtomicEntryStream.h"
+#include "../ContentRepository.h"
+#include "core/repository/VolatileRepository.h"
+#include "properties/Configure.h"
+#include "core/logging/LoggerConfiguration.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Purpose: Stages content into a volatile area of memory. Note that   when 
the maximum number
+ * of entries is consumed we will rollback a session to wait for others to be 
freed.
+ */
+class VolatileContentRepository : public core::ContentRepository, public 
core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+ public:
+
+  static const char *minimal_locking;
+
+  explicit VolatileContentRepository(std::string name = 
getClassName<VolatileContentRepository>())
+      : 
core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name),
+        
logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()),
+        minimize_locking_(true) {
+    max_count_ = 15000;
+  }
+  virtual ~VolatileContentRepository() {
+    if (!minimize_locking_) {
+      std::lock_guard<std::mutex> lock(map_mutex_);
+      for (const auto &item : master_list_)
+      {
+        delete item.second;
+      }
+      master_list_.clear();
+    }
+
+  }
+
+  /**
+   * Initialize the volatile content repo
+   * @param configure configuration
+   */
+  virtual bool initialize(const std::shared_ptr<Configure> &configure);
+
+  /**
+   * Stop any thread associated with the volatile content repository.
+   */
+  virtual void stop();
+
+  /**
+   * Creates writable stream.
+   * @param claim resource claim
+   * @return BaseStream shared pointer that represents the stream the consumer 
will write to.
+   */
+  virtual std::shared_ptr<io::BaseStream> write(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * Creates readable stream.
+   * @param claim resource claim
+   * @return BaseStream shared pointer that represents the stream from which 
the consumer will read..
+   */
+  virtual std::shared_ptr<io::BaseStream> read(const 
std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * Closes the claim.
+   * @return whether or not the claim is associated with content stored in 
volatile memory.
+   */
+  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+    return remove(claim);
+  }
+
+  /**
+   * Closes the claim.
+   * @return whether or not the claim is associated with content stored in 
volatile memory.
+   */
+  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+ protected:
+  virtual void start();
+
+  virtual void run();
+
+  template<typename T2>
+  std::shared_ptr<T2> shared_from_parent()
+  {
+    return std::static_pointer_cast<T2>(shared_from_this());
+  }
+
+ private:
+
+  bool minimize_locking_;
+
+  // function pointers that are associated with the claims.
+  std::function<bool(std::shared_ptr<minifi::ResourceClaim>, 
std::shared_ptr<minifi::ResourceClaim>)> resource_claim_comparator_;
+  std::function<bool(std::shared_ptr<minifi::ResourceClaim>)> 
resource_claim_check_;
+  std::function<void(std::shared_ptr<minifi::ResourceClaim>)> claim_reclaimer_;
+
+  // logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // mutex and master list that represent a cache of Atomic entries. this 
exists so that we don't have to walk the atomic entry list.
+  // The idea is to reduce the computational complexity while keeping access 
as maximally lock free as we can.
+  std::mutex map_mutex_;
+
+  std::map<std::string, AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>*> 
master_list_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileFlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h 
b/libminifi/include/core/repository/VolatileFlowFileRepository.h
new file mode 100644
index 0000000..059c1de
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+
+#include "VolatileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Volatile flow file repository. keeps a running counter of the current 
location, freeing
+ * those which we no longer hold.
+ */
+class VolatileFlowFileRepository : public VolatileRepository<std::string>
+{
+ public:
+  explicit VolatileFlowFileRepository(std::string repo_name = "", std::string 
dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+  MAX_REPOSITORY_STORAGE_SIZE,
+                                      uint64_t purgePeriod = 
REPOSITORY_PURGE_PERIOD)
+      : VolatileRepository(repo_name.length() > 0 ? repo_name : 
core::getClassName<VolatileRepository>(), "", maxPartitionMillis, 
maxPartitionBytes, purgePeriod)
+
+  {
+    purge_required_ = true;
+    content_repo_ = nullptr;
+  }
+
+  virtual void run() {
+    repo_full_ = false;
+    while (running_) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+      if (purge_required_ && nullptr != content_repo_)
+          {
+        std::lock_guard<std::mutex> lock(purge_mutex_);
+        for (auto purgeItem : purge_list_)
+        {
+          std::shared_ptr<minifi::ResourceClaim> newClaim = 
std::make_shared<minifi::ResourceClaim>(purgeItem, content_repo_, true);
+          content_repo_->remove(newClaim);
+        }
+        purge_list_.resize(0);
+        purge_list_.clear();
+      }
+    }
+  }
+
+  void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) {
+    content_repo_ = content_repo;
+
+  }
+
+ protected:
+
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
+};
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_ */

Reply via email to