[ https://issues.apache.org/jira/browse/MINIFI-37?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014406#comment-16014406 ]
ASF GitHub Bot commented on MINIFI-37: -------------------------------------- Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/98#discussion_r117002738 --- Diff: libminifi/include/core/repository/VolatileRepository.h --- @@ -0,0 +1,350 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_ + +#include "core/Repository.h" +#include <chrono> +#include <vector> +#include <map> +#include "core/Core.h" +#include "Connection.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + + +static uint16_t accounting_size = sizeof(void*) * 2 + sizeof(uint64_t) + + sizeof(size_t); + +class RepoValue { + public: + + explicit RepoValue() + { + } + + explicit RepoValue(std::string key, uint8_t *ptr, size_t size) + : + key_(key) + { + buffer_.resize(size); + std::memcpy(buffer_.data(), ptr, size); + } + + explicit RepoValue(RepoValue &&other) noexcept + : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)){ + } + + ~RepoValue() + { + } + + + + std::string &getKey() { + return key_; + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return buffer_.size() + accounting_size; + } + + size_t bufferSize() { + return buffer_.size(); + } + + void emplace(std::string &str) { + str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); + } + + RepoValue &operator=(RepoValue &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + other.buffer_.clear(); + return *this; + } + + private: + std::string key_; + std::vector<uint8_t> buffer_; +}; + +class AtomicEntry { + + public: + AtomicEntry() + : write_pending_(false), + has_value_(false) { + + } + + bool setRepoValue(RepoValue &new_value, size_t &prev_size) { + // delete the underlying pointer + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true) && !lock) + return false; + if (has_value_) + { + prev_size = value_.size(); + } + value_ = std::move(new_value); + has_value_ = true; + try_unlock(); + return true; + } + + bool 
getValue(RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + bool getValue(const std::string &key, RepoValue &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (value_.getKey() != key) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + private: + + inline void try_lock() { + bool lock = false; + while (!write_pending_.compare_exchange_weak(lock, true) && !lock) { + // attempt again + } + } + + inline void try_unlock() { + bool lock = true; + while (!write_pending_.compare_exchange_weak(lock, false) && lock) { + // attempt again + } + } + + std::atomic<bool> write_pending_; + std::atomic<bool> has_value_; + RepoValue value_; +}; + +/** + * Flow File repository + * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate. 
+ */ +class VolatileRepository : public core::Repository, + public std::enable_shared_from_this<VolatileRepository> { + public: + // Constructor + + VolatileRepository(std::string dir = REPOSITORY_DIRECTORY, + int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = + MAX_REPOSITORY_STORAGE_SIZE, + uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) + : Repository(core::getClassName<VolatileRepository>(), "", + maxPartitionMillis, maxPartitionBytes, purgePeriod), + max_size_(maxPartitionBytes * 0.75), + current_index_(0), + max_count_(10000) + + { + + } + + // Destructor + ~VolatileRepository() { + for (auto ent : value_vector_) { + delete ent; + } + } + + // initialize + virtual bool initialize(const std::shared_ptr<Configure> &configure) { +// configure->get("nifi.volatile.repository.max_size") --- End diff -- This is not currently configurable, and I would imagine it would be taken care of in the constructor. > Native Volatile Content Repository implementation > ------------------------------------------------- > > Key: MINIFI-37 > URL: https://issues.apache.org/jira/browse/MINIFI-37 > Project: Apache NiFi MiNiFi > Issue Type: Task > Components: C++, Core Framework > Reporter: Aldrin Piri > Assignee: marco polo > Priority: Minor > Labels: native > > Given the constrained environments in which MiNiFi could operate, it would be beneficial to provide a content repository that is strictly in memory for those environments where disk storage may be limited or non-existent. This implementation should consider configuration options around its footprint such as number of entries held and/or sheer capacity. -- This message was sent by Atlassian JIRA (v6.3.15#6346)