[
https://issues.apache.org/jira/browse/MINIFI-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056650#comment-16056650
]
ASF GitHub Bot commented on MINIFI-249:
---------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123120308
--- Diff: libminifi/include/core/repository/AtomicRepoEntries.h ---
@@ -0,0 +1,501 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ref_count_hip.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
+
+#include <cstddef>
+#include <cstring>
+#include <iostream>
+#include <chrono>
+#include <functional>
+#include <atomic>
+#include <vector>
+#include <map>
+#include <iterator>
+
+static uint16_t accounting_size = sizeof(std::vector<uint8_t>) +
sizeof(std::string) + sizeof(size_t);
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Purpose: Repo value represents an item that will support a move
operation within an AtomicEntry
+ *
+ * Justification: Since AtomicEntry is a static entry that does not move
or change, the underlying
+ * RepoValue can be changed to support atomic operations.
+ */
+template<typename T>
+class RepoValue {
+ public:
+
+ explicit RepoValue() {
+ }
+
+ /**
+ * Constructor that populates the item allowing for a custom key
comparator.
+ * @param key key for this repo value.
+ * @param ptr buffer
+ * @param size size buffer
+ * @param comparator custom comparator.
+ */
+ explicit RepoValue(T key, const uint8_t *ptr, size_t size,
std::function<bool(T, T)> comparator = nullptr)
+ : key_(key),
+ comparator_(comparator) {
+ if (nullptr == ptr) {
+ size = 0;
+ }
+ buffer_.resize(size);
+ if (size > 0) {
+ std::memcpy(buffer_.data(), ptr, size);
+ }
+ }
+
+ /**
+ * RepoValue that moves the other object into this.
+ */
+ explicit RepoValue(RepoValue<T> &&other)
+noexcept : key_(std::move(other.key_)),
+ buffer_(std::move(other.buffer_)),
+ comparator_(std::move(other.comparator_)) {
+ }
+
+ ~RepoValue()
+ {
+ }
+
+ T &getKey() {
+ return key_;
+ }
+
+ /**
+ * Sets the key, relacing the custom comparator if needed.
+ */
+ void setKey(const T key, std::function<bool(T,T)> comparator =
nullptr) {
+ key_ = key;
+ comparator_ = comparator;
+ }
+
+ /**
+ * Determines if the key is the same using the custom comparator
+ * @param other object to compare against
+ * @return result of the comparison
+ */
+ inline bool isEqual(RepoValue<T> *other)
+ {
+ return comparator_ == nullptr ? key_ == other->key_ :
comparator_(key_,other->key_);
+ }
+
+ /**
+ * Determines if the key is the same using the custom comparator
+ * @param other object to compare against
+ * @return result of the comparison
+ */
+ inline bool isKey(T other)
+ {
+ return comparator_ == nullptr ? key_ == other :
comparator_(key_,other);
+ }
+
+ /**
+ * Clears the buffer.
+ */
+ void clearBuffer() {
+ buffer_.resize(0);
+ buffer_.clear();
+ }
+
+ /**
+ * Return the size of the memory within the key
+ * buffer, the size of timestamp, and the general
+ * system word size
+ */
+ uint64_t size() {
+ return buffer_.size();
+ }
+
+ size_t getBufferSize() {
+ return buffer_.size();
+ }
+
+ const uint8_t *getBuffer()
+ {
+ return buffer_.data();
+ }
+
+ /**
+ * Places the contents of buffer into str
+ * @param strnig into which we are placing the memory contained in
buffer.
+ */
+ void emplace(std::string &str) {
+ str.insert(0, reinterpret_cast<const char*>(buffer_.data()),
buffer_.size());
+ }
+
+ /**
+ * Appends ptr to the end of buffer.
+ * @param ptr pointer containing data to add to buffer_
+ */
+ void append(uint8_t *ptr, size_t size)
+ {
+ buffer_.insert(buffer_.end(), ptr, ptr + size);
+ }
+
+ RepoValue<T> &operator=(RepoValue<T> &&other) noexcept {
+ key_ = std::move(other.key_);
+ buffer_ = std::move(other.buffer_);
+ return *this;
+ }
+
+ private:
+ T key_;
+ std::function<bool(T,T)> comparator_;
+ std::vector<uint8_t> buffer_;
+ };
+
+ /**
+ * Purpose: Atomic Entry allows us to create a statically
+ * sized ring buffer, with the ability to create
+ *
+ **/
+template<typename T>
+class AtomicEntry {
+
+ public:
+ /**
+ * Constructor that accepts a max size and an atomic counter for the
total
+ * size allowd by this and other atomic entries.
+ */
+ explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size)
+ : write_pending_(false),
+ has_value_(false),
+ total_size_(total_size),
+ max_size_(max_size),
+ ref_count_(0),
+ free_required(false) {
+
+ }
+
+ /**
+ * Sets the repo value, moving the old value into old_value.
+ * @param new_value new value to move into value_.
+ * @param old_value the previous value of value_ will be moved into
old_value
+ * @param prev_size size reclaimed.
+ * @return result of this set. If true old_value will be populated.
+ */
+ bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value,
size_t &prev_size) {
+ // delete the underlying pointer
+ bool lock = false;
+ if (!write_pending_.compare_exchange_weak(lock, true))
+ {
+ return false;
+ }
+ if (has_value_) {
+ prev_size = value_.size();
+ }
+ old_value = std::move(value_);
+ value_ = std::move(new_value);
+ has_value_ = true;
+ try_unlock();
+ return true;
+ }
+
+
+ AtomicEntry<T> *takeOwnership()
+ {
+ bool lock = false;
+ if (!write_pending_.compare_exchange_weak(lock, true) )
+ return nullptr;
+
+ ref_count_++;
+
+ try_unlock();
+
+ return this;
+ }
+ /**
+ * A test and set operation, which is used to allow a function to test
+ * if an item can be released and a function used for reclaiming memory
associated
+ * with said object.
+ * A custom comparator can be provided to augment the key being added
into value_
+ */
+ bool testAndSetKey(const T str, std::function<bool(T)> releaseTest =
nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)>
comparator = nullptr) {
+ bool lock = false;
+
+ if (!write_pending_.compare_exchange_weak(lock, true) )
+ return false;
+
+ if (has_value_) {
+ // we either don't have a release test or we cannot release this
+ // entity
+ if (releaseTest != nullptr && reclaimer != nullptr &&
releaseTest(value_.getKey()))
+ {
+ reclaimer(value_.getKey());
+ }
+ else if (free_required && ref_count_ == 0)
+ {
+ size_t bufferSize = value_.getBufferSize();
+ value_.clearBuffer();
+ has_value_ = false;
+ if (total_size_ != nullptr) {
+ *total_size_ -= bufferSize;
+ }
+ free_required = false;
+ }
+ else {
+ try_unlock();
+ return false;
+ }
+
+ }
+ ref_count_=1;
+ value_.setKey(str, comparator);
+ has_value_ = true;
+ try_unlock();
+ return true;
+ }
+
+ /**
+ * Moved the value into the argument
+ * @param value the previous value will be moved into this parameter
+ * @return success of get operation based on whether or not this atomic
entry has a value.
+ */
+ bool getValue(RepoValue<T> &value) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ value = std::move(value_);
+ has_value_ = false;
+ try_unlock();
+ return true;
+ }
+
+ /**
+ * Moved the value into the argument
+ * @param value the previous value will be moved into this parameter
+ * @return success of get operation based on whether or not this atomic
entry has a value.
+ */
+ bool getValue(const T &key, RepoValue<T> &value) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ if (!value_.isKey(key)) {
+ try_unlock();
+ return false;
+ }
+ value = std::move(value_);
+ has_value_ = false;
+ try_unlock();
+ return true;
+ }
+
+ void decrementOwnership(){
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return;
+ }
+ if (ref_count_ > 0){
+ ref_count_--;
+ }
+ if (ref_count_ == 0 && free_required)
+ {
+ size_t bufferSize = value_.getBufferSize();
+ value_.clearBuffer();
+ has_value_ = false;
+ if (total_size_ != nullptr) {
+ *total_size_ -= bufferSize;
+ }
+ free_required = false;
+ }
+ else{
+ }
+ try_unlock();
+ }
+
+ /**
+ * Moved the value into the argument
+ * @param value the previous value will be moved into this parameter
+ * @return success of get operation based on whether or not this atomic
entry has a value.
+ */
+ bool getValue(const T &key, RepoValue<T> **value) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ if (!value_.isKey(key)) {
+ try_unlock();
+ return false;
+ }
+ ref_count_++;
+ *value = &value_;
+ try_unlock();
+ return true;
+ }
+
+ /**
+ * Operation that will be used to test and free if a release is required
without
+ * setting a new object.
+ * @param releaseTest function that will be used to free the RepoValue
key from
+ * this atomic entry.
+ * @param freedValue informs the caller if an item was freed.
+ */
+ T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) {
+ try_lock();
+ T ref;
+ if (!has_value_) {
+ try_unlock();
+ return ref;
+ }
+
+ if (releaseTest(value_.getKey())) {
+ size_t bufferSize = value_.getBufferSize();
+ value_.clearBuffer();
+ ref = value_.getKey();
+ has_value_ = false;
+ if (total_size_ != nullptr) {
+ *total_size_ -= bufferSize;
+ }
+
+ }
+ try_unlock();
+ return ref;
+ }
+
+ size_t getLength()
+ {
+ size_t size = 0;
+ try_lock();
+ size = value_.getBufferSize();
+ try_unlock();
+ return size;
+
+ }
+
+ /**
+ * sets has_value to false; however, does not call
+ * any external entity to further free RepoValue
+ */
+ bool freeValue(const T &key) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ if (!value_.isKey(key)) {
+ try_unlock();
+ return false;
+ }
+ if (ref_count_ > 0)
+ {
+ free_required = true;
+ try_unlock();
+ return true;
+ }
+ size_t bufferSize = value_.getBufferSize();
+ value_.clearBuffer();
+ has_value_ = false;
+ if (total_size_ != nullptr) {
+ *total_size_ -= bufferSize;
+ }
+ free_required = false;
+ try_unlock();
+ return true;
+ }
+
+ /**
+ * Appends buffer onto this atomic entry if key matches
+ * the current RepoValue's key.
+ */
+ bool insert(const T key, uint8_t *buffer, size_t size) {
+ try_lock();
+
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+
+ if (!value_.isKey(key)) {
+ try_unlock();
+ return false;
+ }
+
+ if ((total_size_ != nullptr && max_size_ != nullptr) && (*total_size_
+ size > *max_size_)) {
--- End diff --
These are pointers to a global max size for that volatile repo? Is it
confusing? I can rename it to make it clearer.
> Volatile Provenance Repository
> ------------------------------
>
> Key: MINIFI-249
> URL: https://issues.apache.org/jira/browse/MINIFI-249
> Project: Apache NiFi MiNiFi
> Issue Type: New Feature
> Components: C++, Core Framework
> Reporter: Aldrin Piri
> Assignee: marco polo
>
> It would be helpful to have a volatile implementation for handling
> provenance. While the data guarantees of capturing/storing information work
> in some scenarios, in others, folks may wish to just hold data while the
> instance is running. This is predicated on an extensible interface that
> supports this configurability.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)