http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h new file mode 100644 index 0000000..7397751 --- /dev/null +++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_ + +#include "VolatileRepository.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +/** + * Volatile provenance repository. + */ +class VolatileProvenanceRepository : public VolatileRepository<std::string> +{ + + public: + explicit VolatileProvenanceRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = + MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) + + { + purge_required_ = false; + } + + virtual void run() { + repo_full_ = false; + } + private: + +}; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h index 870a1f5..958d91a 100644 --- a/libminifi/include/core/repository/VolatileRepository.h +++ b/libminifi/include/core/repository/VolatileRepository.h @@ -22,9 +22,11 @@ #include <chrono> #include <vector> #include <map> +#include "core/SerializableComponent.h" #include "core/Core.h" #include "Connection.h" #include "utils/StringUtils.h" +#include "AtomicRepoEntries.h" namespace org { namespace apache { @@ -33,290 +35,89 @@ namespace minifi { namespace core { namespace repository { -static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + sizeof(std::string) + sizeof(size_t); - -class RepoValue { - public: - - explicit RepoValue() { - } - - explicit RepoValue(std::string key, uint8_t *ptr, size_t size) - : key_(key) { - buffer_.resize(size); - std::memcpy(buffer_.data(), ptr, size); - fast_size_ = key.size() + size; - } - - explicit RepoValue(RepoValue &&other) -noexcept : key_(std::move(other.key_)), - buffer_(std::move(other.buffer_)), - fast_size_(other.fast_size_) { - } - - ~RepoValue() - { - } - - std::string &getKey() { - return key_; - } - - /** - * Return the size of the memory within the key - * buffer, the size of timestamp, and the general - * system word size - */ - uint64_t size() { - return fast_size_; - } - - size_t bufferSize() { - return buffer_.size(); - } - - void emplace(std::string &str) { - str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); - } - - RepoValue &operator=(RepoValue &&other) noexcept { - key_ = std::move(other.key_); - buffer_ = std::move(other.buffer_); - other.buffer_.clear(); - return *this; - } - - private: - size_t fast_size_; - std::string key_; - std::vector<uint8_t> buffer_; - }; - - /** - * Purpose: Atomic Entry allows us to create a statically - * sized ring buffer, with the ability to create - **/ -class AtomicEntry { - - public: - AtomicEntry() - : write_pending_(false), - has_value_(false) { - - } - - bool setRepoValue(RepoValue &new_value, size_t &prev_size) { - // delete the underlying pointer - bool lock = false; - if (!write_pending_.compare_exchange_weak(lock, true) && !lock) - return false; - if (has_value_) { - prev_size = value_.size(); - } - value_ = std::move(new_value); - has_value_ = true; - try_unlock(); - return true; - } - - bool getValue(RepoValue &value) { - try_lock(); - if (!has_value_) { - try_unlock(); - return false; - } - value = std::move(value_); - has_value_ = false; - try_unlock(); - return true; - } - - bool getValue(const std::string &key, RepoValue &value) { - try_lock(); - if (!has_value_) { - try_unlock(); - return false; - } - if (value_.getKey() != key) { - try_unlock(); - return false; - } - value = std::move(value_); - has_value_ = false; - try_unlock(); - return true; - } - - private: - - inline void try_lock() { - bool lock = false; - while (!write_pending_.compare_exchange_weak(lock, true) && !lock) { - // attempt again - } - } - - inline void try_unlock() { - bool lock = true; - while (!write_pending_.compare_exchange_weak(lock, false) && lock) { - // attempt again - } - } - - std::atomic<bool> write_pending_; - std::atomic<bool> has_value_; - RepoValue value_; -}; - /** * Flow File repository * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. */ -class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository> { +template<typename T> +class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository<T>> { public: static const char *volatile_repo_max_count; + static const char *volatile_repo_max_bytes; // Constructor - VolatileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = + explicit VolatileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE, - uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod), max_size_(maxPartitionBytes * 0.75), current_index_(0), max_count_(10000), + current_size_(0), logger_(logging::LoggerFactory<VolatileRepository>::getLogger()) { - + purge_required_ = false; } // Destructor - ~VolatileRepository() { - for (auto ent : value_vector_) { - delete ent; - } - } + virtual ~VolatileRepository(); /** * Initialize thevolatile repsitory **/ - virtual bool initialize(const std::shared_ptr<Configure> &configure) { - std::string value = ""; - - if (configure != nullptr) { - int64_t max_cnt = 0; - std::stringstream strstream; - strstream << Configure::nifi_volatile_repository_options << getName() << "." << volatile_repo_max_count; - if (configure->get(strstream.str(), value)) { - if (core::Property::StringToInt(value, max_cnt)) { - max_count_ = max_cnt; - } - } - } + virtual bool initialize(const std::shared_ptr<Configure> &configure); - logger_->log_debug("Resizing value_vector_ for %s count is %d", getName(), max_count_); - value_vector_.reserve(max_count_); - for (int i = 0; i < max_count_; i++) { - value_vector_.emplace_back(new AtomicEntry()); - } - return true; - } - - virtual void run(); + virtual void run() = 0; /** * Places a new object into the volatile memory area * @param key key to add to the repository * @param buf buffer **/ - virtual bool Put(std::string key, uint8_t *buf, int bufLen) { - RepoValue new_value(key, buf, bufLen); - - const size_t size = new_value.size(); - bool updated = false; - size_t reclaimed_size = 0; - do { - - int private_index = current_index_.fetch_add(1); - // round robin through the beginning - if (private_index >= max_count_) { - uint16_t new_index = 0; - if (current_index_.compare_exchange_weak(new_index, 0)) { - private_index = 0; - } else { - continue; - } - } - logger_->log_info("Set repo value at %d out of %d", private_index, max_count_); - updated = value_vector_.at(private_index)->setRepoValue(new_value, reclaimed_size); + virtual bool Put(T key, const uint8_t *buf, size_t bufLen); - if (reclaimed_size > 0) { - current_size_ -= reclaimed_size; - } - - } while (!updated); - current_size_ += size; - - logger_->log_info("VolatileRepository -- put %s %d %d", key, current_size_.load(), current_index_.load()); - return true; - } /** - *c * Deletes the key * @return status of the delete operation */ - virtual bool Delete(std::string key) { - - logger_->log_info("VolatileRepository -- delete %s", key); - for (auto ent : value_vector_) { - // let the destructor do the cleanup - RepoValue value; - if (ent->getValue(key, value)) { - current_size_ -= value.size(); - return true; - } + virtual bool Delete(T key); - } - return false; - } /** * Sets the value from the provided key. Once the item is retrieved * it may not be retrieved again. * @return status of the get operation. */ - virtual bool Get(std::string key, std::string &value) { - for (auto ent : value_vector_) { - // let the destructor do the cleanup - RepoValue repo_value; - - if (ent->getValue(key, repo_value)) { - current_size_ -= value.size(); - repo_value.emplace(value); - logger_->log_info("VolatileRepository -- get %s %d", key, current_size_.load()); - return true; - } + virtual bool Get(const T &key, std::string &value); + /** + * Deserializes objects into store + * @param store vector in which we will store newly created objects. + * @param max_size size of objects deserialized + */ + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda); - } - return false; - } + /** + * Deserializes objects into a store that contains a fixed number of objects in which + * we will deserialize from this repo + * @param store precreated object vector + * @param max_size size of objects deserialized + */ + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size); - void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) { - this->connectionMap = connectionMap; - } - void loadComponent(); - - void start() { - if (this->purge_period_ <= 0) - return; - if (running_) - return; - thread_ = std::thread(&VolatileRepository::run, shared_from_this()); - thread_.detach(); - running_ = true; - logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); - } + /** + * Set the connection map + * @param connectionMap map of all connections through this repo. + */ + void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap); + + /** + * Function to load this component. + */ + virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo); + + virtual void start(); protected: @@ -331,22 +132,240 @@ class VolatileRepository : public core::Repository, public std::enable_shared_fr else return false; } - /** - * Purges the volatile repository. - */ - void purge(); - private: std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; - - std::atomic<uint32_t> current_size_; + // current size of the volatile repo. + std::atomic<size_t> current_size_; + // current index. std::atomic<uint16_t> current_index_; - std::vector<AtomicEntry*> value_vector_; + // value vector that exists for non blocking iteration over + // objects that store data for this repo instance. + std::vector<AtomicEntry<T>*> value_vector_; + + // max count we are allowed to store. uint32_t max_count_; - uint32_t max_size_; + // maximum estimated size + size_t max_size_; + + bool purge_required_; + + std::mutex purge_mutex_; + // purge list + std::vector<T> purge_list_; + + private: std::shared_ptr<logging::Logger> logger_; + +}; + +template<typename T> +const char *VolatileRepository<T>::volatile_repo_max_count = "max.count"; +template<typename T> +const char *VolatileRepository<T>::volatile_repo_max_bytes = "max.bytes"; + +template<typename T> +void VolatileRepository<T>::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { +} + +// Destructor +template<typename T> +VolatileRepository<T>::~VolatileRepository() { + for (auto ent : value_vector_) { + delete ent; + } +} + +/** + * Initialize the volatile repsitory + **/ +template<typename T> +bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure> &configure) { + std::string value = ""; + + if (configure != nullptr) { + int64_t max_cnt = 0; + std::stringstream strstream; + strstream << Configure::nifi_volatile_repository_options << getName() << "." << volatile_repo_max_count; + if (configure->get(strstream.str(), value)) { + if (core::Property::StringToInt(value, max_cnt)) { + max_count_ = max_cnt; + } + } + + strstream.str(""); + strstream.clear(); + int64_t max_bytes = 0; + strstream << Configure::nifi_volatile_repository_options << getName() << "." << volatile_repo_max_bytes; + if (configure->get(strstream.str(), value)) { + if (core::Property::StringToInt(value, max_bytes)) { + if (max_bytes <= 0) { + max_size_ = std::numeric_limits<uint32_t>::max(); + } else { + max_size_ = max_bytes; + } + } + } + } + + logger_->log_info("Resizing value_vector_ for %s count is %d", getName(), max_count_); + logger_->log_info("Using a maximum size of %u", max_size_); + value_vector_.reserve(max_count_); + for (int i = 0; i < max_count_; i++) { + value_vector_.emplace_back(new AtomicEntry<T>(¤t_size_, &max_size_)); + } + return true; +} + +/** + * Places a new object into the volatile memory area + * @param key key to add to the repository + * @param buf buffer + **/ +template<typename T> +bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) { + RepoValue<T> new_value(key, buf, bufLen); + + const size_t size = new_value.size(); + bool updated = false; + size_t reclaimed_size = 0; + RepoValue<T> old_value; + do { + int private_index = current_index_.fetch_add(1); + // round robin through the beginning + if (private_index >= max_count_) { + uint16_t new_index = 0; + if (current_index_.compare_exchange_weak(new_index, 0)) { + private_index = 0; + } else { + continue; + } + } + + updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size); + logger_->log_debug("Set repo value at %d out of %d updated %d current_size %d, adding %d to %d", private_index, max_count_,updated==true,reclaimed_size,size, current_size_.load()); + if (updated && reclaimed_size > 0) + { + std::lock_guard<std::mutex> lock(mutex_); + purge_list_.push_back(old_value.getKey()); + } + if (reclaimed_size > 0) { + /** + * this is okay since current_size_ is really an estimate. + * we don't need precise counts. + */ + if (current_size_ < reclaimed_size) { + current_size_ = 0; + } else { + current_size_ -= reclaimed_size; + } + } + } while (!updated); + current_size_ += size; + + logger_->log_debug("VolatileRepository -- put %d %d", current_size_.load(), current_index_.load()); + return true; +} +/** + * Deletes the key + * @return status of the delete operation + */ +template<typename T> +bool VolatileRepository<T>::Delete(T key) { + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue<T> value; + if (ent->getValue(key, value)) { + current_size_ -= value.size(); + return true; + } + } + return false; +} +/** + * Sets the value from the provided key. Once the item is retrieved + * it may not be retrieved again. + * @return status of the get operation. + */ +template<typename T> +bool VolatileRepository<T>::Get(const T &key, std::string &value) { + + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue<T> repo_value; + if (ent->getValue(key, repo_value)) { + current_size_ -= value.size(); + repo_value.emplace(value); + return true; + } + } + return false; +} + +template<typename T> +bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { + size_t requested_batch = max_size; + max_size = 0; + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue<T> repo_value; + + if (ent->getValue(repo_value)) { + std::shared_ptr<core::SerializableComponent> newComponent = lambda(); + // we've taken ownership of this repo value + newComponent->DeSerialize(repo_value.getBuffer(), repo_value.getBufferSize()); + + store.push_back(newComponent); + + if (max_size++ >= requested_batch) { + break; + } + } + } + if (max_size > 0) { + return true; + } else { + return false; + } +} + +template<typename T> +bool VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { + logger_->log_debug("VolatileRepository -- DeSerialize %d", current_size_.load()); + max_size = 0; + for (auto ent : value_vector_) { + // let the destructor do the cleanup + RepoValue<T> repo_value; + + if (ent->getValue(repo_value)) { + // we've taken ownership of this repo value + store.at(max_size)->DeSerialize(repo_value.getBuffer(), repo_value.getBufferSize()); + if (max_size++ >= store.size()) { + break; + } + } + } + if (max_size > 0) { + return true; + } else { + return false; + } +} + +template<typename T> +void VolatileRepository<T>::setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) { + this->connectionMap = connectionMap; +} + +template<typename T> +void VolatileRepository<T>::start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + running_ = true; + thread_ = std::thread(&VolatileRepository<T>::run, std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this()); + logger_->log_info("%s Repository Monitor Thread Start", name_); } -; } /* namespace repository */ } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index e03c794..17b060f 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -21,7 +21,7 @@ #include "core/ProcessorConfig.h" #include "yaml-cpp/yaml.h" #include "processors/LoadProcessors.h" -#include "../FlowConfiguration.h" +#include "core/FlowConfiguration.h" #include "Site2SiteClientProtocol.h" #include <string> #include "io/validation.h" @@ -46,13 +46,12 @@ namespace core { class YamlConfiguration : public FlowConfiguration { public: - explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<io::StreamFactory> stream_factory, - std::shared_ptr<Configure> configuration, - const std::string path = DEFAULT_FLOW_YAML_FILE_NAME) - : FlowConfiguration(repo, flow_file_repo, stream_factory, configuration, path), + explicit YamlConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<io::StreamFactory> stream_factory, std::shared_ptr<Configure> configuration, const std::string path = DEFAULT_FLOW_YAML_FILE_NAME) + : FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configuration, path), logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) { stream_factory_ = stream_factory; - if (IsNullOrEmpty(config_path_)) { + if (IsNullOrEmpty (config_path_)) { config_path_ = DEFAULT_FLOW_YAML_FILE_NAME; } } @@ -93,20 +92,20 @@ class YamlConfiguration : public FlowConfiguration { } /** - * Returns a shared pointer to a ProcessGroup object containing the - * flow configuration. The yamlConfigPayload argument must be - * a payload for the raw YAML configuration. - * - * @param yamlConfigPayload an input payload for the raw YAML configuration - * to be parsed and loaded into the flow - * configuration tree - * @return the root ProcessGroup node of the flow - * configuration tree - */ - std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string &yamlConfigPayload) { - YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); - return getRoot(&rootYamlNode); - } + * Returns a shared pointer to a ProcessGroup object containing the + * flow configuration. The yamlConfigPayload argument must be + * a payload for the raw YAML configuration. + * + * @param yamlConfigPayload an input payload for the raw YAML configuration + * to be parsed and loaded into the flow + * configuration tree + * @return the root ProcessGroup node of the flow + * configuration tree + */ + std::unique_ptr<core::ProcessGroup> getRootFromPayload(std::string &yamlConfigPayload) { + YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); + return getRoot(&rootYamlNode); + } protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/AtomicEntryStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h new file mode 100644 index 0000000..5f200f0 --- /dev/null +++ b/libminifi/include/io/AtomicEntryStream.h @@ -0,0 +1,205 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_ +#define LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_ + +#include <mutex> +#include <cstring> +#include "BaseStream.h" +#include "core/repository/AtomicRepoEntries.h" +#include "Exception.h" +#include "core/logging/LoggerConfiguration.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +template<typename T> +class AtomicEntryStream : public BaseStream { + public: + AtomicEntryStream(const T key, core::repository::AtomicEntry<T> *entry) + : key_(key), + entry_(entry), + offset_(0), + length_(0), + logger_(logging::LoggerFactory<AtomicEntryStream()>::getLogger()) { + core::repository::RepoValue<T> *value; + if (entry_->getValue(key, &value)) { + length_ = value->getBufferSize(); + entry_->decrementOwnership(); + invalid_stream_ = false; + } else { + invalid_stream_ = true; + } + } + + virtual ~AtomicEntryStream(); + + virtual void closeStream() { + + } + + /** + * Skip to the specified offset. + * @param offset offset to which we will skip + */ + void seek(uint64_t offset); + + virtual const uint32_t getSize() const { + return length_; + } + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + throw std::runtime_error("Stream does not support this operation"); + } + + protected: + size_t length_; + size_t offset_; + T key_; + core::repository::AtomicEntry<T> *entry_; + std::atomic<bool> invalid_stream_; + std::recursive_mutex entry_lock_; + + // Logger + std::shared_ptr<logging::Logger> logger_; + +}; + +template<typename T> +AtomicEntryStream<T>::~AtomicEntryStream(){ + logger_->log_debug("Decrementing"); + entry_->decrementOwnership(); +} + +template<typename T> +void AtomicEntryStream<T>::seek(uint64_t offset) { + std::lock_guard<std::recursive_mutex> lock(entry_lock_); + offset_ = offset; +} + +template<typename T> +int AtomicEntryStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen || invalid_stream_) + return -1; + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); +} + +// data stream overrides +template<typename T> +int AtomicEntryStream<T>::writeData(uint8_t *value, int size) { + if (nullptr != value && !invalid_stream_) { + std::lock_guard<std::recursive_mutex> lock(entry_lock_); + if (entry_->insert(key_, value, size)) { + offset_ += size; + if (offset_ > length_) + { + length_ = offset_; + } + return size; + } + else { + logger_->log_debug("Cannot insert %d bytes due to insufficient space in atomic entry", size); + } + + } + return -1; + +} + +template<typename T> +int AtomicEntryStream<T>::readData(std::vector<uint8_t> &buf, int buflen) { + if (invalid_stream_){ + return -1; + } + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); + + if (ret < buflen) { + buf.resize(ret); + } + return ret; +} + +template<typename T> +int AtomicEntryStream<T>::readData(uint8_t *buf, int buflen) { + if (nullptr != buf && !invalid_stream_) { + std::lock_guard<std::recursive_mutex> lock(entry_lock_); + int len = buflen; + core::repository::RepoValue<T> *value; + if (entry_->getValue(key_, &value)) { + if (offset_ + len > value->getBufferSize()) { + len = value->getBufferSize() - offset_; + if (len <= 0) { + entry_->decrementOwnership(); + return 0; + } + } + std::memcpy(buf, reinterpret_cast<uint8_t*>(const_cast<uint8_t*>(value->getBuffer()) + offset_), len); + offset_ += len; + entry_->decrementOwnership(); + return len; + } + + } + return -1; +} + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_ATOMICENTRYSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/BaseStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h index cae8a43..cd982bb 100644 --- a/libminifi/include/io/BaseStream.h +++ b/libminifi/include/io/BaseStream.h @@ -30,6 +30,11 @@ namespace nifi { namespace minifi { namespace io { +/** + * Base Stream. Not intended to be thread safe as it is not intended to be shared + * + * Extensions may be thread safe and thus shareable, but that is up to the implementation. + */ class BaseStream : public DataStream, public Serializable { public: @@ -55,6 +60,14 @@ class BaseStream : public DataStream, public Serializable { int writeData(uint8_t *value, int size); + virtual void seek(uint32_t offset) { + if (composable_stream_ != this) { + composable_stream_->seek(offset); + } else { + DataStream::seek(offset); + } + } + /** * write 2 bytes to stream * @param base_value non encoded value http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h index c7db7f1..cd8a4fc 100644 --- a/libminifi/include/io/ClientSocket.h +++ b/libminifi/include/io/ClientSocket.h @@ -84,7 +84,7 @@ class Socket : public BaseStream { * Static function to return the current machine's host name */ static std::string getMyHostName() { - static char *HOSTNAME = init_hostname(); + static std::string HOSTNAME = init_hostname(); return HOSTNAME; } @@ -239,12 +239,12 @@ class Socket : public BaseStream { private: std::shared_ptr<logging::Logger> logger_; - static char* init_hostname() { + static std::string init_hostname() { char hostname[1024]; gethostname(hostname, 1024); Socket mySock(nullptr, hostname, 0); mySock.initialize(); - return const_cast<char*>(mySock.getHostname().c_str()); + return mySock.getHostname(); } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/DataStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/DataStream.h b/libminifi/include/io/DataStream.h index 460930d..2ebc9a4 100644 --- a/libminifi/include/io/DataStream.h +++ b/libminifi/include/io/DataStream.h @@ -30,6 +30,8 @@ namespace io { /** * DataStream defines the mechanism through which * binary data will be written to a sink + * + * This object is not intended to be thread safe. */ class DataStream { public: @@ -58,6 +60,10 @@ class DataStream { return 0; } + virtual void seek(uint32_t offset) { + readBuffer += offset; + } + virtual void closeStream() { } @@ -111,7 +117,7 @@ class DataStream { * Retrieve size of data stream * @return size of data stream **/ - const uint32_t getSize() const { + virtual const uint32_t getSize() const { return buffer.size(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/io/FileStream.h ---------------------------------------------------------------------- diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h new file mode 100644 index 0000000..23a1f0b --- /dev/null +++ b/libminifi/include/io/FileStream.h @@ -0,0 +1,136 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_ +#define LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_ + +#include <iostream> +#include <cstdint> +#include <string> +#include "EndianCheck.h" +#include "BaseStream.h" +#include "Serializable.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +/** + * Purpose: File Stream Base stream extension. This is intended to be a thread safe access to + * read/write to the local file system. + * + * Design: Simply extends BaseStream and overrides readData/writeData to allow a sink to the + * fstream object. + */ +class FileStream : public io::BaseStream { + public: + /** + * File Stream constructor that accepts an fstream shared pointer. + * It must already be initialized for read and write. + */ + explicit FileStream(const std::string &path, uint32_t offset, bool write_enable = false); + + /** + * File Stream constructor that accepts an fstream shared pointer. + * It must already be initialized for read and write. + */ + explicit FileStream(const std::string &path); + + virtual ~FileStream() { + closeStream(); + } + + virtual void closeStream(); + /** + * Skip to the specified offset. + * @param offset offset to which we will skip + */ + void seek(uint64_t offset); + + const uint32_t getSize() const { + return length_; + } + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + throw std::runtime_error("Stream does not support this operation"); + } + + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. + */ + template<typename T> + std::vector<uint8_t> readBuffer(const T&); + std::recursive_mutex file_lock_; + std::unique_ptr<std::fstream> file_stream_; + size_t offset_; + std::string path_; + size_t length_; + + private: + + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_TLS_FILESTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ExecuteProcess.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ExecuteProcess.h b/libminifi/include/processors/ExecuteProcess.h index 28dcf76..8cc7a25 100644 --- a/libminifi/include/processors/ExecuteProcess.h +++ b/libminifi/include/processors/ExecuteProcess.h @@ -31,6 +31,7 @@ #include <iostream> #include <sys/types.h> #include <signal.h> +#include "io/BaseStream.h" #include "FlowFileRecord.h" #include "core/Processor.h" #include "core/ProcessSession.h" @@ -85,9 +86,12 @@ class ExecuteProcess : public core::Processor { } char *_data; uint64_t _dataSize; - void process(std::ofstream *stream) { + //void process(std::ofstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; if (_data && _dataSize > 0) - stream->write(_data, _dataSize); + ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize); + return ret; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GenerateFlowFile.h b/libminifi/include/processors/GenerateFlowFile.h index abb5740..7551e88 100644 --- a/libminifi/include/processors/GenerateFlowFile.h +++ b/libminifi/include/processors/GenerateFlowFile.h @@ -68,9 +68,11 @@ class GenerateFlowFile : public core::Processor { } char *_data; uint64_t _dataSize; - void process(std::ofstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; if (_data && _dataSize > 0) - stream->write(_data, _dataSize); + ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize); + return ret; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/InvokeHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/InvokeHTTP.h b/libminifi/include/processors/InvokeHTTP.h index d55a5be..03a1611 100644 --- a/libminifi/include/processors/InvokeHTTP.h +++ b/libminifi/include/processors/InvokeHTTP.h @@ -104,6 +104,12 @@ class InvokeHTTP : public core::Processor { void onTrigger(core::ProcessContext *context, core::ProcessSession *session); void initialize(); void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + /** + * Provides a reference to the URL. + */ + const std::string &getUrl() { + return url_; + } protected: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenHTTP.h b/libminifi/include/processors/ListenHTTP.h index c9e42bc..1b58dcd 100644 --- a/libminifi/include/processors/ListenHTTP.h +++ b/libminifi/include/processors/ListenHTTP.h @@ -92,7 +92,7 @@ class ListenHTTP : public core::Processor { class WriteCallback : public OutputStreamCallback { public: WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo); - void process(std::ofstream *stream); + int64_t process(std::shared_ptr<io::BaseStream> stream); private: // Logger http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h index ed54b44..25acac9 100644 --- a/libminifi/include/processors/ListenSyslog.h +++ b/libminifi/include/processors/ListenSyslog.h @@ -114,14 +114,16 @@ class ListenSyslog : public core::Processor { class WriteCallback : public OutputStreamCallback { public: WriteCallback(char *data, uint64_t size) - : _data(data), + : _data(reinterpret_cast<uint8_t*>(data)), _dataSize(size) { } - char *_data; + uint8_t *_data; uint64_t _dataSize; - void process(std::ofstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; if (_data && _dataSize > 0) - stream->write(_data, _dataSize); + ret = stream->write(_data, _dataSize); + return ret; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h index 88230f7..b9e333f 100644 --- a/libminifi/include/processors/LogAttribute.h +++ b/libminifi/include/processors/LogAttribute.h @@ -87,25 +87,27 @@ class LogAttribute : public core::Processor { // Nest Callback Class for read stream class ReadCallback : public InputStreamCallback { public: - ReadCallback(uint64_t size) { - _bufferSize = size; - _buffer = new char[_bufferSize]; + ReadCallback(uint64_t size) + : read_size_(0) { + buffer_size_ = size; + buffer_ = new uint8_t[buffer_size_]; } ~ReadCallback() { - if (_buffer) - delete[] _buffer; + if (buffer_) + delete[] buffer_; } - void process(std::ifstream *stream) { - - stream->read(_buffer, _bufferSize); + int64_t process(std::shared_ptr<io::BaseStream> stream) { + int64_t ret = 0; + ret = stream->read(buffer_, buffer_size_); if (!stream) - _readSize = stream->gcount(); + read_size_ = stream->getSize(); else - _readSize = _bufferSize; + read_size_ = buffer_size_; + return ret; } - char *_buffer; - uint64_t _bufferSize; - uint64_t _readSize; + uint8_t *buffer_; + uint64_t buffer_size_; + uint64_t read_size_; }; public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index f67e512..c7f2823 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -80,7 +80,8 @@ class PutFile : public core::Processor { public: ReadCallback(const std::string &tmpFile, const std::string &destFile); ~ReadCallback(); - virtual void process(std::ifstream *stream);bool commit(); + virtual int64_t process(std::shared_ptr<io::BaseStream> stream); + bool commit(); private: std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/properties/Configure.h ---------------------------------------------------------------------- diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 13da55a..341b89c 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -40,6 +40,7 @@ class Configure : public Properties { static const char *nifi_server_name; static const char *nifi_configuration_class_name; static const char *nifi_flow_repository_class_name; + static const char *nifi_content_repository_class_name; static const char *nifi_volatile_repository_options; static const char *nifi_provenance_repository_class_name; static const char *nifi_server_port; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 1479514..b9415dc 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -29,7 +29,8 @@ #include <string> #include <thread> #include <vector> - +#include "core/Core.h" +#include "core/SerializableComponent.h" #include "core/Repository.h" #include "core/Property.h" #include "properties/Configure.h" @@ -50,7 +51,7 @@ namespace provenance { #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 // Provenance Event Record -class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializable { +class ProvenanceEventRecord : public core::SerializableComponent { public: enum ProvenanceEventType { @@ -163,7 +164,8 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa */ ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType); - ProvenanceEventRecord() { + ProvenanceEventRecord() + : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) { _eventTime = getTimeMillis(); } @@ -172,7 +174,11 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa } // Get the Event ID std::string getEventId() { - return _eventIdStr; + return uuidStr_; + } + + void setEventId(const std::string &id) { + setUUIDStr(id); } // Get Attributes std::map<std::string, std::string> getAttributes() { @@ -220,7 +226,7 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa } // Get FlowFileUuid std::string getFlowFileUuid() { - return uuid_; + return flow_uuid_; } // Get content full path std::string getContentFullPath() { @@ -333,7 +339,7 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa _entryDate = flow->getEntryDate(); _lineageStartDate = flow->getlineageStartDate(); _lineageIdentifiers = flow->getlineageIdentifiers(); - uuid_ = flow->getUUIDStr(); + flow_uuid_ = flow->getUUIDStr(); _attributes = flow->getAttributes(); _size = flow->getSize(); _offset = flow->getOffset(); @@ -344,15 +350,43 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa } } // Serialize and Persistent to the repository - bool Serialize(const std::shared_ptr<core::Repository> &repo); + bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo); // DeSerialize - bool DeSerialize(const uint8_t *buffer, const int bufferSize); + bool DeSerialize(const uint8_t *buffer, const size_t bufferSize); // DeSerialize bool DeSerialize(org::apache::nifi::minifi::io::DataStream &stream) { return DeSerialize(stream.getBuffer(), stream.getSize()); } // DeSerialize - bool DeSerialize(const std::shared_ptr<core::Repository> &repo, std::string key); + bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &repo); + + uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) { + + int size = bufferSize > 72 ? 72 : bufferSize; + org::apache::nifi::minifi::io::DataStream outStream(buffer, size); + + std::string uuid; + int ret = readUTF(uuid, &outStream); + + if (ret <= 0) { + return 0; + } + + uint32_t eventType; + ret = read(eventType, &outStream); + if (ret != 4) { + return 0; + } + + uint64_t event_time; + + ret = read(event_time, &outStream); + if (ret != 8) { + return 0; + } + + return event_time; + } protected: @@ -373,15 +407,13 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa // Size in bytes of the data corresponding to this flow file uint64_t _size; // flow uuid - std::string uuid_; + std::string flow_uuid_; // Offset to the content uint64_t _offset; // Full path to the content std::string _contentFullPath; // Attributes key/values pairs for the flow record std::map<std::string, std::string> _attributes; - // provenance ID - uuid_t _eventId; // UUID string for all parents std::set<std::string> _lineageIdentifiers; // transitUri @@ -396,8 +428,6 @@ class ProvenanceEventRecord : protected org::apache::nifi::minifi::io::Serializa std::string _details; // sourceQueueIdentifier std::string _sourceQueueIdentifier; - // event ID Str - std::string _eventIdStr; // relationship std::string _relationship; // alternateIdentifierUri; @@ -437,6 +467,7 @@ class ProvenanceReporter { // Add event void add(ProvenanceEventRecord *event) { _events.insert(event); + logger_->log_debug("Prove reporter now %d", _events.size()); } // Remove event void remove(ProvenanceEventRecord *event) { @@ -496,10 +527,9 @@ class ProvenanceReporter { private: + std::shared_ptr<logging::Logger> logger_; // Incoming connection Iterator std::set<ProvenanceEventRecord *> _events; - // Logger - std::shared_ptr<logging::Logger> logger_; // provenance repository. std::shared_ptr<core::Repository> repo_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index dd2c5ec..ea78a3c 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -42,12 +42,11 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ /*! * Create a new provenance repository */ - ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = - MAX_PROVENANCE_ENTRY_LIFE_TIME, - int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) + ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = + MAX_PROVENANCE_STORAGE_SIZE, + uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) { - db_ = NULL; } @@ -62,9 +61,8 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ return; if (running_) return; - thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); - thread_.detach(); running_ = true; + thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); } @@ -98,7 +96,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ return true; } // Put - virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { if (repo_full_) return false; @@ -122,7 +120,7 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ return false; } // Get - virtual bool Get(std::string key, std::string &value) { + virtual bool Get(const std::string &key, std::string &value) { leveldb::Status status; status = db_->Get(leveldb::ReadOptions(), key, &value); if (status.ok()) @@ -130,17 +128,53 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ else return false; } - // Persistent event - void registerEvent(std::shared_ptr<ProvenanceEventRecord> &event) { - event->Serialize(std::static_pointer_cast<core::Repository>(shared_from_this())); - } + // Remove event void removeEvent(ProvenanceEventRecord *event) { Delete(event->getEventId()); } + + virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) { + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + if (store.size() >= max_size) + break; + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead)); + } + } + delete it; + return true; + } + + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + size_t requested_batch = max_size; + max_size = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + + if (max_size >= requested_batch) + break; + std::shared_ptr<core::SerializableComponent> eventRead = lambda(); + std::string key = it->key().ToString(); + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + max_size++; + records.push_back(eventRead); + } + + } + delete it; + + if (max_size > 0) { + return true; + } else { + return false; + } + } //! get record void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) { - std::lock_guard<std::mutex> lock(mutex_); leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); @@ -153,9 +187,29 @@ class ProvenanceRepository : public core::Repository, public std::enable_shared_ } delete it; } + + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + max_size = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + + if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + max_size++; + } + if (store.size() >= max_size) + break; + } + delete it; + if (max_size > 0) { + return true; + } else { + return false; + } + } //! purge record void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) { - std::lock_guard<std::mutex> lock(mutex_); for (auto record : records) { Delete(record->getEventId()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/utils/ByteInputCallBack.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ByteInputCallBack.h b/libminifi/include/utils/ByteInputCallBack.h index a2b7838..86aae09 100644 --- a/libminifi/include/utils/ByteInputCallBack.h +++ b/libminifi/include/utils/ByteInputCallBack.h @@ -32,19 +32,27 @@ namespace utils { */ class ByteInputCallBack : public InputStreamCallback { public: - ByteInputCallBack() { + ByteInputCallBack() + : ptr(nullptr) { } virtual ~ByteInputCallBack() { } - virtual void process(std::ifstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { - std::vector<char> nv = std::vector<char>(std::istreambuf_iterator<char>(*stream), std::istreambuf_iterator<char>()); - vec = std::move(nv); + stream->seek(0); - ptr = &vec[0]; + if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(vec, stream->getSize()); + } + + ptr = (char*) &vec[0]; + + return vec.size(); } @@ -58,7 +66,7 @@ class ByteInputCallBack : public InputStreamCallback { private: char *ptr; - std::vector<char> vec; + std::vector<uint8_t> vec; }; } /* namespace utils */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/ConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ConfigurationListener.cpp b/libminifi/src/ConfigurationListener.cpp index aaf50ce..858e455 100644 --- a/libminifi/src/ConfigurationListener.cpp +++ b/libminifi/src/ConfigurationListener.cpp @@ -35,14 +35,10 @@ void ConfigurationListener::start() { pull_interval_ = 60 * 1000; std::string value; // grab the value for configuration - if (configure_->get(Configure::nifi_configuration_listener_pull_interval, - value)) { + if (configure_->get(Configure::nifi_configuration_listener_pull_interval, value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, pull_interval_, unit) - && core::Property::ConvertTimeUnitToMS(pull_interval_, unit, - pull_interval_)) { - logger_->log_info("Configuration Listener pull interval: [%d] ms", - pull_interval_); + if (core::Property::StringToTime(value, pull_interval_, unit) && core::Property::ConvertTimeUnitToMS(pull_interval_, unit, pull_interval_)) { + logger_->log_info("Configuration Listener pull interval: [%d] ms", pull_interval_); } } @@ -62,7 +58,7 @@ void ConfigurationListener::stop() { } void ConfigurationListener::run() { - std::unique_lock<std::mutex> lk(mutex_); + std::unique_lock < std::mutex > lk(mutex_); std::condition_variable cv; int64_t interval = 0; while (!cv.wait_for(lk, std::chrono::milliseconds(100), [this] {return (running_ == false);})) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 8bbc5fc..acad1fd 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -31,7 +31,8 @@ const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.gra const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; -const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; +const char *Configure::nifi_flow_repository_class_name = "nifi.flowfile.repository.class.name"; +const char *Configure::nifi_content_repository_class_name = "nifi.content.repository.class.name"; const char *Configure::nifi_volatile_repository_options = "nifi.volatile.repository.options."; const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; @@ -43,39 +44,22 @@ const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfil const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; -const char *Configure::nifi_security_need_ClientAuth = - "nifi.security.need.ClientAuth"; -const char *Configure::nifi_security_client_certificate = - "nifi.security.client.certificate"; -const char *Configure::nifi_security_client_private_key = - "nifi.security.client.private.key"; -const char *Configure::nifi_security_client_pass_phrase = - "nifi.security.client.pass.phrase"; -const char *Configure::nifi_security_client_ca_certificate = - "nifi.security.client.ca.certificate"; -const char *Configure::nifi_configuration_listener_pull_interval = - "nifi.configuration.listener.pull.interval"; -const char *Configure::nifi_configuration_listener_http_url = - "nifi.configuration.listener.http.url"; -const char *Configure::nifi_configuration_listener_rest_url = - "nifi.configuration.listener.rest.url"; -const char *Configure::nifi_configuration_listener_type = - "nifi.configuration.listener.type"; -const char *Configure::nifi_https_need_ClientAuth = - "nifi.https.need.ClientAuth"; -const char *Configure::nifi_https_client_certificate = - "nifi.https.client.certificate"; -const char *Configure::nifi_https_client_private_key = - "nifi.https.client.private.key"; -const char *Configure::nifi_https_client_pass_phrase = - "nifi.https.client.pass.phrase"; -const char *Configure::nifi_https_client_ca_certificate = - "nifi.https.client.ca.certificate"; -const char *Configure::nifi_rest_api_user_name = - "nifi.rest.api.user.name"; -const char *Configure::nifi_rest_api_password = - "nifi.rest.api.password"; - +const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; +const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; +const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key"; +const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; +const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; +const char *Configure::nifi_configuration_listener_pull_interval = "nifi.configuration.listener.pull.interval"; +const char *Configure::nifi_configuration_listener_http_url = "nifi.configuration.listener.http.url"; +const char *Configure::nifi_configuration_listener_rest_url = "nifi.configuration.listener.rest.url"; +const char *Configure::nifi_configuration_listener_type = "nifi.configuration.listener.type"; +const char *Configure::nifi_https_need_ClientAuth = "nifi.https.need.ClientAuth"; +const char *Configure::nifi_https_client_certificate = "nifi.https.client.certificate"; +const char *Configure::nifi_https_client_private_key = "nifi.https.client.private.key"; +const char *Configure::nifi_https_client_pass_phrase = "nifi.https.client.pass.phrase"; +const char *Configure::nifi_https_client_ca_certificate = "nifi.https.client.ca.certificate"; +const char *Configure::nifi_rest_api_user_name = "nifi.rest.api.user.name"; +const char *Configure::nifi_rest_api_password = "nifi.rest.api.password"; } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 0901a30..1d937b4 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -39,9 +39,11 @@ namespace apache { namespace nifi { namespace minifi { -Connection::Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) +Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid, uuid_t srcUUID, + uuid_t destUUID) : core::Connectable(name, uuid), flow_repository_(flow_repository), + content_repo_(content_repo), logger_(logging::LoggerFactory<Connection>::getLogger()) { if (srcUUID) @@ -89,12 +91,12 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) { queued_data_size_ += flow->getSize(); - logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr().c_str(), name_.c_str()); + logger_->log_debug("Enqueue flow file UUID %s to connection %s %d", flow->getUUIDStr(), name_, queue_.size()); } if (!flow->isStored()) { // Save to the flowfile repo - FlowFileRecord event(flow_repository_, flow, this->uuidStr_); + FlowFileRecord event(flow_repository_, content_repo_, flow, this->uuidStr_); if (event.Serialize()) { flow->setStoredToRepository(true); } @@ -102,6 +104,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) { // Notify receiving processor that work may be available if (dest_connectable_) { + logger_->log_debug("Notifying %s", dest_connectable_->getName()); dest_connectable_->notifyWork(); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index dbe27e8..74a1573 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -63,15 +63,14 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { close(sock); return 0; } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() SO_REUSEADDR failed"); close(sock); return 0; } } - int sndsize = 256*1024; + int sndsize = 256 * 1024; if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) { logger_->log_error("setsockopt() SO_SNDBUF failed"); close(sock); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 2c84811..6358ed0 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -39,8 +39,8 @@ #include "utils/StringUtils.h" #include "core/Core.h" #include "core/controller/ControllerServiceProvider.h" -#include "core/repository/FlowFileRepository.h" #include "core/logging/LoggerConfiguration.h" +#include "core/repository/FlowFileRepository.h" namespace org { namespace apache { @@ -52,7 +52,7 @@ std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGen #define DEFAULT_CONFIG_NAME "conf/flow.yml" FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, - std::unique_ptr<core::FlowConfiguration> flow_configuration, const std::string name, bool headless_mode) + std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode) : core::controller::ControllerServiceProvider(core::getClassName<FlowController>()), root_(nullptr), max_timer_driven_threads_(0), @@ -68,6 +68,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo controller_service_provider_(nullptr), flow_configuration_(std::move(flow_configuration)), configuration_(configure), + content_repo_(content_repo), logger_(logging::LoggerFactory<FlowController>::getLogger()) { if (provenance_repo == nullptr) throw std::runtime_error("Provenance Repo should not be null"); @@ -159,8 +160,7 @@ bool FlowController::applyConfiguration(std::string &configurePayload) { std::unique_ptr<core::ProcessGroup> newRoot; try { newRoot = std::move(flow_configuration_->getRootFromPayload(configurePayload)); - } - catch (const YAML::Exception& e) { + } catch (const YAML::Exception& e) { logger_->log_error("Invalid configuration payload"); return false; } @@ -168,10 +168,9 @@ bool FlowController::applyConfiguration(std::string &configurePayload) { if (newRoot == nullptr) return false; - logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", - newRoot->getName().c_str(), newRoot->getVersion()); + logger_->log_info("Starting to reload Flow Controller with flow control name %s, version %d", newRoot->getName().c_str(), newRoot->getVersion()); - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); stop(true); waitUnload(30000); this->root_ = std::move(newRoot); @@ -181,7 +180,7 @@ bool FlowController::applyConfiguration(std::string &configurePayload) { } void FlowController::stop(bool force) { - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); if (running_) { // immediately indicate that we are not running running_ = false; @@ -222,7 +221,7 @@ void FlowController::waitUnload(const uint64_t timeToWaitMs) { } void FlowController::unload() { - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); if (running_) { stop(true); } @@ -237,7 +236,7 @@ void FlowController::unload() { } void FlowController::load() { - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); if (running_) { stop(true); } @@ -246,29 +245,30 @@ void FlowController::load() { // grab the value for configuration if (this->http_configuration_listener_ == nullptr && configuration_->get(Configure::nifi_configuration_listener_type, listenerType)) { if (listenerType == "http") { - this->http_configuration_listener_ = - std::unique_ptr<minifi::HttpConfigurationListener>(new minifi::HttpConfigurationListener(shared_from_this(), configuration_)); + this->http_configuration_listener_ = std::unique_ptr < minifi::HttpConfigurationListener > (new minifi::HttpConfigurationListener(shared_from_this(), configuration_)); } } logger_->log_info("Initializing timers"); if (nullptr == timer_scheduler_) { - timer_scheduler_ = std::make_shared<TimerDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_); + timer_scheduler_ = std::make_shared < TimerDrivenSchedulingAgent + > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_); } if (nullptr == event_scheduler_) { - event_scheduler_ = std::make_shared<EventDrivenSchedulingAgent>(std::static_pointer_cast<core::controller::ControllerServiceProvider>(shared_from_this()), provenance_repo_, configuration_); + event_scheduler_ = std::make_shared < EventDrivenSchedulingAgent + > (std::static_pointer_cast < core::controller::ControllerServiceProvider > (shared_from_this()), provenance_repo_, flow_file_repo_, content_repo_, configuration_); } logger_->log_info("Load Flow Controller from file %s", configuration_filename_.c_str()); - this->root_ = std::shared_ptr<core::ProcessGroup>(flow_configuration_->getRoot(configuration_filename_)); + this->root_ = std::shared_ptr < core::ProcessGroup > (flow_configuration_->getRoot(configuration_filename_)); logger_->log_info("Loaded root processor Group"); controller_service_provider_ = flow_configuration_->getControllerServiceProvider(); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setRootGroup(root_); - std::static_pointer_cast<core::controller::StandardControllerServiceProvider>(controller_service_provider_)->setSchedulingAgent( - std::static_pointer_cast<minifi::SchedulingAgent>(event_scheduler_)); + std::static_pointer_cast < core::controller::StandardControllerServiceProvider > (controller_service_provider_)->setRootGroup(root_); + std::static_pointer_cast < core::controller::StandardControllerServiceProvider + > (controller_service_provider_)->setSchedulingAgent(std::static_pointer_cast < minifi::SchedulingAgent > (event_scheduler_)); logger_->log_info("Loaded controller service provider"); // Load Flow File from Repo @@ -279,7 +279,7 @@ void FlowController::load() { } void FlowController::reload(std::string yamlFile) { - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); logger_->log_info("Starting to reload Flow Controller with yaml %s", yamlFile.c_str()); stop(true); unload(); @@ -305,18 +305,18 @@ void FlowController::loadFlowRepo() { this->root_->getConnections(connectionMap); } logger_->log_debug("Number of connections from connectionMap %d", connectionMap.size()); - auto rep = std::dynamic_pointer_cast<core::repository::FlowFileRepository>(flow_file_repo_); + auto rep = std::dynamic_pointer_cast < core::repository::FlowFileRepository > (flow_file_repo_); if (nullptr != rep) { rep->setConnectionMap(connectionMap); } - flow_file_repo_->loadComponent(); + flow_file_repo_->loadComponent(content_repo_); } else { logger_->log_debug("Flow file repository is not set"); } } bool FlowController::start() { - std::lock_guard<std::recursive_mutex> flow_lock(mutex_); + std::lock_guard < std::recursive_mutex > flow_lock(mutex_); if (!initialized_) { logger_->log_error("Can not start Flow Controller because it has not been initialized"); return false; @@ -349,8 +349,7 @@ bool FlowController::start() { * @param id service identifier * @param firstTimeAdded first time this CS was added */ -std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &id, -bool firstTimeAdded) { +std::shared_ptr<core::controller::ControllerServiceNode> FlowController::createControllerService(const std::string &type, const std::string &id, bool firstTimeAdded) { return controller_service_provider_->createControllerService(type, id, firstTimeAdded); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 12711a9..efd6fa7 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -40,8 +40,10 @@ namespace minifi { std::shared_ptr<logging::Logger> FlowFileRecord::logger_ = logging::LoggerFactory<FlowFileRecord>::getLogger(); std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0); -FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> claim) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::map<std::string, std::string> attributes, + std::shared_ptr<ResourceClaim> claim) : FlowFile(), + content_repo_(content_repo), flow_repository_(flow_repository) { id_ = local_flow_seq_number_.load(); claim_ = claim; @@ -64,9 +66,11 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository claim_->increaseFlowFileRecordOwnedCount(); } -FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event, + const std::string &uuidConnection) : FlowFile(), snapshot_(""), + content_repo_(content_repo), flow_repository_(flow_repository) { entry_date_ = event->getEntryDate(); lineage_start_date_ = event->getlineageStartDate(); @@ -82,10 +86,11 @@ FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository } } -FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event) : FlowFile(), uuid_connection_(""), snapshot_(""), + content_repo_(content_repo), flow_repository_(flow_repository) { } @@ -101,7 +106,7 @@ FlowFileRecord::~FlowFileRecord() { if (claim_->getFlowFileRecordOwnedCount() <= 0) { logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str()); if (!this->stored || !flow_repository_->Get(uuid_str_, value)) { - std::remove(claim_->getContentFullPath().c_str()); + content_repo_->remove(claim_); } } } @@ -319,6 +324,9 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { return false; } + if (nullptr == claim_) { + claim_ = std::make_shared<ResourceClaim>(content_full_fath_, content_repo_, true); + } return true; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/HttpConfigurationListener.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/HttpConfigurationListener.cpp b/libminifi/src/HttpConfigurationListener.cpp index 39da67b..c16ca75 100644 --- a/libminifi/src/HttpConfigurationListener.cpp +++ b/libminifi/src/HttpConfigurationListener.cpp @@ -63,17 +63,14 @@ bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { } utils::HTTPRequestResponse content; - curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, - &utils::HTTPRequestResponse::recieve_write); + curl_easy_setopt(http_session, CURLOPT_WRITEFUNCTION, &utils::HTTPRequestResponse::recieve_write); - curl_easy_setopt(http_session, CURLOPT_WRITEDATA, - static_cast<void*>(&content)); + curl_easy_setopt(http_session, CURLOPT_WRITEDATA, static_cast<void*>(&content)); CURLcode res = curl_easy_perform(http_session); if (res == CURLE_OK) { - logger_->log_debug("HttpConfigurationListener -- curl successful to %s", - fullUrl.c_str()); + logger_->log_debug("HttpConfigurationListener -- curl successful to %s", fullUrl.c_str()); std::string response_body(content.data.begin(), content.data.end()); int64_t http_code = 0; @@ -82,8 +79,7 @@ bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { /* ask for the content-type */ curl_easy_getinfo(http_session, CURLINFO_CONTENT_TYPE, &content_type); - bool isSuccess = ((int32_t) (http_code / 100)) == 2 - && res != CURLE_ABORTED_BY_CALLBACK; + bool isSuccess = ((int32_t) (http_code / 100)) == 2 && res != CURLE_ABORTED_BY_CALLBACK; bool body_empty = IsNullOrEmpty(content.data); if (isSuccess && !body_empty) { @@ -94,9 +90,7 @@ bool HttpConfigurationListener::pullConfiguration(std::string &configuration) { logger_->log_error("Cannot output body to content"); } } else { - logger_->log_error( - "HttpConfigurationListener -- curl_easy_perform() failed %s\n", - curl_easy_strerror(res)); + logger_->log_error("HttpConfigurationListener -- curl_easy_perform() failed %s\n", curl_easy_strerror(res)); } curl_easy_cleanup(http_session); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index abebfbb..076cefc 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -34,7 +34,7 @@ Properties::Properties() // Get the config value bool Properties::get(std::string key, std::string &value) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); auto it = properties_.find(key); if (it != properties_.end()) { @@ -46,7 +46,7 @@ bool Properties::get(std::string key, std::string &value) { } int Properties::getInt(const std::string &key, int default_value) { - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard < std::mutex > lock(mutex_); auto it = properties_.find(key); if (it != properties_.end()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index d1862cd..3c88e8f 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -18,7 +18,7 @@ * limitations under the License. */ -#include "../include/RemoteProcessorGroupPort.h" +#include "RemoteProcessorGroupPort.h" #include <curl/curl.h> #include <curl/curlbuild.h> @@ -30,18 +30,20 @@ #include <deque> #include <iostream> #include <set> + #include <string> #include <type_traits> #include <utility> #include "json/json.h" #include "json/writer.h" -#include "../include/core/logging/Logger.h" -#include "../include/core/ProcessContext.h" -#include "../include/core/ProcessorNode.h" -#include "../include/core/Property.h" -#include "../include/core/Relationship.h" -#include "../include/Site2SitePeer.h" +#include "Exception.h" +#include "core/logging/Logger.h" +#include "core/ProcessContext.h" +#include "core/ProcessorNode.h" +#include "core/Property.h" +#include "core/Relationship.h" +#include "Site2SitePeer.h" namespace org { namespace apache { @@ -54,8 +56,7 @@ core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", ""); core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; -std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol( -bool create = true) { +std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol(bool create = true) { std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr; if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { @@ -170,31 +171,41 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::Pr uuid_parse(value.c_str(), protocol_uuid_); } - std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(); + std::unique_ptr<Site2SiteClientProtocol> protocol_ = nullptr; + try { + protocol_ = getNextProtocol(); - if (!protocol_) { - context->yield(); - return; - } + if (!protocol_) { + context->yield(); + return; + } + if (!protocol_->bootstrap()) { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); + + return; + } + + if (direction_ == RECEIVE) { + protocol_->receiveFlowFiles(context, session); + } else { + protocol_->transferFlowFiles(context, session); + } - if (!protocol_->bootstrap()) { - // bootstrap the client protocol if needeed - context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor()); - logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); returnProtocol(std::move(protocol_)); return; + } catch (const minifi::Exception &ex2) { + context->yield(); + session->rollback(); + } catch (...) { + context->yield(); + session->rollback(); } - if (direction_ == RECEIVE) { - protocol_->receiveFlowFiles(context, session); - } else { - protocol_->transferFlowFiles(context, session); - } - - returnProtocol(std::move(protocol_)); - return; + throw std::exception(); } void RemoteProcessorGroupPort::refreshRemoteSite2SiteInfo() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index 1a9f2fe..e7d4557 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -17,14 +17,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "ResourceClaim.h" #include <uuid/uuid.h> - #include <map> #include <queue> #include <string> #include <vector> - -#include "ResourceClaim.h" +#include <memory> +#include "core/StreamManager.h" +#include "utils/Id.h" #include "core/logging/LoggerConfiguration.h" namespace org { @@ -36,14 +37,22 @@ utils::NonRepeatingStringGenerator ResourceClaim::non_repeating_string_generator char *ResourceClaim::default_directory_path = const_cast<char*>(DEFAULT_CONTENT_DIRECTORY); -ResourceClaim::ResourceClaim(const std::string contentDirectory) +ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, const std::string contentDirectory) : _flowFileRecordOwnedCount(0), + claim_manager_(claim_manager), + deleted_(false), logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) { // Create the full content path for the content _contentFullPath = contentDirectory + "/" + non_repeating_string_generator_.generate(); logger_->log_debug("Resource Claim created %s", _contentFullPath); } +ResourceClaim::ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted) + : claim_manager_(claim_manager), + deleted_(deleted) { + _contentFullPath = path; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */
