http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h deleted file mode 100644 index f2691ac..0000000 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ /dev/null @@ -1,167 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ -#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ - -#include "leveldb/db.h" -#include "leveldb/options.h" -#include "leveldb/slice.h" -#include "leveldb/status.h" -#include "core/Repository.h" -#include "core/Core.h" -#include "Connection.h" -#include "core/logging/LoggerConfiguration.h" -#include "concurrentqueue.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { -namespace repository { - -#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" -#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M -#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute -#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec - -/** - * Flow File repository - * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. - */ -class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> { - public: - // Constructor - - FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, - int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) - : Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), - logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()), - content_repo_(nullptr) { - db_ = NULL; - } - - // Destructor - ~FlowFileRepository() { - if (db_) - delete db_; - } - - virtual void flush(); - - // initialize - virtual bool initialize(const std::shared_ptr<Configure> &configure) { - std::string value; - - if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) { - directory_ = value; - } - logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str()); - if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) { - Property::StringToInt(value, max_partition_bytes_); - } - logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_); - if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) { - TimeUnit unit; - if (Property::StringToTime(value, max_partition_millis_, unit) && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) { - } - } - logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_); - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_); - if (status.ok()) { - logger_->log_info("NiFi FlowFile Repository database open %s success", directory_.c_str()); - } else { - logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_.c_str()); - return false; - } - return true; - } - - virtual void run(); - - virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { - - // persistent to the DB - leveldb::Slice value((const char *) buf, bufLen); - leveldb::Status status; - repo_size_ += bufLen; - status = db_->Put(leveldb::WriteOptions(), key, value); - if (status.ok()) - return true; - else - return false; - } - /** - * - * Deletes the key - * 
@return status of the delete operation - */ - virtual bool Delete(std::string key) { - keys_to_delete.enqueue(key); - return true; - } - /** - * Sets the value from the provided key - * @return status of the get operation. - */ - virtual bool Get(const std::string &key, std::string &value) { - if (db_ == nullptr) - return false; - leveldb::Status status; - status = db_->Get(leveldb::ReadOptions(), key, &value); - if (status.ok()) - return true; - else - return false; - } - - void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) { - this->connectionMap = connectionMap; - } - virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo); - - void start() { - if (this->purge_period_ <= 0) { - return; - } - if (running_) { - return; - } - running_ = true; - thread_ = std::thread(&FlowFileRepository::run, shared_from_this()); - logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); - } - - private: - moodycamel::ConcurrentQueue<std::string> keys_to_delete; - std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; - std::shared_ptr<core::ContentRepository> content_repo_; - leveldb::DB* db_; - std::shared_ptr<logging::Logger> logger_; -}; - -} /* namespace repository */ -} /* namespace core */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 8507216..623f7be 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -24,6 +24,7 @@ #include "../ContentRepository.h" #include "core/repository/VolatileRepository.h" #include "properties/Configure.h" +#include "core/Connectable.h" #include "core/logging/LoggerConfiguration.h" namespace org { namespace apache { @@ -36,13 +37,14 @@ namespace repository { * Purpose: Stages content into a volatile area of memory. Note that when the maximum number * of entries is consumed we will rollback a session to wait for others to be freed. */ -class VolatileContentRepository : public core::ContentRepository, public core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> { +class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> { public: static const char *minimal_locking; explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>()) : core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name), + core::SerializableComponent(name,0), logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()), minimize_locking_(true) { max_count_ = 15000; @@ -100,13 +102,14 @@ class VolatileContentRepository : public core::ContentRepository, public core::r virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim); protected: + virtual void start(); virtual void run(); template<typename T2> std::shared_ptr<T2> shared_from_parent() { - return 
std::static_pointer_cast<T2>(shared_from_this()); + return std::dynamic_pointer_cast<T2>(shared_from_this()); } private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileFlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h index 0e75580..1a6be6b 100644 --- a/libminifi/include/core/repository/VolatileFlowFileRepository.h +++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h @@ -19,6 +19,7 @@ #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_ #include "VolatileRepository.h" +#include "FlowFileRecord.h" namespace org { namespace apache { @@ -36,7 +37,8 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> { explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) - : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) + : core::SerializableComponent(repo_name, 0), + VolatileRepository(repo_name.length() > 0 ? 
repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) { purge_required_ = true; @@ -50,8 +52,11 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> { if (purge_required_ && nullptr != content_repo_) { std::lock_guard<std::mutex> lock(purge_mutex_); for (auto purgeItem : purge_list_) { - std::shared_ptr<minifi::ResourceClaim> newClaim = std::make_shared<minifi::ResourceClaim>(purgeItem, content_repo_, true); - content_repo_->remove(newClaim); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) { + std::shared_ptr<minifi::ResourceClaim> newClaim = eventRead->getResourceClaim(); + content_repo_->remove(newClaim); + } } purge_list_.resize(0); purge_list_.clear(); @@ -66,6 +71,13 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string> { protected: + virtual void emplace(RepoValue<std::string> &old_value) { + std::string buffer; + old_value.emplace(buffer); + std::lock_guard<std::mutex> lock(purge_mutex_); + purge_list_.push_back(buffer); + } + std::shared_ptr<core::ContentRepository> content_repo_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h index 2717510..cabb76f 100644 --- a/libminifi/include/core/repository/VolatileProvenanceRepository.h +++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h @@ -36,7 +36,7 @@ class VolatileProvenanceRepository : public VolatileRepository<std::string> { explicit VolatileProvenanceRepository(std::string repo_name = "", std::string dir = 
REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) - : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) + : core::SerializableComponent(repo_name, 0),VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) { purge_required_ = false; @@ -45,6 +45,10 @@ class VolatileProvenanceRepository : public VolatileRepository<std::string> { virtual void run() { repo_full_ = false; } + protected: + virtual void emplace(RepoValue<std::string> &old_value) { + purge_list_.push_back(old_value.getKey()); + } private: }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index da6608c..3b5d5e1 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -37,7 +37,7 @@ namespace repository { /** * Flow File repository - * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. + * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate. 
*/ template<typename T> class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository<T>> { @@ -50,7 +50,8 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr explicit VolatileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) - : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod), + : core::SerializableComponent(repo_name, 0), + Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod), max_size_(maxPartitionBytes * 0.75), current_index_(0), max_count_(10000), @@ -121,6 +122,11 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr protected: + virtual void emplace(RepoValue<T> &old_value) { + std::lock_guard<std::mutex> lock(purge_mutex_); + purge_list_.push_back(old_value.getKey()); + } + /** * Tests whether or not the current size exceeds the capacity * if the new prospectiveSize is inserted. 
@@ -208,7 +214,7 @@ bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> &configu } logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), max_count_); - logger_->log_info("Using a maximum size of %u", max_size_); + logger_->log_info("Using a maximum size for %s of %u", getName(), max_size_); value_vector_.reserve(max_count_); for (int i = 0; i < max_count_; i++) { value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_)); @@ -245,7 +251,7 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to %d", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load()); if (updated && reclaimed_size > 0) { std::lock_guard<std::mutex> lock(mutex_); - purge_list_.push_back(old_value.getKey()); + emplace(old_value); } if (reclaimed_size > 0) { /** @@ -270,11 +276,14 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { */ template<typename T> bool VolatileRepository<T>::Delete(T key) { + logger_->log_debug("Delete from volatile"); for (auto ent : value_vector_) { // let the destructor do the cleanup RepoValue<T> value; if (ent->getValue(key, value)) { current_size_ -= value.size(); + logger_->log_debug("Delete and pushed into purge_list from volatile"); + emplace(value); return true; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index f71ce9c..0d7bb55 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -51,6 +51,7 @@ class Configure : public Properties { static const char *nifi_provenance_repository_directory_default; static const char *nifi_provenance_repository_enable; static const 
char *nifi_flowfile_repository_max_storage_time; + static const char *nifi_dbcontent_repository_directory_default; static const char *nifi_flowfile_repository_max_storage_size; static const char *nifi_flowfile_repository_directory_default; static const char *nifi_flowfile_repository_enable; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h deleted file mode 100644 index 53f489f..0000000 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ -#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ - -#include "leveldb/db.h" -#include "leveldb/options.h" -#include "leveldb/slice.h" -#include "leveldb/status.h" -#include "core/Repository.h" -#include "core/Core.h" -#include "provenance/Provenance.h" -#include "core/logging/LoggerConfiguration.h" -#include "concurrentqueue.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace provenance { - -#define PROVENANCE_DIRECTORY "./provenance_repository" -#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M -#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute -#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec - -class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> { - public: - // Constructor - /*! - * Create a new provenance repository - */ - ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = - MAX_PROVENANCE_STORAGE_SIZE, - uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) - : Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), - logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) { - db_ = NULL; - } - - // Destructor - virtual ~ProvenanceRepository() { - if (db_) - delete db_; - } - - virtual void flush(); - - void start() { - if (this->purge_period_ <= 0) - return; - if (running_) - return; - running_ = true; - thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); - logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); - } - - // initialize - virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) { - std::string value; - if (config->get(Configure::nifi_provenance_repository_directory_default, value)) { - directory_ = value; - } - logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str()); - if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) { - core::Property::StringToInt(value, max_partition_bytes_); - } - logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_); - if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) { - core::TimeUnit unit; - if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) { - } - } - logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_); - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(), &db_); - if (status.ok()) { - logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str()); - } else { - logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str()); - return false; - } - - return true; - } - // Put - virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { - 
- if (repo_full_) { - return false; - } - - // persistent to the DB - leveldb::Slice value((const char *) buf, bufLen); - leveldb::Status status; - repo_size_ += bufLen; - status = db_->Put(leveldb::WriteOptions(), key, value); - if (status.ok()) - return true; - else - return false; - } - // Delete - virtual bool Delete(std::string key) { - keys_to_delete.enqueue(key); - return true; - } - // Get - virtual bool Get(const std::string &key, std::string &value) { - leveldb::Status status; - status = db_->Get(leveldb::ReadOptions(), key, &value); - if (status.ok()) - return true; - else - return false; - } - - // Remove event - void removeEvent(ProvenanceEventRecord *event) { - Delete(event->getEventId()); - } - - virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { - return Put(key, buffer, bufferSize); - } - - virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) { - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - if (store.size() >= max_size) - break; - if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { - store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead)); - } - } - delete it; - return true; - } - - virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - size_t requested_batch = max_size; - max_size = 0; - for (it->SeekToFirst(); it->Valid(); it->Next()) { - - if (max_size >= requested_batch) - break; - std::shared_ptr<core::SerializableComponent> eventRead = lambda(); - std::string key = it->key().ToString(); - if 
(eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { - max_size++; - records.push_back(eventRead); - } - - } - delete it; - - if (max_size > 0) { - return true; - } else { - return false; - } - } - //! get record - void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) { - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - if (records.size() >= maxSize) - break; - if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { - records.push_back(eventRead); - } - } - delete it; - } - - virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - max_size = 0; - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); - std::string key = it->key().ToString(); - - if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { - max_size++; - } - if (store.size() >= max_size) - break; - } - delete it; - if (max_size > 0) { - return true; - } else { - return false; - } - } - //! 
purge record - void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) { - for (auto record : records) { - Delete(record->getEventId()); - } - flush(); - } - // destroy - void destroy() { - if (db_) { - delete db_; - db_ = NULL; - } - } - // Run function for the thread - void run(); - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProvenanceRepository(const ProvenanceRepository &parent) = delete; - ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete; - - private: - moodycamel::ConcurrentQueue<std::string> keys_to_delete; - leveldb::DB* db_; - std::shared_ptr<logging::Logger> logger_; -}; - -} /* namespace provenance */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ -#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */ - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 9dcd6c6..db9840e 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -44,6 +44,7 @@ const char *Configure::nifi_provenance_repository_directory_default = "nifi.prov const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size"; const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; +const char *Configure::nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; const char 
*Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 6485e6c..28294c9 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -49,7 +49,7 @@ #include "SchedulingAgent.h" #include "core/controller/ControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" -#include "core/repository/FlowFileRepository.h" +#include "core/Connectable.h" namespace org { namespace apache { @@ -306,15 +306,12 @@ void FlowController::reload(std::string yamlFile) { void FlowController::loadFlowRepo() { if (this->flow_file_repo_ != nullptr) { logger_->log_debug("Getting connection map"); - std::map<std::string, std::shared_ptr<Connection>> connectionMap; + std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap; if (this->root_ != nullptr) { this->root_->getConnections(connectionMap); } logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); - auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_); - if (nullptr != rep) { - rep->setConnectionMap(connectionMap); - } + flow_file_repo_->setConnectionMap(connectionMap); flow_file_repo_->loadComponent(content_repo_); } else { logger_->log_debug("Flow file repository is not set"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index b97f290..c89a00b 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -77,7 +77,7 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository entry_date_ = event->getEntryDate(); lineage_start_date_ = event->getlineageStartDate(); lineage_Identifiers_ = event->getlineageIdentifiers(); - uuid_str_ = event->getUUIDStr(); + uuidStr_ = event->getUUIDStr(); attributes_ = event->getAttributes(); size_ = event->getSize(); offset_ = event->getOffset(); @@ -99,16 +99,16 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository FlowFileRecord::~FlowFileRecord() { if (!snapshot_) - logger_->log_debug("Delete FlowFile UUID %s", uuid_str_.c_str()); + logger_->log_debug("Delete FlowFile UUID %s", uuidStr_.c_str()); else - logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuid_str_.c_str()); + logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_.c_str()); if (claim_) { // Decrease the flow file record owned count for the resource claim claim_->decreaseFlowFileRecordOwnedCount(); std::string value; if (claim_->getFlowFileRecordOwnedCount() <= 0) { // we cannot rely on the stored variable here since we - if (flow_repository_ != nullptr && !flow_repository_->Get(uuid_str_, value)) { + if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) { logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str()); content_repo_->remove(claim_); } @@ -181,9 +181,9 @@ bool FlowFileRecord::DeSerialize(std::string key) { ret = DeSerialize(stream); if (ret) { - logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuidStr_.c_str(), stream.getSize(), uuid_connection_.c_str()); } else { - logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuidStr_.c_str(), stream.getSize(), 
uuid_connection_.c_str()); } return ret; @@ -209,7 +209,7 @@ bool FlowFileRecord::Serialize() { return false; } - ret = writeUTF(this->uuid_str_, &outStream); + ret = writeUTF(this->uuidStr_, &outStream); if (ret <= 0) { return false; } @@ -251,11 +251,11 @@ bool FlowFileRecord::Serialize() { return false; } - if (flow_repository_->Put(uuid_str_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { - logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuid_str_.c_str(), outStream.getSize()); + if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { + logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuidStr_.c_str(), outStream.getSize()); return true; } else { - logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuid_str_.c_str(), outStream.getSize()); + logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuidStr_.c_str(), outStream.getSize()); return false; } @@ -282,7 +282,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { return false; } - ret = readUTF(this->uuid_str_, &outStream); + ret = readUTF(this->uuidStr_, &outStream); if (ret <= 0) { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 16a9778..68b7520 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -29,12 +29,11 @@ namespace minifi { namespace core { std::vector<std::string> FlowConfiguration::statics_sl_funcs_; +std::mutex FlowConfiguration::atomic_initialization_; FlowConfiguration::~FlowConfiguration() { } - - std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) { auto ptr = 
core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid); if (nullptr == ptr) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index d08ea4b..f5d8754 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -33,7 +33,8 @@ std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator std::shared_ptr<logging::Logger> FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger(); FlowFile::FlowFile() - : size_(0), + : Connectable("FlowFile", 0), + size_(0), id_(0), stored(false), offset_(0), @@ -47,14 +48,6 @@ FlowFile::FlowFile() entry_date_ = getTimeMillis(); event_time_ = entry_date_; lineage_start_date_ = entry_date_; - - char uuidStr[37] = { 0 }; - - // Generate the global UUID for the flow record - id_generator_->generate(uuid_); - - uuid_unparse_lower(uuid_, uuidStr); - uuid_str_ = uuidStr; } FlowFile::~FlowFile() { @@ -74,7 +67,7 @@ FlowFile& FlowFile::operator=(const FlowFile& other) { claim_ = other.claim_; if (claim_ != nullptr) this->claim_->increaseFlowFileRecordOwnedCount(); - uuid_str_ = other.uuid_str_; + uuidStr_ = other.uuidStr_; connection_ = other.connection_; original_connection_ = other.original_connection_; return *this; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index a537f1a..5915089 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -259,6 +259,16 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti } } +void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connectable>> 
&connectionMap) { + for (auto connection : connections_) { + connectionMap[connection->getUUIDStr()] = connection; + connectionMap[connection->getName()] = connection; + } + for (auto processGroup : child_process_groups_) { + processGroup->getConnections(connectionMap); + } +} + void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { std::lock_guard<std::recursive_mutex> lock(mutex_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp index be21b16..575f694 100644 --- a/libminifi/src/core/Repository.cpp +++ b/libminifi/src/core/Repository.cpp @@ -20,7 +20,6 @@ #include <cstdint> #include <vector> -#include "../../include/core/repository/FlowFileRepository.h" #include "io/DataStream.h" #include "io/Serializable.h" #include "core/Relationship.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index b25e87c..d753a90 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -17,52 +17,45 @@ #include "core/RepositoryFactory.h" #include <memory> #include <string> +#include <utility> #include <algorithm> #include "core/ContentRepository.h" -#include "core/repository/FileSystemRepository.h" #include "core/repository/VolatileContentRepository.h" #include "core/Repository.h" -#ifdef LEVELDB_SUPPORT -#include "core/repository/FlowFileRepository.h" -#include "provenance/ProvenanceRepository.h" -#endif - -#include "core/repository/VolatileProvenanceRepository.h" +#include "core/ClassLoader.h" +#include "core/repository/FileSystemRepository.h" #include 
"core/repository/VolatileFlowFileRepository.h" +#include "core/repository/VolatileProvenanceRepository.h" namespace org { namespace apache { namespace nifi { namespace minifi { -#ifndef LEVELDB_SUPPORT -namespace provenance { -class ProvenanceRepository; -} -#endif namespace core { -#ifndef LEVELDB_SUPPORT -class FlowFileRepository; -#endif - std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe, const std::string repo_name) { std::shared_ptr<core::Repository> return_obj = nullptr; std::string class_name_lc = configuration_class_name; std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); try { std::shared_ptr<core::Repository> return_obj = nullptr; - if (class_name_lc == "flowfilerepository") { - return_obj = instantiate<core::repository::FlowFileRepository>(repo_name); - } else if (class_name_lc == "provenancerepository") { - return_obj = instantiate<provenance::ProvenanceRepository>(repo_name); - } else if (class_name_lc == "volatileflowfilerepository") { + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate<core::Repository>(class_name_lc, class_name_lc); + if (nullptr != ptr) { + return_obj = ptr; + } + + if (return_obj) { + return return_obj; + } + // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories + if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") { return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name); - } else if (class_name_lc == "volatileprovenancefilerepository") { + } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") { return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name); } else if (class_name_lc == "nooprepository") { return_obj = instantiate<core::Repository>(repo_name); } - if (return_obj) { return return_obj; } @@ -86,23 +79,27 @@ 
std::shared_ptr<core::ContentRepository> createContentRepository(const std::stri std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); try { std::shared_ptr<core::ContentRepository> return_obj = nullptr; - if (class_name_lc == "volatilecontentrepository") { - return_obj = instantiate<core::repository::VolatileContentRepository>(repo_name); - } else { - return_obj = instantiate<core::repository::FileSystemRepository>(repo_name); - } + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc, class_name_lc); + if (nullptr != ptr) { + return_obj = ptr; + } if (return_obj) { return return_obj; } + if (class_name_lc == "volatilecontentrepository") { + return std::make_shared<core::repository::VolatileContentRepository>(repo_name); + } else if (class_name_lc == "filesystemrepository") { + return std::make_shared<core::repository::FileSystemRepository>(repo_name); + } if (fail_safe) { - return std::make_shared<core::repository::FileSystemRepository>("fail_safe"); + return std::make_shared<core::repository::VolatileContentRepository>("fail_safe"); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::make_shared<core::repository::FileSystemRepository>("fail_safe"); + return std::make_shared<core::repository::VolatileContentRepository>("fail_safe"); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp deleted file mode 100644 index 3ed7fbf..0000000 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ /dev/null @@ -1,140 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "core/repository/FlowFileRepository.h" -#include "leveldb/write_batch.h" -#include <memory> -#include <string> -#include <utility> -#include <vector> -#include "FlowFileRecord.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { -namespace repository { - -void FlowFileRepository::flush() { - leveldb::WriteBatch batch; - std::string key; - std::string value; - leveldb::ReadOptions options; - - std::vector<std::shared_ptr<FlowFileRecord>> purgeList; - - uint64_t decrement_total = 0; - while (keys_to_delete.size_approx() > 0) { - if (keys_to_delete.try_dequeue(key)) { - db_->Get(options, key, &value); - decrement_total += value.size(); - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); - if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) { - purgeList.push_back(eventRead); - } - logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath()); - batch.Delete(key); - } - } - if (db_->Write(leveldb::WriteOptions(), &batch).ok()) { - logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, 
repo_size_.load()); - if (decrement_total > repo_size_.load()) { - repo_size_ = 0; - } else { - repo_size_ -= decrement_total; - } - } - - if (nullptr != content_repo_) { - for (const auto &ffr : purgeList) { - auto claim = ffr->getResourceClaim(); - if (claim != nullptr) { - content_repo_->removeIfOrphaned(claim); - } - } - } -} - -void FlowFileRepository::run() { - // threshold for purge - uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; - - while (running_) { - std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); - uint64_t curTime = getTimeMillis(); - - flush(); - - uint64_t size = getRepoSize(); - - if (size > max_partition_bytes_) - repo_full_ = true; - else - repo_full_ = false; - } -} - -void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { - content_repo_ = content_repo; - std::vector<std::pair<std::string, uint64_t>> purgeList; - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - - repo_size_ = 0; - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); - std::string key = it->key().ToString(); - repo_size_ += it->value().size(); - if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { - logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); - auto search = connectionMap.find(eventRead->getConnectionUuid()); - if (search != connectionMap.end()) { - // we find the connection for the persistent flowfile, create the flowfile and enqueue that - std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); - eventRead->setStoredToRepository(true); - search->second->put(eventRead); - } else { - logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), 
eventRead->getContentFullPath()); - if (eventRead->getContentFullPath().length() > 0) { - if (nullptr != eventRead->getResourceClaim()) { - content_repo_->remove(eventRead->getResourceClaim()); - } - } - purgeList.push_back(std::make_pair(key, it->value().size())); - } - } else { - purgeList.push_back(std::make_pair(key, it->value().size())); - } - } - - delete it; - for (auto eventId : purgeList) { - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str()); - if (Delete(eventId.first)) { - repo_size_ -= eventId.second; - } - } - - return; -} - -} /* namespace repository */ -} /* namespace core */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 65f1cf9..d3e696b 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -83,11 +83,11 @@ void VolatileContentRepository::start() { thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>()); thread_.detach(); running_ = true; - logger_->log_info("%s Repository Monitor Thread Start", name_); + logger_->log_info("%s Repository Monitor Thread Start", getName()); } std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) { - logger_->log_debug("enter write"); + logger_->log_debug("enter write for %s", claim->getContentFullPath()); { std::lock_guard<std::mutex> lock(map_mutex_); auto claim_check = master_list_.find(claim->getContentFullPath()); @@ -133,7 +133,7 @@ std::shared_ptr<io::BaseStream> 
VolatileContentRepository::write(const std::shar } bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) { - logger_->log_debug("enter exists"); + logger_->log_debug("enter exists for %s", claim->getContentFullPath()); int size = 0; { std::lock_guard<std::mutex> lock(map_mutex_); @@ -150,7 +150,7 @@ bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceCla } std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { - logger_->log_debug("enter read"); + logger_->log_debug("enter read for %s", claim->getContentFullPath()); int size = 0; { std::lock_guard<std::mutex> lock(map_mutex_); @@ -175,19 +175,28 @@ bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceCla if (ent != master_list_.end()) { // if we cannot remove the entry we will let the owner's destructor // decrement the reference count and free it + master_list_.erase(claim->getContentFullPath()); if (ent->second->freeValue(claim)) { logger_->log_debug("removed %s", claim->getContentFullPath()); + logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load()); return true; + } else { + logger_->log_debug("free failed for %s", claim->getContentFullPath()); } - master_list_.erase(claim->getContentFullPath()); + } else { + logger_->log_debug("Could not remove for %s, size is %d", claim->getContentFullPath(), current_size_.load()); } } else { std::lock_guard<std::mutex> lock(map_mutex_); + auto size = master_list_[claim->getContentFullPath()]->getLength(); delete master_list_[claim->getContentFullPath()]; master_list_.erase(claim->getContentFullPath()); + current_size_ -= size; + logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load()); return true; } + logger_->log_debug("Remove for %s, reduced to %d", claim->getContentFullPath(), current_size_.load()); logger_->log_debug("could not 
remove %s", claim->getContentFullPath()); return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp deleted file mode 100644 index 665837c..0000000 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "provenance/ProvenanceRepository.h" -#include "leveldb/write_batch.h" -#include <string> -#include <vector> -#include "provenance/Provenance.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace provenance { - -void ProvenanceRepository::flush() { - leveldb::WriteBatch batch; - std::string key; - std::string value; - leveldb::ReadOptions options; - uint64_t decrement_total = 0; - while (keys_to_delete.size_approx() > 0) { - if (keys_to_delete.try_dequeue(key)) { - db_->Get(options, key, &value); - decrement_total += value.size(); - batch.Delete(key); - logger_->log_info("Removing %s", key); - } - } - if (db_->Write(leveldb::WriteOptions(), &batch).ok()) { - logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load()); - if (decrement_total > repo_size_.load()) { - repo_size_ = 0; - } else { - repo_size_ -= decrement_total; - } - } -} - -void ProvenanceRepository::run() { - while (running_) { - std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); - uint64_t curTime = getTimeMillis(); - // threshold for purge - uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; - - uint64_t size = getRepoSize(); - - if (size >= purgeThreshold) { - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - ProvenanceEventRecord eventRead; - std::string key = it->key().ToString(); - uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size()); - if (eventTime > 0) { - if ((curTime - eventTime) > max_partition_millis_) - Delete(key); - } else { - logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); - Delete(key); - } - } - delete it; - } - flush(); - size = getRepoSize(); - if (size > max_partition_bytes_) - repo_full_ = true; - else - repo_full_ = false; - } -} -} /* namespace provenance */ -} /* namespace minifi */ -} /* 
namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/curl-tests/C2NullConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2NullConfiguration.cpp b/libminifi/test/curl-tests/C2NullConfiguration.cpp index a2a3906..de8d3b8 100644 --- a/libminifi/test/curl-tests/C2NullConfiguration.cpp +++ b/libminifi/test/curl-tests/C2NullConfiguration.cpp @@ -98,8 +98,8 @@ class VerifyC2Server : public IntegrationBase { parse_http_components(url, port, scheme, path); std::cout << "path is " << path << std::endl; configuration->set("c2.agent.protocol.class", "null"); - configuration->set("c2.rest.url", "null"); - configuration->set("c2.rest.url.ack", "null"); + configuration->set("c2.rest.url", ""); + configuration->set("c2.rest.url.ack", ""); configuration->set("c2.agent.heartbeat.reporter.classes", "null"); configuration->set("c2.rest.listener.port", "null"); configuration->set("c2.agent.heartbeat.period", "null"); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/nodefs/NoLevelDB.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoLevelDB.cpp b/libminifi/test/nodefs/NoLevelDB.cpp deleted file mode 100644 index 6856287..0000000 --- a/libminifi/test/nodefs/NoLevelDB.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "../TestBase.h" -#include <memory> -#include "core/Core.h" -#include "core/RepositoryFactory.h" - -TEST_CASE("NoLevelDBTest1", "[NoLevelDBTest]") { - std::shared_ptr<core::Repository> prov_repo = core::createRepository("provenancerepository", true); - REQUIRE(nullptr != prov_repo); -} - -TEST_CASE("NoLevelDBTest2", "[NoLevelDBTest]") { - std::shared_ptr<core::Repository> prov_repo = core::createRepository("flowfilerepository", true); - REQUIRE(nullptr != prov_repo); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/nodefs/NoRocksDB.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoRocksDB.cpp b/libminifi/test/nodefs/NoRocksDB.cpp new file mode 100644 index 0000000..472334f --- /dev/null +++ b/libminifi/test/nodefs/NoRocksDB.cpp @@ -0,0 +1,32 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../TestBase.h" +#include <memory> +#include "core/Core.h" +#include "core/RepositoryFactory.h" + +TEST_CASE("NoRocksDBTest1", "[NoRocksDBTest]") { + std::shared_ptr<core::Repository> prov_repo = core::createRepository("provenancerepository", true); + REQUIRE(nullptr != prov_repo); +} + +TEST_CASE("NoRocksDBTest2", "[NoRocksDBTest]") { + std::shared_ptr<core::Repository> prov_repo = core::createRepository("flowfilerepository", true); + REQUIRE(nullptr != prov_repo); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp new file mode 100644 index 0000000..d92c038 --- /dev/null +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -0,0 +1,246 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include "../TestBase.h" +#include <memory> +#include <string> +#include "../unit/ProvenanceTestHelper.h" +#include "provenance/Provenance.h" +#include "FlowFileRecord.h" +#include "core/Core.h" +#include "DatabaseContentRepository.h" +#include "properties/Configure.h" + +TEST_CASE("Write Claim", "[TestDBCR1]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + stream->writeUTF("well hello there"); + + stream->closeStream(); + + content_repo->stop(); + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto read_stream = content_repo->read(claim); + + std::string readstr; + read_stream->readUTF(readstr); + + REQUIRE(readstr == "well hello there"); + + // should not be able to write to the read stream + // -1 will indicate that we were not able to write any data + + REQUIRE(read_stream->writeUTF("other value") == -1); +} + +TEST_CASE("Delete Claim", "[TestDBCR2]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + stream->writeUTF("well hello there"); + + stream->closeStream(); + + content_repo->stop(); + + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + content_repo->remove(claim); + + auto read_stream = content_repo->read(claim); + + std::string readstr; + + // -1 tell us we have an invalid stream + REQUIRE(read_stream->readUTF(readstr) == -1); +} + +TEST_CASE("Test Empty Claim", "[TestDBCR3]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + // we're writing nothing to the stream. 
+ + stream->closeStream(); + + content_repo->stop(); + + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto read_stream = content_repo->read(claim); + + std::string readstr; + + // -1 tell us we have an invalid stream + REQUIRE(read_stream->readUTF(readstr) == -1); +} + +TEST_CASE("Test Null Claim", "[TestDBCR4]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(nullptr); + + REQUIRE(stream == nullptr); + + auto read_stream = content_repo->write(nullptr); + + REQUIRE(read_stream == nullptr); +} + +TEST_CASE("Delete Null Claim", "[TestDBCR5]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + stream->writeUTF("well hello 
there"); + + stream->closeStream(); + + content_repo->stop(); + + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + REQUIRE(false == content_repo->remove(nullptr)); + + auto read_stream = content_repo->read(claim); + + std::string readstr; + + // removing a null claim is rejected, so the stored claim must still read back intact + read_stream->readUTF(readstr); + + REQUIRE(readstr == "well hello there"); +} + +TEST_CASE("Delete NonExistent Claim", "[TestDBCR5]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + stream->writeUTF("well hello there"); + + stream->closeStream(); + + content_repo->stop(); + + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + // we won't complain if it does not exist + REQUIRE(true == content_repo->remove(claim2)); + + auto read_stream = content_repo->read(claim); + + 
std::string readstr; + + // -1 tell us we have an invalid stream + read_stream->readUTF(readstr); + + REQUIRE(readstr == "well hello there"); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp new file mode 100644 index 0000000..ffc5d98 --- /dev/null +++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp @@ -0,0 +1,165 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include "../TestBase.h" +#include <utility> +#include <memory> +#include <string> +#include <map> +#include "../unit/ProvenanceTestHelper.h" +#include "provenance/Provenance.h" +#include "FlowFileRecord.h" +#include "core/Core.h" +#include "core/repository/AtomicRepoEntries.h" +#include "FlowFileRepository.h" +#include "core/repository/VolatileProvenanceRepository.h" + +TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { + provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah"); + REQUIRE(record1.getAttributes().size() == 0); + REQUIRE(record1.getAlternateIdentifierUri().length() == 0); +} + +TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>(); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); + REQUIRE(record2.getEventId() == record1.getEventId()); + REQUIRE(record2.getComponentId() == record1.getComponentId()); + REQUIRE(record2.getComponentType() == record1.getComponentType()); + REQUIRE(record2.getDetails() == record1.getDetails()); + REQUIRE(record2.getDetails() == smileyface); + REQUIRE(record2.getEventDuration() == sample); +} + +TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { + provenance::ProvenanceEventRecord 
record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::string eventId = record1.getEventId(); + std::map<std::string, std::string> attributes; + attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); + attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); + std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("ff", "./content_repository", 0, 0, 0); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes); + + record1.addChildFlowFile(ffr1); + + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>(); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); + REQUIRE(record1.getChildrenUuids().size() == 1); + REQUIRE(record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE(childId == ffr1->getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE(record2.getChildrenUuids().size() == 0); +} + +TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); + testRepository->initialize(0); + 
record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); + REQUIRE(record2.getEventId() == record1.getEventId()); + REQUIRE(record2.getComponentId() == record1.getComponentId()); + REQUIRE(record2.getComponentType() == record1.getComponentType()); + REQUIRE(record2.getDetails() == record1.getDetails()); + REQUIRE(record2.getDetails() == smileyface); + REQUIRE(record2.getEventDuration() == sample); +} + +TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { + provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::string eventId = record1.getEventId(); + std::map<std::string, std::string> attributes; + attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); + attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); + std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileProvenanceRepository>(); + frepo->initialize(0); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes); + + record1.addChildFlowFile(ffr1); + + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); + REQUIRE(record1.getChildrenUuids().size() == 1); + REQUIRE(record2.getChildrenUuids().size() == 1); + std::string childId = 
record2.getChildrenUuids().at(0); + REQUIRE(childId == ffr1->getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE(record2.getChildrenUuids().size() == 0); +} + +TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { + provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype"); + + std::string eventId = record1.getEventId(); + + std::string smileyface = ":)"; + record1.setDetails(smileyface); + + uint64_t sample = 65555; + + std::shared_ptr<core::Repository> testRepository = std::make_shared<core::Repository>(); + testRepository->initialize(0); + record1.setEventDuration(sample); + + REQUIRE(record1.Serialize(testRepository) == true); + provenance::ProvenanceEventRecord record2; + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == false); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/rocksdb-tests/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp new file mode 100644 index 0000000..e966aba --- /dev/null +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -0,0 +1,162 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include "../TestBase.h" +#include <memory> +#include <string> +#include <map> +#include "../unit/ProvenanceTestHelper.h" +#include "provenance/Provenance.h" +#include "FlowFileRecord.h" +#include "core/Core.h" +#include "FlowFileRepository.h" +#include "core/repository/AtomicRepoEntries.h" +#include "properties/Configure.h" + +TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + repository->initialize(std::make_shared<minifi::Configure>()); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); + + record.addAttribute("keyA", ""); + + REQUIRE(true == record.Serialize()); + + repository->stop(); +} + +TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + repository->initialize(std::make_shared<minifi::Configure>()); + 
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd"); + + REQUIRE(true == record.Serialize()); + + repository->stop(); +} + +TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); + + minifi::FlowFileRecord record2(repository, content_repo); + + std::string uuid = record.getUUIDStr(); + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("keyB", ""); + + record.addAttribute("", ""); + + record.updateAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd2"); + + record.addAttribute("", "sdgsdg"); + + REQUIRE(true == record.Serialize()); + + repository->stop(); + + record2.DeSerialize(uuid); + + std::string value; + REQUIRE(true == record2.getAttribute("", value)); + + REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd2" == value); + + REQUIRE(false == record2.getAttribute("key", value)); + REQUIRE(true == record2.getAttribute("keyA", value)); + REQUIRE("hasdgasdgjsdgasgdsgsadaskgasd" == value); + + REQUIRE(true == record2.getAttribute("keyB", value)); + REQUIRE("" == value); +} + +TEST_CASE("Test Delete Content ", "[TestFFR4]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + 
LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + + char *dir = testController.createTempDirectory(format); + + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + std::map<std::string, std::string> attributes; + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + repository->initialize(std::make_shared<minifi::Configure>()); + + repository->loadComponent(content_repo); + + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo); + + minifi::FlowFileRecord record(repository, content_repo, attributes, claim); + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd"); + + REQUIRE(true == record.Serialize()); + + claim->decreaseFlowFileRecordOwnedCount(); + + claim->decreaseFlowFileRecordOwnedCount(); + + repository->Delete(record.getUUIDStr()); + + repository->flush(); + + repository->stop(); + + std::ifstream fileopen(ss.str()); + REQUIRE(false == fileopen.good()); + + LogTestController::getInstance().reset(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 2583a09..d3bccd0 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -28,7 +28,6 @@ 
#include <vector> #include "core/repository/VolatileContentRepository.h" #include "../../include/core/Processor.h" -#include "../../include/core/repository/FlowFileRepository.h" #include "../../include/Connection.h" #include "../../include/FlowController.h" #include "../../include/properties/Configure.h" @@ -40,7 +39,7 @@ class TestRepository : public core::Repository { public: TestRepository() - : Repository("repo_name", "./dir", 1000, 100, 0) { + : Repository("repo_name", "./dir", 1000, 100, 0), core::SerializableComponent("repo_name",0) { } // initialize bool initialize() { @@ -145,10 +144,10 @@ class TestRepository : public core::Repository { std::map<std::string, std::string> repositoryResults; }; -class TestFlowRepository : public core::repository::FlowFileRepository { +class TestFlowRepository : public core::Repository { public: TestFlowRepository() - : core::repository::FlowFileRepository("ff", "./dir", 1000, 100, 0) { + : core::Repository("ff", "./dir", 1000, 100, 0), core::SerializableComponent("ff",0) { } // initialize bool initialize() {
