This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch MINIFICPP-2028
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 83e0af6c58de86bfc7660c5465aaade9a37d988c
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Jan 9 14:24:56 2023 +0100

    MINIFICPP-2028 Remove SerializableComponent dependency from Repository
---
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |  42 +---
 extensions/rocksdb-repos/ProvenanceRepository.h    |   7 +-
 .../tests/unit/ProcessorTests.cpp                  |   6 +-
 libminifi/include/core/Repository.h                |  74 +------
 libminifi/include/core/SerializableComponent.h     |  74 +------
 .../include/core/repository/VolatileRepository.h   |  19 --
 libminifi/include/provenance/Provenance.h          | 240 ++++++++-------------
 libminifi/src/core/Repository.cpp                  |  46 ++++
 .../SiteToSiteProvenanceReportingTask.cpp          |   5 +-
 .../src/core/repository/VolatileRepository.cpp     |  44 ----
 libminifi/src/provenance/Provenance.cpp            | 180 +++++++---------
 libminifi/test/rocksdb-tests/ProvenanceTests.cpp   | 138 ++++++------
 libminifi/test/unit/ProvenanceTestHelper.h         |  33 +--
 13 files changed, 323 insertions(+), 585 deletions(-)

diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 0c7da870d..2680de9d5 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -122,35 +122,17 @@ bool ProvenanceRepository::Get(const std::string &key, 
std::string &value) {
   return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
 }
 
-bool ProvenanceRepository::Serialize(const std::string &key, const uint8_t 
*buffer, const size_t bufferSize) {
-  return Put(key, buffer, bufferSize);
-}
-
-bool 
ProvenanceRepository::get(std::vector<std::shared_ptr<core::CoreComponent>> 
&store, size_t max_size) {
-  std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
-    std::string key = it->key().ToString();
-    if (store.size() >= max_size)
-      break;
-    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const 
std::byte>())) {
-      
store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
-    }
-  }
-  return true;
-}
-
-bool 
ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &records, size_t &max_size,
-                                       
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+bool 
ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>>
 &records, size_t &max_size) {
   std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
   size_t requested_batch = max_size;
   max_size = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     if (max_size >= requested_batch)
       break;
-    std::shared_ptr<core::SerializableComponent> eventRead = lambda();
+    auto eventRead = std::make_shared<ProvenanceEventRecord>();
     std::string key = it->key().ToString();
-    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const 
std::byte>())) {
+    io::BufferStream stream(gsl::make_span(it->value()).as_span<const 
std::byte>());
+    if (eventRead->deserialize(stream)) {
       max_size++;
       records.push_back(eventRead);
     }
@@ -158,22 +140,6 @@ bool 
ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::Seriali
   return max_size > 0;
 }
 
-bool 
ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size) {
-  std::unique_ptr<rocksdb::Iterator> 
it(db_->NewIterator(rocksdb::ReadOptions()));
-  max_size = 0;
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<ProvenanceEventRecord> eventRead = 
std::make_shared<ProvenanceEventRecord>();
-    std::string key = it->key().ToString();
-
-    if 
(store.at(max_size)->DeSerialize(gsl::make_span(it->value()).as_span<const 
std::byte>())) {
-      max_size++;
-    }
-    if (store.size() >= max_size)
-      break;
-  }
-  return max_size > 0;
-}
-
 void ProvenanceRepository::destroy() {
   db_.reset();
 }
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 0dc9cb6e4..7aaeaa34e 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -79,15 +79,10 @@ class ProvenanceRepository : public 
core::ThreadedRepository {
   }
 
   bool Get(const std::string &key, std::string &value) override;
-
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t 
bufferSize) override;
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&records, size_t &max_size,
-                   
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override;
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&store, size_t &max_size) override;
+  bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> 
&records, size_t &max_size) override;
 
   void destroy();
   uint64_t getKeyCount() const;
-  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, 
size_t max_size);
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index b7afc647c..eb56c1355 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -48,6 +48,7 @@
 #include "utils/PropertyErrors.h"
 #include "utils/IntegrationTestUtils.h"
 #include "Utils.h"
+#include "io/BufferStream.h"
 
 TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   TestController testController;
@@ -448,7 +449,8 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
 
   for (auto entry : repo->getRepoMap()) {
     minifi::provenance::ProvenanceEventRecord newRecord;
-    newRecord.DeSerialize(gsl::make_span(entry.second).as_span<const 
std::byte>());
+    minifi::io::BufferStream stream(gsl::make_span(entry.second).as_span<const 
std::byte>());
+    newRecord.deserialize(stream);
 
     bool found = false;
     for (const auto& provRec : records) {
@@ -475,7 +477,7 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
   processorReport->setScheduledState(core::ScheduledState::RUNNING);
   std::string jsonStr;
   std::size_t deserialized = 0;
-  repo->DeSerialize(recordsReport, deserialized);
+  repo->getElements(recordsReport, deserialized);
   std::function<void(const std::shared_ptr<core::ProcessContext> &, const 
std::shared_ptr<core::ProcessSession>&)> verifyReporter =
       [&](const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
         taskReport->getJsonReport(context, session, recordsReport, jsonStr);
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 901efcecb..247da7351 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -54,14 +54,14 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
 constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class Repository : public core::SerializableComponent {
+class Repository : public core::CoreComponent {
  public:
   explicit Repository(std::string repo_name = "Repository",
                       std::string directory = REPOSITORY_DIRECTORY,
                       std::chrono::milliseconds maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME,
                       int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                       std::chrono::milliseconds purgePeriod = 
REPOSITORY_PURGE_PERIOD)
-    : core::SerializableComponent(std::move(repo_name)),
+    : core::CoreComponent(std::move(repo_name)),
       directory_(std::move(directory)),
       max_partition_millis_(maxPartitionMillis),
       max_partition_bytes_(maxPartitionBytes),
@@ -71,10 +71,12 @@ class Repository : public core::SerializableComponent {
       logger_(logging::LoggerFactory<Repository>::getLogger()) {
   }
 
-  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
+  virtual bool isRunning() {
+    return true;
+  }
 
+  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
   virtual bool start() = 0;
-
   virtual bool stop() = 0;
 
   virtual bool isNoop() const {
@@ -96,13 +98,7 @@ class Repository : public core::SerializableComponent {
     return true;
   }
 
-  virtual bool 
Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) 
{
-    bool found = true;
-    for (const auto& storedValue : storedValues) {
-      found &= Delete(storedValue->getName());
-    }
-    return found;
-  }
+  virtual bool 
Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues);
 
   void setConnectionMap(std::map<std::string, core::Connectable*> 
connectionMap) {
     connection_map_ = std::move(connectionMap);
@@ -120,63 +116,11 @@ class Repository : public core::SerializableComponent {
     return repo_full_;
   }
 
-  /**
-   * Specialization that allows us to serialize max_size objects into store.
-   * the lambdaConstructor will create objects to put into store
-   * @param store vector in which we can store serialized object
-   * @param max_size reference that stores the max number of objects to 
retrieve and serialize.
-   * upon return max_size will represent the number of serialized objects.
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool 
Serialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, 
size_t /*max_size*/) {
-    return true;
-  }
-
-  /**
-   * Specialization that allows us to deserialize max_size objects into store.
-   * @param store vector in which we can store deserialized object
-   * @param max_size reference that stores the max number of objects to 
retrieve and deserialize.
-   * upon return max_size will represent the number of deserialized objects.
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& 
/*store*/, size_t& /*max_size*/) {
-    return true;
-  }
-
-  /**
-   * Specialization that allows us to deserialize max_size objects into store.
-   * the lambdaConstructor will create objects to put into store
-   * @param store vector in which we can store deserialized object
-   * @param max_size reference that stores the max number of objects to 
retrieve and deserialize.
-   * upon return max_size will represent the number of deserialized objects.
-   * @param lambdaConstructor reference that will create the objects for store
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool 
DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& 
/*store*/, size_t& /*max_size*/, 
std::function<std::shared_ptr<core::SerializableComponent>()> 
/*lambdaConstructor*/) { // NOLINT
-    return true;
-  }
-
-  bool Serialize(const std::shared_ptr<core::SerializableComponent>& 
/*store*/) override {
+  virtual bool 
getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& 
/*store*/, size_t& /*max_size*/) {
     return true;
   }
 
-  bool DeSerialize(const std::shared_ptr<core::SerializableComponent>& 
/*store*/) override {
-    return true;
-  }
-
-  bool DeSerialize(gsl::span<const std::byte>) override {
-    return true;
-  }
-
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t 
bufferSize) override {
-    return Put(key, buffer, bufferSize);
-  }
+  virtual bool storeElement(const std::shared_ptr<core::SerializableComponent> 
element);
 
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& 
/*content_repo*/) {
   }
diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h
index d5b0b037b..3d9480ebf 100644
--- a/libminifi/include/core/SerializableComponent.h
+++ b/libminifi/include/core/SerializableComponent.h
@@ -15,86 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
-#define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
+#pragma once
 
-#include <memory>
 #include <string>
 #include <utility>
 
-#include "core/Connectable.h"
 #include "core/Core.h"
 #include "utils/gsl.h"
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::core {
 
-/**
- * Represents a component that is serializable and an extension point of core 
Component
- */
-class SerializableComponent : public core::Connectable {
+class SerializableComponent : public core::CoreComponent {
  public:
   explicit SerializableComponent(std::string name)
-        : core::Connectable(std::move(name)) {
-    }
-
-  SerializableComponent(std::string name, const utils::Identifier& uuid)
-      : core::Connectable(std::move(name), uuid) {
-  }
-
-  ~SerializableComponent() override = default;
-
-  /**
-   * Serialize this object into the the store
-   * @param store object in which we are serializing data into
-   * @return status of this serialization.
-   */
-  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> 
&store) = 0;
-
-  /**
-   * Deserialization from the parameter store into the current object
-   * @param store from which we are deserializing the current object
-   * @return status of this deserialization.
-   */
-  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> 
&store) = 0;
-
-  /**
-   * Deserializes the current object using buffer
-   * @param buffer buffer from which we can deserialize the currenet object
-   * @param bufferSize length of buffer from which we can deserialize the 
current object.
-   * @return status of the deserialization.
-   */
-  virtual bool DeSerialize(gsl::span<const std::byte>) = 0;
-
-  /**
-   * Serialization of this object into buffer
-   * @param key string that represents this objects identifier
-   * @param buffer buffer that contains the serialized object
-   * @param bufferSize length of buffer
-   * @return status of serialization
-   */
-  virtual bool Serialize(const std::string& /*key*/, const uint8_t* 
/*buffer*/, const size_t /*bufferSize*/) {
-    return false;
+    : core::CoreComponent(std::move(name)) {
   }
 
-  void yield() override { }
-
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() override {
-    return true;
-  }
+  virtual ~SerializableComponent() = default;
 
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
-  }
+  virtual bool serialize(io::OutputStream& output_stream) = 0;
+  virtual bool deserialize(io::InputStream &input_stream) = 0;
 };
 
 }  // namespace org::apache::nifi::minifi::core
-
-#endif  // LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
-
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 5bdf509cc..05d341f9d 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -82,25 +82,6 @@ class VolatileRepository : public core::ThreadedRepository {
    * @return status of the get operation.
    */
   bool Get(const std::string& key, std::string &value) override;
-  /**
-   * Deserializes objects into store
-   * @param store vector in which we will store newly created objects.
-   * @param max_size size of objects deserialized
-   */
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&store, size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override;
-
-  /**
-   * Deserializes objects into a store that contains a fixed number of objects 
in which
-   * we will deserialize from this repo
-   * @param store precreated object vector
-   * @param max_size size of objects deserialized
-   */
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> 
&store, size_t &max_size) override;
-
-  /**
-   * Function to load this component.
-   */
-  void loadComponent(const std::shared_ptr<core::ContentRepository> 
&content_repo) override;
 
   uint64_t getRepoSize() const override {
     return repo_data_.current_size;
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index e3feb6708..275701fcb 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
-#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
+#pragma once
 
 #include <algorithm>
 #include <atomic>
@@ -43,10 +42,7 @@
 #include "utils/TimeUtil.h"
 
 namespace org::apache::nifi::minifi::provenance {
-// Provenance Event Record Serialization Seg Size
-#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
 
-// Provenance Event Record
 class ProvenanceEventRecord : public core::SerializableComponent {
  public:
   enum ProvenanceEventType {
@@ -160,169 +156,168 @@ class ProvenanceEventRecord : public 
core::SerializableComponent {
     _eventTime = std::chrono::system_clock::now();
   }
 
-  // Destructor
   virtual ~ProvenanceEventRecord() = default;
-  // Get the Event ID
-  utils::Identifier getEventId() {
+
+  utils::Identifier getEventId() const {
     return getUUID();
   }
 
   void setEventId(const utils::Identifier &id) {
     setUUID(id);
   }
-  // Get Attributes
-  std::map<std::string, std::string> getAttributes() {
+
+  std::map<std::string, std::string> getAttributes() const {
     return _attributes;
   }
-  // Get Size
-  uint64_t getFileSize() {
+
+  uint64_t getFileSize() const {
     return _size;
   }
-  // ! Get Offset
-  uint64_t getFileOffset() {
+
+  uint64_t getFileOffset() const {
     return _offset;
   }
-  // ! Get Entry Date
-  std::chrono::system_clock::time_point getFlowFileEntryDate() {
+
+  std::chrono::system_clock::time_point getFlowFileEntryDate() const {
     return _entryDate;
   }
-  // ! Get Lineage Start Date
-  std::chrono::system_clock::time_point getlineageStartDate() {
+
+  std::chrono::system_clock::time_point getlineageStartDate() const {
     return _lineageStartDate;
   }
-  // ! Get Event Time
-  std::chrono::system_clock::time_point getEventTime() {
+
+  std::chrono::system_clock::time_point getEventTime() const {
     return _eventTime;
   }
-  // ! Get Event Duration
-  std::chrono::milliseconds getEventDuration() {
+
+  std::chrono::milliseconds getEventDuration() const {
     return _eventDuration;
   }
-  // Set Event Duration
+
   void setEventDuration(std::chrono::milliseconds duration) {
     _eventDuration = duration;
   }
-  // ! Get Event Type
-  ProvenanceEventType getEventType() {
+
+  ProvenanceEventType getEventType() const {
     return _eventType;
   }
-  // Get Component ID
-  std::string getComponentId() {
+
+  std::string getComponentId() const {
     return _componentId;
   }
-  // Get Component Type
-  std::string getComponentType() {
+
+  std::string getComponentType() const {
     return _componentType;
   }
-  // Get FlowFileUuid
-  utils::Identifier getFlowFileUuid() {
+
+  utils::Identifier getFlowFileUuid() const {
     return flow_uuid_;
   }
-  // Get content full path
-  std::string getContentFullPath() {
+
+  std::string getContentFullPath() const {
     return _contentFullPath;
   }
-  // Get LineageIdentifiers
-  std::vector<utils::Identifier> getLineageIdentifiers() {
+
+  std::vector<utils::Identifier> getLineageIdentifiers() const {
     return _lineageIdentifiers;
   }
-  // Get Details
-  std::string getDetails() {
+
+  std::string getDetails() const {
     return _details;
   }
-  // Set Details
-  void setDetails(std::string details) {
+
+  void setDetails(const std::string& details) {
     _details = details;
   }
-  // Get TransitUri
+
   std::string getTransitUri() {
     return _transitUri;
   }
-  // Set TransitUri
-  void setTransitUri(std::string uri) {
+
+  void setTransitUri(const std::string& uri) {
     _transitUri = uri;
   }
-  // Get SourceSystemFlowFileIdentifier
-  std::string getSourceSystemFlowFileIdentifier() {
+
+  std::string getSourceSystemFlowFileIdentifier() const {
     return _sourceSystemFlowFileIdentifier;
   }
-  // Set SourceSystemFlowFileIdentifier
-  void setSourceSystemFlowFileIdentifier(std::string identifier) {
+
+  void setSourceSystemFlowFileIdentifier(const std::string& identifier) {
     _sourceSystemFlowFileIdentifier = identifier;
   }
-  // Get Parent UUIDs
-  std::vector<utils::Identifier> getParentUuids() {
+
+  std::vector<utils::Identifier> getParentUuids() const {
     return _parentUuids;
   }
-  // Add Parent UUID
+
   void addParentUuid(const utils::Identifier& uuid) {
     if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != 
_parentUuids.end())
       return;
     else
       _parentUuids.push_back(uuid);
   }
-  // Add Parent Flow File
+
   void addParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     addParentUuid(flow->getUUID());
   }
-  // Remove Parent UUID
+
   void removeParentUuid(const utils::Identifier& uuid) {
     _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), 
uuid), _parentUuids.end());
   }
-  // Remove Parent Flow File
+
   void removeParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     removeParentUuid(flow->getUUID());
   }
-  // Get Children UUIDs
-  std::vector<utils::Identifier> getChildrenUuids() {
+
+  std::vector<utils::Identifier> getChildrenUuids() const {
     return _childrenUuids;
   }
-  // Add Child UUID
+
   void addChildUuid(const utils::Identifier& uuid) {
     if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != 
_childrenUuids.end())
       return;
     else
       _childrenUuids.push_back(uuid);
   }
-  // Add Child Flow File
-  void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+
+  void addChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     addChildUuid(flow->getUUID());
     return;
   }
-  // Remove Child UUID
+
   void removeChildUuid(const utils::Identifier& uuid) {
     _childrenUuids.erase(std::remove(_childrenUuids.begin(), 
_childrenUuids.end(), uuid), _childrenUuids.end());
   }
-  // Remove Child Flow File
-  void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+
+  void removeChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     removeChildUuid(flow->getUUID());
   }
-  // Get AlternateIdentifierUri
-  std::string getAlternateIdentifierUri() {
+
+  std::string getAlternateIdentifierUri() const {
     return _alternateIdentifierUri;
   }
-  // Set AlternateIdentifierUri
-  void setAlternateIdentifierUri(std::string uri) {
+
+  void setAlternateIdentifierUri(const std::string& uri) {
     _alternateIdentifierUri = uri;
   }
-  // Get Relationship
-  std::string getRelationship() {
+
+  std::string getRelationship() const {
     return _relationship;
   }
-  // Set Relationship
-  void setRelationship(std::string relation) {
+
+  void setRelationship(const std::string& relation) {
     _relationship = relation;
   }
-  // Get sourceQueueIdentifier
-  std::string getSourceQueueIdentifier() {
+
+  std::string getSourceQueueIdentifier() const {
     return _sourceQueueIdentifier;
   }
-  // Set sourceQueueIdentifier
-  void setSourceQueueIdentifier(std::string identifier) {
+
+  void setSourceQueueIdentifier(const std::string& identifier) {
     _sourceQueueIdentifier = identifier;
   }
-  // fromFlowFile
-  void fromFlowFile(std::shared_ptr<core::FlowFile> &flow) {
+
+  void fromFlowFile(const std::shared_ptr<core::FlowFile> &flow) {
     _entryDate = flow->getEntryDate();
     _lineageStartDate = flow->getlineageStartDate();
     _lineageIdentifiers = flow->getlineageIdentifiers();
@@ -336,21 +331,12 @@ class ProvenanceEventRecord : public 
core::SerializableComponent {
       _contentFullPath = flow->getResourceClaim()->getContentFullPath();
     }
   }
-  using SerializableComponent::Serialize;
-
-  // Serialize the event to a stream
-  bool Serialize(org::apache::nifi::minifi::io::BufferStream& outStream);
 
-  // Serialize and Persistent to the repository
-  bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo) 
override;
-  bool DeSerialize(gsl::span<const std::byte>) override;
-  bool DeSerialize(org::apache::nifi::minifi::io::BufferStream &stream) {
-    return DeSerialize(stream.getBuffer());
-  }
-  bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) 
override;
+  bool serialize(io::OutputStream& output_stream) override;
+  bool deserialize(io::InputStream &input_stream) override;
+  bool loadFromRepository(const std::shared_ptr<core::Repository> &repo);
 
  protected:
-  // Event type
   ProvenanceEventType _eventType;
   // Date at which the event was created
   std::chrono::system_clock::time_point _eventTime{};
@@ -358,39 +344,24 @@ class ProvenanceEventRecord : public 
core::SerializableComponent {
   std::chrono::system_clock::time_point _entryDate{};
   // Date at which the origin of this flow file entered the flow
   std::chrono::system_clock::time_point _lineageStartDate{};
-  // Event Duration
   std::chrono::milliseconds _eventDuration{};
-  // Component ID
   std::string _componentId;
-  // Component Type
   std::string _componentType;
   // Size in bytes of the data corresponding to this flow file
   uint64_t _size;
-  // flow uuid
   utils::Identifier flow_uuid_;
-  // Offset to the content
   uint64_t _offset;
-  // Full path to the content
   std::string _contentFullPath;
-  // Attributes key/values pairs for the flow record
   std::map<std::string, std::string> _attributes;
   // UUID string for all parents
   std::vector<utils::Identifier> _lineageIdentifiers;
-  // transitUri
   std::string _transitUri;
-  // sourceSystemFlowFileIdentifier
   std::string _sourceSystemFlowFileIdentifier;
-  // parent UUID
   std::vector<utils::Identifier> _parentUuids;
-  // child UUID
   std::vector<utils::Identifier> _childrenUuids;
-  // detail
   std::string _details;
-  // sourceQueueIdentifier
   std::string _sourceQueueIdentifier;
-  // relationship
   std::string _relationship;
-  // alternateIdentifierUri;
   std::string _alternateIdentifierUri;
 
  private:
@@ -402,13 +373,8 @@ class ProvenanceEventRecord : public 
core::SerializableComponent {
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
-// Provenance Reporter
 class ProvenanceReporter {
  public:
-  // Constructor
-  /*!
-   * Create a new provenance reporter associated with the process session
-   */
   ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string 
componentId, std::string componentType)
       : logger_(core::logging::LoggerFactory<ProvenanceReporter>::getLogger()) 
{
     _componentId = componentId;
@@ -416,59 +382,45 @@ class ProvenanceReporter {
     repo_ = repo;
   }
 
-  // Destructor
   virtual ~ProvenanceReporter() {
     clear();
   }
-  // Get events
-  std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() {
+
+  std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() const {
     return _events;
   }
-  // Add event
+
   void add(const std::shared_ptr<ProvenanceEventRecord> &event) {
     _events.insert(event);
   }
-  // Remove event
+
   void remove(const std::shared_ptr<ProvenanceEventRecord> &event) {
     if (_events.find(event) != _events.end()) {
       _events.erase(event);
     }
   }
-  //
-  // clear
+
   void clear() {
     _events.clear();
   }
-  // commit
+
   void commit();
-  // create
-  void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
-  // route
-  void route(std::shared_ptr<core::FlowFile> flow, core::Relationship 
relation, std::string detail, std::chrono::milliseconds processingDuration);
-  // modifyAttributes
-  void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string 
detail);
-  // modifyContent
-  void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, 
std::chrono::milliseconds processingDuration);
-  // clone
-  void clone(std::shared_ptr<core::FlowFile> parent, 
std::shared_ptr<core::FlowFile> child);
-  // join
-  void join(std::vector<std::shared_ptr<core::FlowFile> > parents, 
std::shared_ptr<core::FlowFile> child, std::string detail, 
std::chrono::milliseconds processingDuration);
-  // fork
-  void fork(std::vector<std::shared_ptr<core::FlowFile> > child, 
std::shared_ptr<core::FlowFile> parent, std::string detail, 
std::chrono::milliseconds processingDuration);
-  // expire
-  void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
-  // drop
-  void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
-  // send
-  void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, 
std::string detail, std::chrono::milliseconds processingDuration, bool force);
-  // fetch
-  void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, 
std::string detail, std::chrono::milliseconds processingDuration);
-  // receive
-  void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, 
std::string sourceSystemFlowFileIdentifier, std::string detail, 
std::chrono::milliseconds processingDuration);
+  void create(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
detail);
+  void route(const std::shared_ptr<core::FlowFile>& flow, const 
core::Relationship& relation, const std::string& detail, 
std::chrono::milliseconds processingDuration);
+  void modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const 
std::string& detail);
+  void modifyContent(const std::shared_ptr<core::FlowFile>& flow, const 
std::string& detail, std::chrono::milliseconds processingDuration);
+  void clone(const std::shared_ptr<core::FlowFile>& parent, const 
std::shared_ptr<core::FlowFile>& child);
+  void join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const 
std::shared_ptr<core::FlowFile>& child, const std::string& detail, 
std::chrono::milliseconds processingDuration);
+  void fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, 
const std::shared_ptr<core::FlowFile>& parent, const std::string& detail, 
std::chrono::milliseconds processingDuration);
+  void expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
detail);
+  void drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
reason);
+  void send(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
transitUri, const std::string& detail, std::chrono::milliseconds 
processingDuration, bool force);
+  void fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
transitUri, const std::string& detail, std::chrono::milliseconds 
processingDuration);
+  void receive(const std::shared_ptr<core::FlowFile>& flow, const std::string& 
transitUri,
+    const std::string& sourceSystemFlowFileIdentifier, const std::string& 
detail, std::chrono::milliseconds processingDuration);
 
  protected:
-  // allocate
-  std::shared_ptr<ProvenanceEventRecord> 
allocate(ProvenanceEventRecord::ProvenanceEventType eventType, 
std::shared_ptr<core::FlowFile> flow) {
+  std::shared_ptr<ProvenanceEventRecord> 
allocate(ProvenanceEventRecord::ProvenanceEventType eventType, const 
std::shared_ptr<core::FlowFile>& flow) {
     if (repo_->isNoop()) {
       return nullptr;
     }
@@ -480,16 +432,12 @@ class ProvenanceReporter {
     return event;
   }
 
-  // Component ID
   std::string _componentId;
-  // Component Type
   std::string _componentType;
 
  private:
   std::shared_ptr<core::logging::Logger> logger_;
-  // Incoming connection Iterator
   std::set<std::shared_ptr<ProvenanceEventRecord>> _events;
-  // provenance repository.
   std::shared_ptr<core::Repository> repo_;
 
   // Prevent default copy constructor and assignment operation
@@ -498,8 +446,4 @@ class ProvenanceReporter {
   ProvenanceReporter &operator=(const ProvenanceReporter &parent);
 };
 
-// Provenance Repository
-
 }  // namespace org::apache::nifi::minifi::provenance
-
-#endif  // LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
diff --git a/libminifi/src/core/Repository.cpp 
b/libminifi/src/core/Repository.cpp
new file mode 100644
index 000000000..5ab712be3
--- /dev/null
+++ b/libminifi/src/core/Repository.cpp
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/Repository.h"
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Calls Delete(key) for every component in storedValues, using the component's name as the key.
+ * Null entries are skipped instead of being dereferenced, and make the call report failure.
+ * @param storedValues components whose repository entries should be removed
+ * @return true only if every entry was non-null and its Delete(key) call returned true (true for an empty vector)
+ */
+bool Repository::Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) {
+  bool found = true;
+  for (const auto& storedValue : storedValues) {
+    if (!storedValue) {
+      found = false;
+      continue;
+    }
+    found &= Delete(storedValue->getName());
+  }
+  return found;
+}
+
+/**
+ * Serializes element into an in-memory buffer and stores the bytes under the element's UUID string via Put.
+ * @param element component to persist; may be null
+ * @return false if element is null, if serialization fails, or if Put fails; true otherwise
+ */
+bool Repository::storeElement(const std::shared_ptr<core::SerializableComponent> element) {
+  if (!element) {
+    return false;
+  }
+
+  org::apache::nifi::minifi::io::BufferStream outStream;
+
+  // Do not persist a partially written record when serialization fails.
+  if (!element->serialize(outStream)) {
+    logger_->log_error("NiFi Provenance Store event %s serialization failed", element->getUUIDStr());
+    return false;
+  }
+
+  if (!Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size())) {
+    logger_->log_error("NiFi Provenance Store event %s size %llu fail", element->getUUIDStr(), outStream.size());
+    return false;
+  }
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp 
b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 950977728..4c25ef5df 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -17,8 +17,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 #include <vector>
 #include <queue>
 #include <map>
@@ -164,8 +162,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(const 
std::shared_ptr<core::Pr
   logging::LOG_DEBUG(logger_) << "batch size " << batch_size_ << " records";
   size_t deserialized = batch_size_;
   std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
-  std::function<std::shared_ptr<core::SerializableComponent>()> constructor = 
[]() {return std::make_shared<provenance::ProvenanceEventRecord>();};
-  if (!repo->DeSerialize(records, deserialized, constructor) && deserialized 
== 0) {
+  if (!repo->getElements(records, deserialized) && deserialized == 0) {
     return;
   }
   logging::LOG_DEBUG(logger_) << "Captured " << deserialized << " records";
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp 
b/libminifi/src/core/repository/VolatileRepository.cpp
index e186d0053..033efffcc 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -25,9 +25,6 @@
 
 namespace org::apache::nifi::minifi::core::repository {
 
-void VolatileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
-}
-
 VolatileRepository::~VolatileRepository() {
   for (auto ent : repo_data_.value_vector) {
     delete ent;
@@ -122,45 +119,4 @@ bool VolatileRepository::Get(const std::string &key, 
std::string &value) {
   return false;
 }
 
-bool 
VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store,
-                                     size_t &max_size, 
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
-  size_t requested_batch = max_size;
-  max_size = 0;
-  for (auto ent : repo_data_.value_vector) {
-    // let the destructor do the cleanup
-    RepoValue<std::string> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      std::shared_ptr<core::SerializableComponent> newComponent = lambda();
-      // we've taken ownership of this repo value
-      newComponent->DeSerialize(repo_value.getBuffer());
-      store.push_back(newComponent);
-      repo_data_.current_size -= repo_value.getBuffer().size();
-      if (max_size++ >= requested_batch) {
-        break;
-      }
-    }
-  }
-  return max_size > 0;
-}
-
-bool 
VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
 &store, size_t &max_size) {
-  logger_->log_debug("VolatileRepository -- DeSerialize %u", 
repo_data_.current_size.load());
-  max_size = 0;
-  for (auto ent : repo_data_.value_vector) {
-    // let the destructor do the cleanup
-    RepoValue<std::string> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      // we've taken ownership of this repo value
-      store.at(max_size)->DeSerialize(repo_value.getBuffer());
-      repo_data_.current_size -= repo_value.getBuffer().size();
-      if (max_size++ >= store.size()) {
-        break;
-      }
-    }
-  }
-  return max_size > 0;
-}
-
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/provenance/Provenance.cpp 
b/libminifi/src/provenance/Provenance.cpp
index dc705dfc4..c3b9cfefc 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 #include <list>
+#include <utility>
 
 #include "core/Repository.h"
 #include "io/BufferStream.h"
@@ -40,20 +41,17 @@ const char 
*ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREAT
     "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
 
 
ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType
 event, std::string componentId, std::string componentType)
-    : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) 
{
-  _eventType = event;
-  _componentId = componentId;
-  _componentType = componentType;
-  _eventTime = std::chrono::system_clock::now();
+    : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()),
+      _eventType(event),
+      _componentId(std::move(componentId)),
+      _componentType(std::move(componentType)),
+      _eventTime(std::chrono::system_clock::now()) {
 }
 
-// DeSerialize
-bool ProvenanceEventRecord::DeSerialize(const 
std::shared_ptr<core::SerializableComponent> &store) {
+bool ProvenanceEventRecord::loadFromRepository(const 
std::shared_ptr<core::Repository> &repo) {
   std::string value;
   bool ret;
 
-  const std::shared_ptr<core::Repository> repo = 
std::dynamic_pointer_cast<core::Repository>(store);
-
   if (nullptr == repo || uuid_.isNil()) {
     logger_->log_error("Repo could not be assigned");
     return false;
@@ -69,7 +67,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
std::shared_ptr<core::Serializable
 
   org::apache::nifi::minifi::io::BufferStream stream(value);
 
-  ret = DeSerialize(stream);
+  ret = deserialize(stream);
 
   if (ret) {
     logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType 
%d success", getUUIDStr(), stream.size(), _eventType);
@@ -80,68 +78,68 @@ bool ProvenanceEventRecord::DeSerialize(const 
std::shared_ptr<core::Serializable
   return ret;
 }
 
-bool 
ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& 
outStream) {
+bool ProvenanceEventRecord::serialize(io::OutputStream& output_stream) {
   {
-    const auto ret = outStream.write(this->uuid_);
+    const auto ret = output_stream.write(this->uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
     uint32_t eventType = this->_eventType;
-    const auto ret = outStream.write(eventType);
+    const auto ret = output_stream.write(eventType);
     if (ret != 4) {
       return false;
     }
   }
   {
     uint64_t event_time_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(_eventTime.time_since_epoch()).count();
-    const auto ret = outStream.write(event_time_ms);
+    const auto ret = output_stream.write(event_time_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t entry_date_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(_entryDate.time_since_epoch()).count();
-    const auto ret = outStream.write(entry_date_ms);
+    const auto ret = output_stream.write(entry_date_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t event_duration_ms = this->_eventDuration.count();
-    const auto ret = outStream.write(event_duration_ms);
+    const auto ret = output_stream.write(event_duration_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t lineage_start_date_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(_lineageStartDate.time_since_epoch()).count();
-    const auto ret = outStream.write(lineage_start_date_ms);
+    const auto ret = output_stream.write(lineage_start_date_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_componentId);
+    const auto ret = output_stream.write(this->_componentId);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_componentType);
+    const auto ret = output_stream.write(this->_componentType);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->flow_uuid_);
+    const auto ret = output_stream.write(this->flow_uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_details);
+    const auto ret = output_stream.write(this->_details);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -149,45 +147,45 @@ bool 
ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
   // write flow attributes
   {
     const auto numAttributes = gsl::narrow<uint32_t>(this->_attributes.size());
-    const auto ret = outStream.write(numAttributes);
+    const auto ret = output_stream.write(numAttributes);
     if (ret != 4) {
       return false;
     }
   }
   for (const auto& itAttribute : _attributes) {
     {
-      const auto ret = outStream.write(itAttribute.first);
+      const auto ret = output_stream.write(itAttribute.first);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.write(itAttribute.second);
+      const auto ret = output_stream.write(itAttribute.second);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   }
   {
-    const auto ret = outStream.write(this->_contentFullPath);
+    const auto ret = output_stream.write(this->_contentFullPath);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_size);
+    const auto ret = output_stream.write(this->_size);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_offset);
+    const auto ret = output_stream.write(this->_offset);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_sourceQueueIdentifier);
+    const auto ret = output_stream.write(this->_sourceQueueIdentifier);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -196,44 +194,44 @@ bool 
ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
     // write UUIDs
     {
       const auto parent_uuids_count = 
gsl::narrow<uint32_t>(this->_parentUuids.size());
-      const auto ret = outStream.write(parent_uuids_count);
+      const auto ret = output_stream.write(parent_uuids_count);
       if (ret != 4) {
         return false;
       }
     }
     for (const auto& parentUUID : _parentUuids) {
-      const auto ret = outStream.write(parentUUID);
+      const auto ret = output_stream.write(parentUUID);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto children_uuids_count = 
gsl::narrow<uint32_t>(this->_childrenUuids.size());
-      const auto ret = outStream.write(children_uuids_count);
+      const auto ret = output_stream.write(children_uuids_count);
       if (ret != 4) {
         return false;
       }
     }
     for (const auto& childUUID : _childrenUuids) {
-      const auto ret = outStream.write(childUUID);
+      const auto ret = output_stream.write(childUUID);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH) {
-    const auto ret = outStream.write(this->_transitUri);
+    const auto ret = output_stream.write(this->_transitUri);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
-      const auto ret = outStream.write(this->_transitUri);
+      const auto ret = output_stream.write(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.write(this->_sourceSystemFlowFileIdentifier);
+      const auto ret = 
output_stream.write(this->_sourceSystemFlowFileIdentifier);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -243,23 +241,9 @@ bool 
ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
   return true;
 }
 
-bool ProvenanceEventRecord::Serialize(const 
std::shared_ptr<core::SerializableComponent> &repo) {
-  org::apache::nifi::minifi::io::BufferStream outStream;
-
-  Serialize(outStream);
-
-  // Persist to the DB
-  if (!repo->Serialize(getUUIDStr(), 
const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), 
outStream.size())) {
-    logger_->log_error("NiFi Provenance Store event %s size %llu fail", 
getUUIDStr(), outStream.size());
-  }
-  return true;
-}
-
-bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> 
buffer) {
-  org::apache::nifi::minifi::io::BufferStream outStream(buffer);
-
+bool ProvenanceEventRecord::deserialize(io::InputStream &input_stream) {
   {
-    const auto ret = outStream.read(uuid_);
+    const auto ret = input_stream.read(uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -267,7 +251,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
 
   uint32_t eventType;
   {
-    const auto ret = outStream.read(eventType);
+    const auto ret = input_stream.read(eventType);
     if (ret != 4) {
       return false;
     }
@@ -276,7 +260,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
   this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
   {
     uint64_t event_time_in_ms;
-    const auto ret = outStream.read(event_time_in_ms);
+    const auto ret = input_stream.read(event_time_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -285,7 +269,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
 
   {
     uint64_t entry_date_in_ms;
-    const auto ret = outStream.read(entry_date_in_ms);
+    const auto ret = input_stream.read(entry_date_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -294,7 +278,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
 
   {
     uint64_t event_duration_ms;
-    const auto ret = outStream.read(event_duration_ms);
+    const auto ret = input_stream.read(event_duration_ms);
     if (ret != 8) {
       return false;
     }
@@ -303,7 +287,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
 
   {
     uint64_t lineage_start_date_in_ms;
-    const auto ret = outStream.read(lineage_start_date_in_ms);
+    const auto ret = input_stream.read(lineage_start_date_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -311,28 +295,28 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
   }
 
   {
-    const auto ret = outStream.read(this->_componentId);
+    const auto ret = input_stream.read(this->_componentId);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_componentType);
+    const auto ret = input_stream.read(this->_componentType);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->flow_uuid_);
+    const auto ret = input_stream.read(this->flow_uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_details);
+    const auto ret = input_stream.read(this->_details);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -341,7 +325,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
   // read flow attributes
   uint32_t numAttributes = 0;
   {
-    const auto ret = outStream.read(numAttributes);
+    const auto ret = input_stream.read(numAttributes);
     if (ret != 4) {
       return false;
     }
@@ -350,14 +334,14 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     {
-      const auto ret = outStream.read(key);
+      const auto ret = input_stream.read(key);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     std::string value;
     {
-      const auto ret = outStream.read(value);
+      const auto ret = input_stream.read(value);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -366,28 +350,28 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
   }
 
   {
-    const auto ret = outStream.read(this->_contentFullPath);
+    const auto ret = input_stream.read(this->_contentFullPath);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_size);
+    const auto ret = input_stream.read(this->_size);
     if (ret != 8) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_offset);
+    const auto ret = input_stream.read(this->_offset);
     if (ret != 8) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_sourceQueueIdentifier);
+    const auto ret = input_stream.read(this->_sourceQueueIdentifier);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -397,7 +381,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
     // read UUIDs
     uint32_t number = 0;
     {
-      const auto ret = outStream.read(number);
+      const auto ret = input_stream.read(number);
       if (ret != 4) {
         return false;
       }
@@ -406,7 +390,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
       {
-        const auto ret = outStream.read(parentUUID);
+        const auto ret = input_stream.read(parentUUID);
         if (ret == 0 || io::isError(ret)) {
           return false;
         }
@@ -415,7 +399,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
     }
     number = 0;
     {
-      const auto ret = outStream.read(number);
+      const auto ret = input_stream.read(number);
       if (ret != 4) {
         return false;
       }
@@ -423,7 +407,7 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
       {
-        const auto ret = outStream.read(childUUID);
+        const auto ret = input_stream.read(childUUID);
         if (ret == 0 || io::isError(ret)) {
           return false;
         }
@@ -432,20 +416,20 @@ bool ProvenanceEventRecord::DeSerialize(const 
gsl::span<const std::byte> buffer)
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || 
this->_eventType == ProvenanceEventRecord::FETCH) {
     {
-      const auto ret = outStream.read(this->_transitUri);
+      const auto ret = input_stream.read(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
-      const auto ret = outStream.read(this->_transitUri);
+      const auto ret = input_stream.read(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
+      const auto ret = 
input_stream.read(this->_sourceSystemFlowFileIdentifier);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -469,14 +453,14 @@ void ProvenanceReporter::commit() {
 
   for (auto& event : _events) {
     std::unique_ptr<io::BufferStream> stramptr(new io::BufferStream());
-    event->Serialize(*stramptr);
+    event->serialize(*stramptr);
 
     flowData.emplace_back(event->getUUIDStr(), std::move(stramptr));
   }
   repo_->MultiPut(flowData);
 }
 
-void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, 
std::string detail) {
+void ProvenanceReporter::create(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::CREATE, flow);
 
   if (event) {
@@ -485,7 +469,7 @@ void 
ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::strin
   }
 }
 
-void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, 
core::Relationship relation, std::string detail, std::chrono::milliseconds 
processingDuration) {
+void ProvenanceReporter::route(const std::shared_ptr<core::FlowFile>& flow, 
const core::Relationship& relation, const std::string& detail, 
std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::ROUTE, flow);
 
   if (event) {
@@ -496,7 +480,7 @@ void 
ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relat
   }
 }
 
-void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> 
flow, std::string detail) {
+void ProvenanceReporter::modifyAttributes(const 
std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
 
   if (event) {
@@ -505,7 +489,7 @@ void 
ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, 
std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::modifyContent(const std::shared_ptr<core::FlowFile>& 
flow, const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
 
   if (event) {
@@ -515,7 +499,7 @@ void 
ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std
   }
 }
 
-void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, 
std::shared_ptr<core::FlowFile> child) {
+void ProvenanceReporter::clone(const std::shared_ptr<core::FlowFile>& parent, 
const std::shared_ptr<core::FlowFile>& child) {
   auto event = allocate(ProvenanceEventRecord::CLONE, parent);
 
   if (event) {
@@ -525,15 +509,14 @@ void 
ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shar
   }
 }
 
-void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > 
parents, std::shared_ptr<core::FlowFile> child, std::string detail, 
std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::join(const 
std::vector<std::shared_ptr<core::FlowFile>>& parents, const 
std::shared_ptr<core::FlowFile>& child,
+    const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::JOIN, child);
 
   if (event) {
     event->addChildFlowFile(child);
-    std::vector<std::shared_ptr<core::FlowFile> >::iterator it;
-    for (it = parents.begin(); it != parents.end(); it++) {
-      std::shared_ptr<core::FlowFile> record = *it;
-      event->addParentFlowFile(record);
+    for (const auto& parent : parents) {
+      event->addParentFlowFile(parent);
     }
     event->setDetails(detail);
     event->setEventDuration(processingDuration);
@@ -541,15 +524,14 @@ void 
ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > pare
   }
 }
 
-void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > 
child, std::shared_ptr<core::FlowFile> parent, std::string detail, 
std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::fork(const 
std::vector<std::shared_ptr<core::FlowFile>>& children, const 
std::shared_ptr<core::FlowFile>& parent,
+    const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::FORK, parent);
 
   if (event) {
     event->addParentFlowFile(parent);
-    std::vector<std::shared_ptr<core::FlowFile> >::iterator it;
-    for (it = child.begin(); it != child.end(); it++) {
-      std::shared_ptr<core::FlowFile> record = *it;
-      event->addChildFlowFile(record);
+    for (const auto& child : children) {
+      event->addChildFlowFile(child);
     }
     event->setDetails(detail);
     event->setEventDuration(processingDuration);
@@ -557,7 +539,7 @@ void 
ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > chil
   }
 }
 
-void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, 
std::string detail) {
+void ProvenanceReporter::expire(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::EXPIRE, flow);
 
   if (event) {
@@ -566,7 +548,7 @@ void 
ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::strin
   }
 }
 
-void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, 
std::string reason) {
+void ProvenanceReporter::drop(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& reason) {
   auto event = allocate(ProvenanceEventRecord::DROP, flow);
 
   if (event) {
@@ -576,7 +558,7 @@ void 
ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string
   }
 }
 
-void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, 
std::string transitUri, std::string detail, std::chrono::milliseconds 
processingDuration, bool force) {
+void ProvenanceReporter::send(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& transitUri, const std::string& detail, 
std::chrono::milliseconds processingDuration, bool force) {
   auto event = allocate(ProvenanceEventRecord::SEND, flow);
 
   if (event) {
@@ -587,15 +569,15 @@ void 
ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string
       add(event);
     } else {
       if (!repo_->isFull())
-        event->Serialize(repo_);
+        repo_->storeElement(event);
     }
   }
 }
 
-void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
-                                 std::string transitUri,
-                                 std::string sourceSystemFlowFileIdentifier,
-                                 std::string detail,
+void ProvenanceReporter::receive(const std::shared_ptr<core::FlowFile>& flow,
+                                 const std::string& transitUri,
+                                 const std::string& 
sourceSystemFlowFileIdentifier,
+                                 const std::string& detail,
                                  std::chrono::milliseconds processingDuration) 
{
   auto event = allocate(ProvenanceEventRecord::RECEIVE, flow);
 
@@ -608,7 +590,7 @@ void 
ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, 
std::string transitUri, std::string detail, std::chrono::milliseconds 
processingDuration) {
+void ProvenanceReporter::fetch(const std::shared_ptr<core::FlowFile>& flow, 
const std::string& transitUri, const std::string& detail, 
std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::FETCH, flow);
 
   if (event) {
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp 
b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 7a3f06557..09e3d4e0b 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -35,128 +35,128 @@ namespace provenance = minifi::provenance;
 using namespace std::literals::chrono_literals;
 
 TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
-  REQUIRE(record1.getAttributes().empty());
-  REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
+  REQUIRE(record1->getAttributes().empty());
+  REQUIRE(record1->getAlternateIdentifierUri().length() == 0);
 }
 
 TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record2.getEventId() == record1.getEventId());
-  REQUIRE(record2.getComponentId() == record1.getComponentId());
-  REQUIRE(record2.getComponentType() == record1.getComponentType());
-  REQUIRE(record2.getDetails() == record1.getDetails());
-  REQUIRE(record2.getDetails() == smileyface);
-  REQUIRE(record2.getEventDuration() == sample);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record2->getEventId() == record1->getEventId());
+  REQUIRE(record2->getComponentId() == record1->getComponentId());
+  REQUIRE(record2->getComponentType() == record1->getComponentType());
+  REQUIRE(record2->getDetails() == record1->getDetails());
+  REQUIRE(record2->getDetails() == smileyface);
+  REQUIRE(record2->getEventDuration() == sample);
 }
 
 TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
-  utils::Identifier eventId = record1.getEventId();
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  utils::Identifier eventId = record1->getEventId();
   std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
   ffr1->setAttribute("potato", "potatoe");
   ffr1->setAttribute("tomato", "tomatoe");
 
-  record1.addChildFlowFile(ffr1);
+  record1->addChildFlowFile(ffr1);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record1.getChildrenUuids().size() == 1);
-  REQUIRE(record2.getChildrenUuids().size() == 1);
-  utils::Identifier childId = record2.getChildrenUuids().at(0);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record1->getChildrenUuids().size() == 1);
+  REQUIRE(record2->getChildrenUuids().size() == 1);
+  utils::Identifier childId = record2->getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
-  record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().empty());
+  record2->removeChildUuid(childId);
+  REQUIRE(record2->getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
 
   std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record2.getEventId() == record1.getEventId());
-  REQUIRE(record2.getComponentId() == record1.getComponentId());
-  REQUIRE(record2.getComponentType() == record1.getComponentType());
-  REQUIRE(record2.getDetails() == record1.getDetails());
-  REQUIRE(record2.getDetails() == smileyface);
-  REQUIRE(record2.getEventDuration() == sample);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record2->getEventId() == record1->getEventId());
+  REQUIRE(record2->getComponentId() == record1->getComponentId());
+  REQUIRE(record2->getComponentType() == record1->getComponentType());
+  REQUIRE(record2->getDetails() == record1->getDetails());
+  REQUIRE(record2->getDetails() == smileyface);
+  REQUIRE(record2->getEventDuration() == sample);
 }
 
 TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
-  utils::Identifier eventId = record1.getEventId();
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  utils::Identifier eventId = record1->getEventId();
   std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
   ffr1->setAttribute("potato", "potatoe");
   ffr1->setAttribute("tomato", "tomatoe");
 
-  record1.addChildFlowFile(ffr1);
+  record1->addChildFlowFile(ffr1);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record1.getChildrenUuids().size() == 1);
-  REQUIRE(record2.getChildrenUuids().size() == 1);
-  utils::Identifier childId = record2.getChildrenUuids().at(0);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record1->getChildrenUuids().size() == 1);
+  REQUIRE(record2->getChildrenUuids().size() == 1);
+  utils::Identifier childId = record2->getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
-  record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().empty());
+  record2->removeChildUuid(childId);
+  REQUIRE(record2->getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
 
   std::shared_ptr<core::Repository> testRepository = core::createRepository("nooprepository");
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
+  record1->setEventDuration(sample);
 
-  REQUIRE(record1.Serialize(testRepository) == true);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == false);
+  REQUIRE(testRepository->storeElement(record1));
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == false);
 }
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 41ca154a8..400e0e91d 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -35,6 +35,7 @@
 #include "properties/Configure.h"
 #include "provenance/Provenance.h"
 #include "SwapManager.h"
+#include "io/BufferStream.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -73,10 +74,6 @@ class TestRepositoryBase : public T_BaseRepository {
     return true;
   }
 
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override {
-    return Put(key, buffer, bufferSize);
-  }
-
   bool Delete(const std::string& key) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.erase(key);
@@ -94,11 +91,7 @@ class TestRepositoryBase : public T_BaseRepository {
     }
   }
 
-  bool Serialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>>& /*store*/, size_t /*max_size*/) override {
-    return false;
-  }
-
-  bool DeSerialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override {
+  bool getElements(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     max_size = 0;
     for (const auto &entry : repository_results_) {
@@ -106,27 +99,20 @@ class TestRepositoryBase : public T_BaseRepository {
         break;
       }
       const auto eventRead = store.at(max_size);
-      eventRead->DeSerialize(gsl::make_span(entry.second).template as_span<const std::byte>());
+      org::apache::nifi::minifi::io::BufferStream stream(gsl::make_span(entry.second).template as_span<const std::byte>());
+      eventRead->deserialize(stream);
       ++max_size;
     }
     return true;
   }
 
-  bool Serialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>& /*store*/) override {
-    return false;
-  }
-
-  bool DeSerialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> &store) override {
-    std::string value;
-    Get(store->getUUIDStr(), value);
-    store->DeSerialize(gsl::make_span(value).as_span<const std::byte>());
+  bool storeElement(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> element) override {
+    org::apache::nifi::minifi::io::BufferStream outStream;
+    element->serialize(outStream);
+    Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size());
     return true;
   }
 
-  bool DeSerialize(gsl::span<const std::byte>) override {
-    return false;
-  }
-
   std::map<std::string, std::string> getRepoMap() const {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     return repository_results_;
@@ -199,9 +185,6 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepos
     }
   }
 
-  void loadComponent(const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) override {
-  }
-
  private:
   void run() override {
   }

Reply via email to