This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch MINIFICPP-2028 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 83e0af6c58de86bfc7660c5465aaade9a37d988c Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jan 9 14:24:56 2023 +0100 MINIFICPP-2028 Remove SerializableComponent dependency from Repository --- extensions/rocksdb-repos/ProvenanceRepository.cpp | 42 +--- extensions/rocksdb-repos/ProvenanceRepository.h | 7 +- .../tests/unit/ProcessorTests.cpp | 6 +- libminifi/include/core/Repository.h | 74 +------ libminifi/include/core/SerializableComponent.h | 74 +------ .../include/core/repository/VolatileRepository.h | 19 -- libminifi/include/provenance/Provenance.h | 240 ++++++++------------- libminifi/src/core/Repository.cpp | 46 ++++ .../SiteToSiteProvenanceReportingTask.cpp | 5 +- .../src/core/repository/VolatileRepository.cpp | 44 ---- libminifi/src/provenance/Provenance.cpp | 180 +++++++--------- libminifi/test/rocksdb-tests/ProvenanceTests.cpp | 138 ++++++------ libminifi/test/unit/ProvenanceTestHelper.h | 33 +-- 13 files changed, 323 insertions(+), 585 deletions(-) diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 0c7da870d..2680de9d5 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -122,35 +122,17 @@ bool ProvenanceRepository::Get(const std::string &key, std::string &value) { return db_->Get(rocksdb::ReadOptions(), key, &value).ok(); } -bool ProvenanceRepository::Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { - return Put(key, buffer, bufferSize); -} - -bool ProvenanceRepository::get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) { - std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions())); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - if (store.size() >= max_size) - break; - if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) { - store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead)); - } - } - return true; -} - -bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, - std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { +bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) { std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions())); size_t requested_batch = max_size; max_size = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { if (max_size >= requested_batch) break; - std::shared_ptr<core::SerializableComponent> eventRead = lambda(); + auto eventRead = std::make_shared<ProvenanceEventRecord>(); std::string key = it->key().ToString(); - if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) { + io::BufferStream stream(gsl::make_span(it->value()).as_span<const std::byte>()); + if (eventRead->deserialize(stream)) { max_size++; records.push_back(eventRead); } @@ -158,22 +140,6 @@ bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::Seriali return max_size > 0; } -bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { - std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions())); - max_size = 0; - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - - if (store.at(max_size)->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) { - max_size++; - } - if (store.size() >= max_size) - break; - } - return max_size > 0; -} - void ProvenanceRepository::destroy() { db_.reset(); } diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index 0dc9cb6e4..7aaeaa34e 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -79,15 +79,10 @@ class ProvenanceRepository : public core::ThreadedRepository { } bool Get(const std::string &key, std::string &value) override; - - bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override; - bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, - std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override; - bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override; + bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) override; void destroy(); uint64_t getKeyCount() const; - virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size); // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp index b7afc647c..eb56c1355 100644 --- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp +++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp @@ -48,6 +48,7 @@ #include "utils/PropertyErrors.h" #include "utils/IntegrationTestUtils.h" #include "Utils.h" +#include "io/BufferStream.h" TEST_CASE("Test Creation of GetFile", "[getfileCreate]") { TestController testController; @@ -448,7 +449,8 @@ TEST_CASE("Test Find file", "[getfileCreate3]") { for (auto entry : repo->getRepoMap()) { minifi::provenance::ProvenanceEventRecord newRecord; - newRecord.DeSerialize(gsl::make_span(entry.second).as_span<const std::byte>()); + minifi::io::BufferStream stream(gsl::make_span(entry.second).as_span<const std::byte>()); + newRecord.deserialize(stream); bool found = false; for (const auto& provRec : records) { @@ -475,7 +477,7 @@ TEST_CASE("Test Find file", "[getfileCreate3]") { processorReport->setScheduledState(core::ScheduledState::RUNNING); std::string jsonStr; std::size_t deserialized = 0; - repo->DeSerialize(recordsReport, deserialized); + repo->getElements(recordsReport, deserialized); std::function<void(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession>&)> verifyReporter = [&](const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { taskReport->getJsonReport(context, session, recordsReport, jsonStr); diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 901efcecb..247da7351 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -54,14 +54,14 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB; constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10); constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500); -class Repository : public core::SerializableComponent { +class Repository : public core::CoreComponent { public: explicit Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD) - : core::SerializableComponent(std::move(repo_name)), + : core::CoreComponent(std::move(repo_name)), directory_(std::move(directory)), max_partition_millis_(maxPartitionMillis), max_partition_bytes_(maxPartitionBytes), @@ -71,10 +71,12 @@ class Repository : public core::SerializableComponent { logger_(logging::LoggerFactory<Repository>::getLogger()) { } - virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0; + virtual bool isRunning() { + return true; + } + virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0; virtual bool start() = 0; - virtual bool stop() = 0; virtual bool isNoop() const { @@ -96,13 +98,7 @@ class Repository : public core::SerializableComponent { return true; } - virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) { - bool found = true; - for (const auto& storedValue : storedValues) { - found &= Delete(storedValue->getName()); - } - return found; - } + virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues); void setConnectionMap(std::map<std::string, core::Connectable*> connectionMap) { connection_map_ = std::move(connectionMap); @@ -120,63 +116,11 @@ class Repository : public core::SerializableComponent { return repo_full_; } - /** - * Specialization that allows us to serialize max_size objects into store. - * the lambdaConstructor will create objects to put into store - * @param store vector in which we can store serialized object - * @param max_size reference that stores the max number of objects to retrieve and serialize. - * upon return max_size will represent the number of serialized objects. - * @return status of this operation - * - * Base implementation returns true; - */ - virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t /*max_size*/) { - return true; - } - - /** - * Specialization that allows us to deserialize max_size objects into store. - * @param store vector in which we can store deserialized object - * @param max_size reference that stores the max number of objects to retrieve and deserialize. - * upon return max_size will represent the number of deserialized objects. - * @return status of this operation - * - * Base implementation returns true; - */ - virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) { - return true; - } - - /** - * Specialization that allows us to deserialize max_size objects into store. - * the lambdaConstructor will create objects to put into store - * @param store vector in which we can store deserialized object - * @param max_size reference that stores the max number of objects to retrieve and deserialize. - * upon return max_size will represent the number of deserialized objects. - * @param lambdaConstructor reference that will create the objects for store - * @return status of this operation - * - * Base implementation returns true; - */ - virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/, std::function<std::shared_ptr<core::SerializableComponent>()> /*lambdaConstructor*/) { // NOLINT - return true; - } - - bool Serialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override { + virtual bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) { return true; } - bool DeSerialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override { - return true; - } - - bool DeSerialize(gsl::span<const std::byte>) override { - return true; - } - - bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override { - return Put(key, buffer, bufferSize); - } + virtual bool storeElement(const std::shared_ptr<core::SerializableComponent> element); virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) { } diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h index d5b0b037b..3d9480ebf 100644 --- a/libminifi/include/core/SerializableComponent.h +++ b/libminifi/include/core/SerializableComponent.h @@ -15,86 +15,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ -#define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ +#pragma once -#include <memory> #include <string> #include <utility> -#include "core/Connectable.h" #include "core/Core.h" #include "utils/gsl.h" +#include "io/InputStream.h" +#include "io/OutputStream.h" namespace org::apache::nifi::minifi::core { -/** - * Represents a component that is serializable and an extension point of core Component - */ -class SerializableComponent : public core::Connectable { +class SerializableComponent : public core::CoreComponent { public: explicit SerializableComponent(std::string name) - : core::Connectable(std::move(name)) { - } - - SerializableComponent(std::string name, const utils::Identifier& uuid) - : core::Connectable(std::move(name), uuid) { - } - - ~SerializableComponent() override = default; - - /** - * Serialize this object into the the store - * @param store object in which we are serializing data into - * @return status of this serialization. - */ - virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) = 0; - - /** - * Deserialization from the parameter store into the current object - * @param store from which we are deserializing the current object - * @return status of this deserialization. - */ - virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) = 0; - - /** - * Deserializes the current object using buffer - * @param buffer buffer from which we can deserialize the currenet object - * @param bufferSize length of buffer from which we can deserialize the current object. - * @return status of the deserialization. - */ - virtual bool DeSerialize(gsl::span<const std::byte>) = 0; - - /** - * Serialization of this object into buffer - * @param key string that represents this objects identifier - * @param buffer buffer that contains the serialized object - * @param bufferSize length of buffer - * @return status of serialization - */ - virtual bool Serialize(const std::string& /*key*/, const uint8_t* /*buffer*/, const size_t /*bufferSize*/) { - return false; + : core::CoreComponent(std::move(name)) { } - void yield() override { } - - /** - * Determines if we are connected and operating - */ - bool isRunning() override { - return true; - } + virtual ~SerializableComponent() = default; - /** - * Determines if work is available by this connectable - * @return boolean if work is available. - */ - bool isWorkAvailable() override { - return true; - } + virtual bool serialize(io::OutputStream& output_stream) = 0; + virtual bool deserialize(io::InputStream &input_stream) = 0; }; } // namespace org::apache::nifi::minifi::core - -#endif // LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ - diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 5bdf509cc..05d341f9d 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -82,25 +82,6 @@ class VolatileRepository : public core::ThreadedRepository { * @return status of the get operation. */ bool Get(const std::string& key, std::string &value) override; - /** - * Deserializes objects into store - * @param store vector in which we will store newly created objects. - * @param max_size size of objects deserialized - */ - bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override; - - /** - * Deserializes objects into a store that contains a fixed number of objects in which - * we will deserialize from this repo - * @param store precreated object vector - * @param max_size size of objects deserialized - */ - bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override; - - /** - * Function to load this component. - */ - void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override; uint64_t getRepoSize() const override { return repo_data_.current_size; diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index e3feb6708..275701fcb 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -15,8 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_ -#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_ +#pragma once #include <algorithm> #include <atomic> @@ -43,10 +42,7 @@ #include "utils/TimeUtil.h" namespace org::apache::nifi::minifi::provenance { -// Provenance Event Record Serialization Seg Size -#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 -// Provenance Event Record class ProvenanceEventRecord : public core::SerializableComponent { public: enum ProvenanceEventType { @@ -160,169 +156,168 @@ class ProvenanceEventRecord : public core::SerializableComponent { _eventTime = std::chrono::system_clock::now(); } - // Destructor virtual ~ProvenanceEventRecord() = default; - // Get the Event ID - utils::Identifier getEventId() { + + utils::Identifier getEventId() const { return getUUID(); } void setEventId(const utils::Identifier &id) { setUUID(id); } - // Get Attributes - std::map<std::string, std::string> getAttributes() { + + std::map<std::string, std::string> getAttributes() const { return _attributes; } - // Get Size - uint64_t getFileSize() { + + uint64_t getFileSize() const { return _size; } - // ! Get Offset - uint64_t getFileOffset() { + + uint64_t getFileOffset() const { return _offset; } - // ! Get Entry Date - std::chrono::system_clock::time_point getFlowFileEntryDate() { + + std::chrono::system_clock::time_point getFlowFileEntryDate() const { return _entryDate; } - // ! Get Lineage Start Date - std::chrono::system_clock::time_point getlineageStartDate() { + + std::chrono::system_clock::time_point getlineageStartDate() const { return _lineageStartDate; } - // ! Get Event Time - std::chrono::system_clock::time_point getEventTime() { + + std::chrono::system_clock::time_point getEventTime() const { return _eventTime; } - // ! Get Event Duration - std::chrono::milliseconds getEventDuration() { + + std::chrono::milliseconds getEventDuration() const { return _eventDuration; } - // Set Event Duration + void setEventDuration(std::chrono::milliseconds duration) { _eventDuration = duration; } - // ! Get Event Type - ProvenanceEventType getEventType() { + + ProvenanceEventType getEventType() const { return _eventType; } - // Get Component ID - std::string getComponentId() { + + std::string getComponentId() const { return _componentId; } - // Get Component Type - std::string getComponentType() { + + std::string getComponentType() const { return _componentType; } - // Get FlowFileUuid - utils::Identifier getFlowFileUuid() { + + utils::Identifier getFlowFileUuid() const { return flow_uuid_; } - // Get content full path - std::string getContentFullPath() { + + std::string getContentFullPath() const { return _contentFullPath; } - // Get LineageIdentifiers - std::vector<utils::Identifier> getLineageIdentifiers() { + + std::vector<utils::Identifier> getLineageIdentifiers() const { return _lineageIdentifiers; } - // Get Details - std::string getDetails() { + + std::string getDetails() const { return _details; } - // Set Details - void setDetails(std::string details) { + + void setDetails(const std::string& details) { _details = details; } - // Get TransitUri + std::string getTransitUri() { return _transitUri; } - // Set TransitUri - void setTransitUri(std::string uri) { + + void setTransitUri(const std::string& uri) { _transitUri = uri; } - // Get SourceSystemFlowFileIdentifier - std::string getSourceSystemFlowFileIdentifier() { + + std::string getSourceSystemFlowFileIdentifier() const { return _sourceSystemFlowFileIdentifier; } - // Set SourceSystemFlowFileIdentifier - void setSourceSystemFlowFileIdentifier(std::string identifier) { + + void setSourceSystemFlowFileIdentifier(const std::string& identifier) { _sourceSystemFlowFileIdentifier = identifier; } - // Get Parent UUIDs - std::vector<utils::Identifier> getParentUuids() { + + std::vector<utils::Identifier> getParentUuids() const { return _parentUuids; } - // Add Parent UUID + void addParentUuid(const utils::Identifier& uuid) { if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) return; else _parentUuids.push_back(uuid); } - // Add Parent Flow File + void addParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) { addParentUuid(flow->getUUID()); } - // Remove Parent UUID + void removeParentUuid(const utils::Identifier& uuid) { _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); } - // Remove Parent Flow File + void removeParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) { removeParentUuid(flow->getUUID()); } - // Get Children UUIDs - std::vector<utils::Identifier> getChildrenUuids() { + + std::vector<utils::Identifier> getChildrenUuids() const { return _childrenUuids; } - // Add Child UUID + void addChildUuid(const utils::Identifier& uuid) { if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) return; else _childrenUuids.push_back(uuid); } - // Add Child Flow File - void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) { + + void addChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) { addChildUuid(flow->getUUID()); return; } - // Remove Child UUID + void removeChildUuid(const utils::Identifier& uuid) { _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); } - // Remove Child Flow File - void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) { + + void removeChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) { removeChildUuid(flow->getUUID()); } - // Get AlternateIdentifierUri - std::string getAlternateIdentifierUri() { + + std::string getAlternateIdentifierUri() const { return _alternateIdentifierUri; } - // Set AlternateIdentifierUri - void setAlternateIdentifierUri(std::string uri) { + + void setAlternateIdentifierUri(const std::string& uri) { _alternateIdentifierUri = uri; } - // Get Relationship - std::string getRelationship() { + + std::string getRelationship() const { return _relationship; } - // Set Relationship - void setRelationship(std::string relation) { + + void setRelationship(const std::string& relation) { _relationship = relation; } - // Get sourceQueueIdentifier - std::string getSourceQueueIdentifier() { + + std::string getSourceQueueIdentifier() const { return _sourceQueueIdentifier; } - // Set sourceQueueIdentifier - void setSourceQueueIdentifier(std::string identifier) { + + void setSourceQueueIdentifier(const std::string& identifier) { _sourceQueueIdentifier = identifier; } - // fromFlowFile - void fromFlowFile(std::shared_ptr<core::FlowFile> &flow) { + + void fromFlowFile(const std::shared_ptr<core::FlowFile> &flow) { _entryDate = flow->getEntryDate(); _lineageStartDate = flow->getlineageStartDate(); _lineageIdentifiers = flow->getlineageIdentifiers(); @@ -336,21 +331,12 @@ class ProvenanceEventRecord : public core::SerializableComponent { _contentFullPath = flow->getResourceClaim()->getContentFullPath(); } } - using SerializableComponent::Serialize; - - // Serialize the event to a stream - bool Serialize(org::apache::nifi::minifi::io::BufferStream& outStream); - // Serialize and Persistent to the repository - bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo) override; - bool DeSerialize(gsl::span<const std::byte>) override; - bool DeSerialize(org::apache::nifi::minifi::io::BufferStream &stream) { - return DeSerialize(stream.getBuffer()); - } - bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) override; + bool serialize(io::OutputStream& output_stream) override; + bool deserialize(io::InputStream &input_stream) override; + bool loadFromRepository(const std::shared_ptr<core::Repository> &repo); protected: - // Event type ProvenanceEventType _eventType; // Date at which the event was created std::chrono::system_clock::time_point _eventTime{}; @@ -358,39 +344,24 @@ class ProvenanceEventRecord : public core::SerializableComponent { std::chrono::system_clock::time_point _entryDate{}; // Date at which the origin of this flow file entered the flow std::chrono::system_clock::time_point _lineageStartDate{}; - // Event Duration std::chrono::milliseconds _eventDuration{}; - // Component ID std::string _componentId; - // Component Type std::string _componentType; // Size in bytes of the data corresponding to this flow file uint64_t _size; - // flow uuid utils::Identifier flow_uuid_; - // Offset to the content uint64_t _offset; - // Full path to the content std::string _contentFullPath; - // Attributes key/values pairs for the flow record std::map<std::string, std::string> _attributes; // UUID string for all parents std::vector<utils::Identifier> _lineageIdentifiers; - // transitUri std::string _transitUri; - // sourceSystemFlowFileIdentifier std::string _sourceSystemFlowFileIdentifier; - // parent UUID std::vector<utils::Identifier> _parentUuids; - // child UUID std::vector<utils::Identifier> _childrenUuids; - // detail std::string _details; - // sourceQueueIdentifier std::string _sourceQueueIdentifier; - // relationship std::string _relationship; - // alternateIdentifierUri; std::string _alternateIdentifierUri; private: @@ -402,13 +373,8 @@ class ProvenanceEventRecord : public core::SerializableComponent { static std::shared_ptr<utils::IdGenerator> id_generator_; }; -// Provenance Reporter class ProvenanceReporter { public: - // Constructor - /*! - * Create a new provenance reporter associated with the process session - */ ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType) : logger_(core::logging::LoggerFactory<ProvenanceReporter>::getLogger()) { _componentId = componentId; @@ -416,59 +382,45 @@ class ProvenanceReporter { repo_ = repo; } - // Destructor virtual ~ProvenanceReporter() { clear(); } - // Get events - std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() { + + std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() const { return _events; } - // Add event + void add(const std::shared_ptr<ProvenanceEventRecord> &event) { _events.insert(event); } - // Remove event + void remove(const std::shared_ptr<ProvenanceEventRecord> &event) { if (_events.find(event) != _events.end()) { _events.erase(event); } } - // - // clear + void clear() { _events.clear(); } - // commit + void commit(); - // create - void create(std::shared_ptr<core::FlowFile> flow, std::string detail); - // route - void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration); - // modifyAttributes - void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail); - // modifyContent - void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration); - // clone - void clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child); - // join - void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration); - // fork - void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration); - // expire - void expire(std::shared_ptr<core::FlowFile> flow, std::string detail); - // drop - void drop(std::shared_ptr<core::FlowFile> flow, std::string reason); - // send - void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force); - // fetch - void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration); - // receive - void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, std::chrono::milliseconds processingDuration); + void create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail); + void route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration); + void modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail); + void modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration); + void clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child); + void join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child, const std::string& detail, std::chrono::milliseconds processingDuration); + void fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent, const std::string& detail, std::chrono::milliseconds processingDuration); + void expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail); + void drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason); + void send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force); + void fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration); + void receive(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, + const std::string& sourceSystemFlowFileIdentifier, const std::string& detail, std::chrono::milliseconds processingDuration); protected: - // allocate - std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) { + std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, const std::shared_ptr<core::FlowFile>& flow) { if (repo_->isNoop()) { return nullptr; } @@ -480,16 +432,12 @@ class ProvenanceReporter { return event; } - // Component ID std::string _componentId; - // Component Type std::string _componentType; private: std::shared_ptr<core::logging::Logger> logger_; - // Incoming connection Iterator std::set<std::shared_ptr<ProvenanceEventRecord>> _events; - // provenance repository. std::shared_ptr<core::Repository> repo_; // Prevent default copy constructor and assignment operation @@ -498,8 +446,4 @@ class ProvenanceReporter { ProvenanceReporter &operator=(const ProvenanceReporter &parent); }; -// Provenance Repository - } // namespace org::apache::nifi::minifi::provenance - -#endif // LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_ diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp new file mode 100644 index 000000000..5ab712be3 --- /dev/null +++ b/libminifi/src/core/Repository.cpp @@ -0,0 +1,46 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/Repository.h" + +namespace org::apache::nifi::minifi::core { + +bool Repository::Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) { + bool found = true; + for (const auto& storedValue : storedValues) { + found &= Delete(storedValue->getName()); + } + return found; +} + +bool Repository::storeElement(const std::shared_ptr<core::SerializableComponent> element) { + if (!element) { + return false; + } + + org::apache::nifi::minifi::io::BufferStream outStream; + + element->serialize(outStream); + + if (!Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size())) { + logger_->log_error("NiFi Provenance Store event %s size %llu fail", element->getUUIDStr(), outStream.size()); + return false; + } + return true; +} + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index 950977728..4c25ef5df 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -17,8 +17,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - - #include <vector> #include <queue> #include <map> @@ -164,8 +162,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(const std::shared_ptr<core::Pr logging::LOG_DEBUG(logger_) << "batch size " << batch_size_ << " records"; size_t deserialized = batch_size_; std::shared_ptr<core::Repository> repo = context->getProvenanceRepository(); - std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();}; - if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) { + if (!repo->getElements(records, deserialized) && deserialized == 0) { return; } logging::LOG_DEBUG(logger_) << "Captured " << deserialized << " records"; diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp index e186d0053..033efffcc 100644 --- a/libminifi/src/core/repository/VolatileRepository.cpp +++ b/libminifi/src/core/repository/VolatileRepository.cpp @@ -25,9 +25,6 @@ namespace org::apache::nifi::minifi::core::repository { -void VolatileRepository::loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) { -} - VolatileRepository::~VolatileRepository() { for (auto ent : repo_data_.value_vector) { delete ent; @@ -122,45 +119,4 @@ bool VolatileRepository::Get(const std::string &key, std::string &value) { return false; } -bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, - size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { - size_t requested_batch = max_size; - max_size = 0; - for (auto ent : repo_data_.value_vector) { - // let the destructor do the cleanup - RepoValue<std::string> repo_value; - - if (ent->getValue(repo_value)) { - std::shared_ptr<core::SerializableComponent> newComponent = lambda(); - // we've taken ownership of this repo value - newComponent->DeSerialize(repo_value.getBuffer()); - store.push_back(newComponent); - repo_data_.current_size -= repo_value.getBuffer().size(); - if (max_size++ >= requested_batch) { - break; - } - } - } - return max_size > 0; -} - -bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { - logger_->log_debug("VolatileRepository -- DeSerialize %u", repo_data_.current_size.load()); - max_size = 0; - for (auto ent : repo_data_.value_vector) { - // let the destructor do the cleanup - RepoValue<std::string> repo_value; - - if (ent->getValue(repo_value)) { - // we've taken ownership of this repo value - store.at(max_size)->DeSerialize(repo_value.getBuffer()); - repo_data_.current_size -= repo_value.getBuffer().size(); - if (max_size++ >= store.size()) { - break; - } - } - } - return max_size > 0; -} - } // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index dc705dfc4..c3b9cfefc 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -23,6 +23,7 @@ #include <string> #include <vector> #include <list> +#include <utility> #include "core/Repository.h" #include "io/BufferStream.h" @@ -40,20 +41,17 @@ const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREAT "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" }; ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType) - : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) { - _eventType = event; - _componentId = componentId; - _componentType = componentType; - _eventTime = std::chrono::system_clock::now(); + : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()), + _eventType(event), + _componentId(std::move(componentId)), + _componentType(std::move(componentType)), + _eventTime(std::chrono::system_clock::now()) { } -// DeSerialize -bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) { +bool ProvenanceEventRecord::loadFromRepository(const std::shared_ptr<core::Repository> &repo) { std::string value; bool ret; - const std::shared_ptr<core::Repository> repo = std::dynamic_pointer_cast<core::Repository>(store); - if (nullptr == repo || uuid_.isNil()) { logger_->log_error("Repo could not be assigned"); return false; @@ -69,7 +67,7 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Serializable org::apache::nifi::minifi::io::BufferStream stream(value); - ret = DeSerialize(stream); + ret = deserialize(stream); if (ret) { logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d success", getUUIDStr(), stream.size(), _eventType); @@ -80,68 +78,68 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Serializable return ret; } -bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& outStream) { +bool ProvenanceEventRecord::serialize(io::OutputStream& output_stream) { { - const auto ret = outStream.write(this->uuid_); + const auto ret = output_stream.write(this->uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { uint32_t eventType = this->_eventType; - const auto ret = outStream.write(eventType); + const auto ret = output_stream.write(eventType); if (ret != 4) { return false; } } { uint64_t event_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_eventTime.time_since_epoch()).count(); - const auto ret = outStream.write(event_time_ms); + const auto ret = output_stream.write(event_time_ms); if (ret != 8) { return false; } } { uint64_t entry_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_entryDate.time_since_epoch()).count(); - const auto ret = outStream.write(entry_date_ms); + const auto ret = output_stream.write(entry_date_ms); if (ret != 8) { return false; } } { uint64_t event_duration_ms = this->_eventDuration.count(); - const auto ret = outStream.write(event_duration_ms); + const auto ret = output_stream.write(event_duration_ms); if (ret != 8) { return false; } } { uint64_t lineage_start_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_lineageStartDate.time_since_epoch()).count(); - const auto ret = outStream.write(lineage_start_date_ms); + const auto ret = output_stream.write(lineage_start_date_ms); if (ret != 8) { return false; } } { - const auto ret = outStream.write(this->_componentId); + const auto ret = output_stream.write(this->_componentId); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(this->_componentType); + const auto ret = output_stream.write(this->_componentType); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(this->flow_uuid_); + const auto ret = output_stream.write(this->flow_uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(this->_details); + const auto ret = output_stream.write(this->_details); if (ret == 0 || io::isError(ret)) { return false; } @@ -149,45 +147,45 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea // write flow attributes { const auto numAttributes = gsl::narrow<uint32_t>(this->_attributes.size()); - const auto ret = outStream.write(numAttributes); + const auto ret = output_stream.write(numAttributes); if (ret != 4) { return false; } } for (const auto& itAttribute : _attributes) { { - const auto ret = outStream.write(itAttribute.first); + const auto ret = output_stream.write(itAttribute.first); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(itAttribute.second); + const auto ret = output_stream.write(itAttribute.second); if (ret == 0 || io::isError(ret)) { return false; } } } { - const auto ret = outStream.write(this->_contentFullPath); + const auto ret = output_stream.write(this->_contentFullPath); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(this->_size); + const auto ret = output_stream.write(this->_size); if (ret != 8) { return false; } } { - const auto ret = outStream.write(this->_offset); + const auto ret = output_stream.write(this->_offset); if (ret != 8) { return false; } } { - const auto ret = outStream.write(this->_sourceQueueIdentifier); + const auto ret = output_stream.write(this->_sourceQueueIdentifier); if (ret == 0 || io::isError(ret)) { return false; } @@ -196,44 +194,44 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea // write UUIDs { const auto parent_uuids_count = gsl::narrow<uint32_t>(this->_parentUuids.size()); - const auto ret = outStream.write(parent_uuids_count); + const auto ret = output_stream.write(parent_uuids_count); if (ret != 4) { return false; } } for (const auto& parentUUID : _parentUuids) { - const auto ret = outStream.write(parentUUID); + const auto ret = output_stream.write(parentUUID); if (ret == 0 || io::isError(ret)) { return false; } } { const auto children_uuids_count = gsl::narrow<uint32_t>(this->_childrenUuids.size()); - const auto ret = outStream.write(children_uuids_count); + const auto ret = output_stream.write(children_uuids_count); if (ret != 4) { return false; } } for (const auto& childUUID : _childrenUuids) { - const auto ret = outStream.write(childUUID); + const auto ret = output_stream.write(childUUID); if (ret == 0 || io::isError(ret)) { return false; } } } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { - const auto ret = outStream.write(this->_transitUri); + const auto ret = output_stream.write(this->_transitUri); if (ret == 0 || io::isError(ret)) { return false; } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { { - const auto ret = outStream.write(this->_transitUri); + const auto ret = output_stream.write(this->_transitUri); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.write(this->_sourceSystemFlowFileIdentifier); + const auto ret = output_stream.write(this->_sourceSystemFlowFileIdentifier); if (ret == 0 || io::isError(ret)) { return false; } @@ -243,23 +241,9 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea return true; } -bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) { - org::apache::nifi::minifi::io::BufferStream outStream; - - Serialize(outStream); - - // Persist to the DB - if (!repo->Serialize(getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size())) { - logger_->log_error("NiFi Provenance Store event %s size %llu fail", getUUIDStr(), outStream.size()); - } - return true; -} - -bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) { - org::apache::nifi::minifi::io::BufferStream outStream(buffer); - +bool ProvenanceEventRecord::deserialize(io::InputStream &input_stream) { { - const auto ret = outStream.read(uuid_); + const auto ret = input_stream.read(uuid_); if (ret == 0 || io::isError(ret)) { return false; } @@ -267,7 +251,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) uint32_t eventType; { - const auto ret = outStream.read(eventType); + const auto ret = input_stream.read(eventType); if (ret != 4) { return false; } @@ -276,7 +260,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; { uint64_t event_time_in_ms; - const auto ret = outStream.read(event_time_in_ms); + const auto ret = input_stream.read(event_time_in_ms); if (ret != 8) { return false; } @@ -285,7 +269,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) { uint64_t entry_date_in_ms; - const auto ret = outStream.read(entry_date_in_ms); + const auto ret = input_stream.read(entry_date_in_ms); if (ret != 8) { return false; } @@ -294,7 +278,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) { uint64_t event_duration_ms; - const auto ret = outStream.read(event_duration_ms); + const auto ret = input_stream.read(event_duration_ms); if (ret != 8) { return false; } @@ -303,7 +287,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) { uint64_t lineage_start_date_in_ms; - const auto ret = outStream.read(lineage_start_date_in_ms); + const auto ret = input_stream.read(lineage_start_date_in_ms); if (ret != 8) { return false; } @@ -311,28 +295,28 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) } { - const auto ret = outStream.read(this->_componentId); + const auto ret = input_stream.read(this->_componentId); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.read(this->_componentType); + const auto ret = input_stream.read(this->_componentType); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.read(this->flow_uuid_); + const auto ret = input_stream.read(this->flow_uuid_); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.read(this->_details); + const auto ret = input_stream.read(this->_details); if (ret == 0 || io::isError(ret)) { return false; } @@ -341,7 +325,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) // read flow attributes uint32_t numAttributes = 0; { - const auto ret = outStream.read(numAttributes); + const auto ret = input_stream.read(numAttributes); if (ret != 4) { return false; } @@ -350,14 +334,14 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) for (uint32_t i = 0; i < numAttributes; i++) { std::string key; { - const auto ret = outStream.read(key); + const auto ret = input_stream.read(key); if (ret == 0 || io::isError(ret)) { return false; } } std::string value; { - const auto ret = outStream.read(value); + const auto ret = input_stream.read(value); if (ret == 0 || io::isError(ret)) { return false; } @@ -366,28 +350,28 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) } { - const auto ret = outStream.read(this->_contentFullPath); + const auto ret = input_stream.read(this->_contentFullPath); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.read(this->_size); + const auto ret = input_stream.read(this->_size); if (ret != 8) { return false; } } { - const auto ret = outStream.read(this->_offset); + const auto ret = input_stream.read(this->_offset); if (ret != 8) { return false; } } { - const auto ret = outStream.read(this->_sourceQueueIdentifier); + const auto ret = input_stream.read(this->_sourceQueueIdentifier); if (ret == 0 || io::isError(ret)) { return false; } @@ -397,7 +381,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) // read UUIDs uint32_t number = 0; { - const auto ret = outStream.read(number); + const auto ret = input_stream.read(number); if (ret != 4) { return false; } @@ -406,7 +390,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) for (uint32_t i = 0; i < number; i++) { utils::Identifier parentUUID; { - const auto ret = outStream.read(parentUUID); + const auto ret = input_stream.read(parentUUID); if (ret == 0 || io::isError(ret)) { return false; } @@ -415,7 +399,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) } number = 0; { - const auto ret = outStream.read(number); + const auto ret = input_stream.read(number); if (ret != 4) { return false; } @@ -423,7 +407,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) for (uint32_t i = 0; i < number; i++) { utils::Identifier childUUID; { - const auto ret = outStream.read(childUUID); + const auto ret = input_stream.read(childUUID); if (ret == 0 || io::isError(ret)) { return false; } @@ -432,20 +416,20 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) } } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) { { - const auto ret = outStream.read(this->_transitUri); + const auto ret = input_stream.read(this->_transitUri); if (ret == 0 || io::isError(ret)) { return false; } } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { { - const auto ret = outStream.read(this->_transitUri); + const auto ret = input_stream.read(this->_transitUri); if (ret == 0 || io::isError(ret)) { return false; } } { - const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier); + const auto ret = input_stream.read(this->_sourceSystemFlowFileIdentifier); if (ret == 0 || io::isError(ret)) { return false; } @@ -469,14 +453,14 @@ void ProvenanceReporter::commit() { for (auto& event : _events) { std::unique_ptr<io::BufferStream> stramptr(new io::BufferStream()); - event->Serialize(*stramptr); + event->serialize(*stramptr); flowData.emplace_back(event->getUUIDStr(), std::move(stramptr)); } repo_->MultiPut(flowData); } -void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) { +void ProvenanceReporter::create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) { auto event = allocate(ProvenanceEventRecord::CREATE, flow); if (event) { @@ -485,7 +469,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::strin } } -void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration) { +void ProvenanceReporter::route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::ROUTE, flow); if (event) { @@ -496,7 +480,7 @@ void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relat } } -void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail) { +void ProvenanceReporter::modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) { auto event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); if (event) { @@ -505,7 +489,7 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration) { +void ProvenanceReporter::modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow); if (event) { @@ -515,7 +499,7 @@ void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std } } -void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child) { +void ProvenanceReporter::clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child) { auto event = allocate(ProvenanceEventRecord::CLONE, parent); if (event) { @@ -525,15 +509,14 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shar } } -void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration) { +void ProvenanceReporter::join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child, + const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::JOIN, child); if (event) { event->addChildFlowFile(child); - std::vector<std::shared_ptr<core::FlowFile> >::iterator it; - for (it = parents.begin(); it != parents.end(); it++) { - std::shared_ptr<core::FlowFile> record = *it; - event->addParentFlowFile(record); + for (const auto& parent : parents) { + event->addParentFlowFile(parent); } event->setDetails(detail); event->setEventDuration(processingDuration); @@ -541,15 +524,14 @@ void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > pare } } -void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration) { +void ProvenanceReporter::fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent, + const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::FORK, parent); if (event) { event->addParentFlowFile(parent); - std::vector<std::shared_ptr<core::FlowFile> >::iterator it; - for (it = child.begin(); it != child.end(); it++) { - std::shared_ptr<core::FlowFile> record = *it; - event->addChildFlowFile(record); + for (const auto& child : children) { + event->addChildFlowFile(child); } event->setDetails(detail); event->setEventDuration(processingDuration); @@ -557,7 +539,7 @@ void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > chil } } -void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::string detail) { +void ProvenanceReporter::expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) { auto event = allocate(ProvenanceEventRecord::EXPIRE, flow); if (event) { @@ -566,7 +548,7 @@ void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::strin } } -void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string reason) { +void ProvenanceReporter::drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason) { auto event = allocate(ProvenanceEventRecord::DROP, flow); if (event) { @@ -576,7 +558,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string } } -void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force) { +void ProvenanceReporter::send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force) { auto event = allocate(ProvenanceEventRecord::SEND, flow); if (event) { @@ -587,15 +569,15 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string add(event); } else { if (!repo_->isFull()) - event->Serialize(repo_); + repo_->storeElement(event); } } } -void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, - std::string transitUri, - std::string sourceSystemFlowFileIdentifier, - std::string detail, +void ProvenanceReporter::receive(const std::shared_ptr<core::FlowFile>& flow, + const std::string& transitUri, + const std::string& sourceSystemFlowFileIdentifier, + const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::RECEIVE, flow); @@ -608,7 +590,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, } } -void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration) { +void ProvenanceReporter::fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration) { auto event = allocate(ProvenanceEventRecord::FETCH, flow); if (event) { diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp index 7a3f06557..09e3d4e0b 100644 --- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp +++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp @@ -35,128 +35,128 @@ namespace provenance = minifi::provenance; using namespace std::literals::chrono_literals; TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah"); - REQUIRE(record1.getAttributes().empty()); - REQUIRE(record1.getAlternateIdentifierUri().length() == 0); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah"); + REQUIRE(record1->getAttributes().empty()); + REQUIRE(record1->getAlternateIdentifierUri().length() == 0); } TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); - utils::Identifier eventId = record1.getEventId(); + utils::Identifier eventId = record1->getEventId(); std::string smileyface = ":)"; - record1.setDetails(smileyface); + record1->setDetails(smileyface); auto sample = 65555ms; std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>(); - record1.setEventDuration(sample); - - record1.Serialize(testRepository); - provenance::ProvenanceEventRecord record2; - record2.setEventId(eventId); - REQUIRE(record2.DeSerialize(testRepository) == true); - REQUIRE(record2.getEventId() == record1.getEventId()); - REQUIRE(record2.getComponentId() == record1.getComponentId()); - REQUIRE(record2.getComponentType() == record1.getComponentType()); - REQUIRE(record2.getDetails() == record1.getDetails()); - REQUIRE(record2.getDetails() == smileyface); - REQUIRE(record2.getEventDuration() == sample); + record1->setEventDuration(sample); + + testRepository->storeElement(record1); + auto record2 = std::make_shared<provenance::ProvenanceEventRecord>(); + record2->setEventId(eventId); + REQUIRE(record2->loadFromRepository(testRepository) == true); + REQUIRE(record2->getEventId() == record1->getEventId()); + REQUIRE(record2->getComponentId() == record1->getComponentId()); + REQUIRE(record2->getComponentType() == record1->getComponentType()); + REQUIRE(record2->getDetails() == record1->getDetails()); + REQUIRE(record2->getDetails() == smileyface); + REQUIRE(record2->getEventDuration() == sample); } TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); - utils::Identifier eventId = record1.getEventId(); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + utils::Identifier eventId = record1->getEventId(); std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(); ffr1->setAttribute("potato", "potatoe"); ffr1->setAttribute("tomato", "tomatoe"); - record1.addChildFlowFile(ffr1); + record1->addChildFlowFile(ffr1); auto sample = 65555ms; std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>(); - record1.setEventDuration(sample); - - record1.Serialize(testRepository); - provenance::ProvenanceEventRecord record2; - record2.setEventId(eventId); - REQUIRE(record2.DeSerialize(testRepository) == true); - REQUIRE(record1.getChildrenUuids().size() == 1); - REQUIRE(record2.getChildrenUuids().size() == 1); - utils::Identifier childId = record2.getChildrenUuids().at(0); + record1->setEventDuration(sample); + + testRepository->storeElement(record1); + auto record2 = std::make_shared<provenance::ProvenanceEventRecord>(); + record2->setEventId(eventId); + REQUIRE(record2->loadFromRepository(testRepository) == true); + REQUIRE(record1->getChildrenUuids().size() == 1); + REQUIRE(record2->getChildrenUuids().size() == 1); + utils::Identifier childId = record2->getChildrenUuids().at(0); REQUIRE(childId == ffr1->getUUID()); - record2.removeChildUuid(childId); - REQUIRE(record2.getChildrenUuids().empty()); + record2->removeChildUuid(childId); + REQUIRE(record2->getChildrenUuids().empty()); } TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); - utils::Identifier eventId = record1.getEventId(); + utils::Identifier eventId = record1->getEventId(); std::string smileyface = ":)"; - record1.setDetails(smileyface); + record1->setDetails(smileyface); auto sample = 65555ms; std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); testRepository->initialize(nullptr); - record1.setEventDuration(sample); - - record1.Serialize(testRepository); - provenance::ProvenanceEventRecord record2; - record2.setEventId(eventId); - REQUIRE(record2.DeSerialize(testRepository) == true); - REQUIRE(record2.getEventId() == record1.getEventId()); - REQUIRE(record2.getComponentId() == record1.getComponentId()); - REQUIRE(record2.getComponentType() == record1.getComponentType()); - REQUIRE(record2.getDetails() == record1.getDetails()); - REQUIRE(record2.getDetails() == smileyface); - REQUIRE(record2.getEventDuration() == sample); + record1->setEventDuration(sample); + + testRepository->storeElement(record1); + auto record2 = std::make_shared<provenance::ProvenanceEventRecord>(); + record2->setEventId(eventId); + REQUIRE(record2->loadFromRepository(testRepository) == true); + REQUIRE(record2->getEventId() == record1->getEventId()); + REQUIRE(record2->getComponentId() == record1->getComponentId()); + REQUIRE(record2->getComponentType() == record1->getComponentType()); + REQUIRE(record2->getDetails() == record1->getDetails()); + REQUIRE(record2->getDetails() == smileyface); + REQUIRE(record2->getEventDuration() == sample); } TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); - utils::Identifier eventId = record1.getEventId(); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + utils::Identifier eventId = record1->getEventId(); std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(); ffr1->setAttribute("potato", "potatoe"); ffr1->setAttribute("tomato", "tomatoe"); - record1.addChildFlowFile(ffr1); + record1->addChildFlowFile(ffr1); auto sample = 65555ms; std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); testRepository->initialize(nullptr); - record1.setEventDuration(sample); - - record1.Serialize(testRepository); - provenance::ProvenanceEventRecord record2; - record2.setEventId(eventId); - REQUIRE(record2.DeSerialize(testRepository) == true); - REQUIRE(record1.getChildrenUuids().size() == 1); - REQUIRE(record2.getChildrenUuids().size() == 1); - utils::Identifier childId = record2.getChildrenUuids().at(0); + record1->setEventDuration(sample); + + testRepository->storeElement(record1); + auto record2 = std::make_shared<provenance::ProvenanceEventRecord>(); + record2->setEventId(eventId); + REQUIRE(record2->loadFromRepository(testRepository) == true); + REQUIRE(record1->getChildrenUuids().size() == 1); + REQUIRE(record2->getChildrenUuids().size() == 1); + utils::Identifier childId = record2->getChildrenUuids().at(0); REQUIRE(childId == ffr1->getUUID()); - record2.removeChildUuid(childId); - REQUIRE(record2.getChildrenUuids().empty()); + record2->removeChildUuid(childId); + REQUIRE(record2->getChildrenUuids().empty()); } TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { - provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); - utils::Identifier eventId = record1.getEventId(); + utils::Identifier eventId = record1->getEventId(); std::string smileyface = ":)"; - record1.setDetails(smileyface); + record1->setDetails(smileyface); auto sample = 65555ms; std::shared_ptr<core::Repository> testRepository = core::createRepository("nooprepository"); testRepository->initialize(nullptr); - record1.setEventDuration(sample); + record1->setEventDuration(sample); - REQUIRE(record1.Serialize(testRepository) == true); - provenance::ProvenanceEventRecord record2; - record2.setEventId(eventId); - REQUIRE(record2.DeSerialize(testRepository) == false); + REQUIRE(testRepository->storeElement(record1)); + auto record2 = std::make_shared<provenance::ProvenanceEventRecord>(); + record2->setEventId(eventId); + REQUIRE(record2->loadFromRepository(testRepository) == false); } diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 41ca154a8..400e0e91d 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -35,6 +35,7 @@ #include "properties/Configure.h" #include "provenance/Provenance.h" #include "SwapManager.h" +#include "io/BufferStream.h" using namespace std::literals::chrono_literals; @@ -73,10 +74,6 @@ class TestRepositoryBase : public T_BaseRepository { return true; } - bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override { - return Put(key, buffer, bufferSize); - } - bool Delete(const std::string& key) override { std::lock_guard<std::mutex> lock{repository_results_mutex_}; repository_results_.erase(key); @@ -94,11 +91,7 @@ class TestRepositoryBase : public T_BaseRepository { } } - bool Serialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>>& /*store*/, size_t /*max_size*/) override { - return false; - } - - bool DeSerialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override { + bool getElements(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override { std::lock_guard<std::mutex> lock{repository_results_mutex_}; max_size = 0; for (const auto &entry : repository_results_) { @@ -106,27 +99,20 @@ class TestRepositoryBase : public T_BaseRepository { break; } const auto eventRead = store.at(max_size); - eventRead->DeSerialize(gsl::make_span(entry.second).template as_span<const std::byte>()); + org::apache::nifi::minifi::io::BufferStream stream(gsl::make_span(entry.second).template as_span<const std::byte>()); + eventRead->deserialize(stream); ++max_size; } return true; } - bool Serialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>& /*store*/) override { - return false; - } - - bool DeSerialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> &store) override { - std::string value; - Get(store->getUUIDStr(), value); - store->DeSerialize(gsl::make_span(value).as_span<const std::byte>()); + bool storeElement(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> element) override { + org::apache::nifi::minifi::io::BufferStream outStream; + element->serialize(outStream); + Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size()); return true; } - bool DeSerialize(gsl::span<const std::byte>) override { - return false; - } - std::map<std::string, std::string> getRepoMap() const { std::lock_guard<std::mutex> lock{repository_results_mutex_}; return repository_results_; @@ -199,9 +185,6 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepos } } - void loadComponent(const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) override { - } - private: void run() override { }
