[ 
https://issues.apache.org/jira/browse/MINIFI-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056643#comment-16056643
 ] 

ASF GitHub Bot commented on MINIFI-249:
---------------------------------------

Github user benqiu2016 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/110#discussion_r123119696
  
    --- Diff: libminifi/include/core/repository/AtomicRepoEntries.h ---
    @@ -0,0 +1,501 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
    +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_
    +
    +#include  <cstddef>
    +#include <cstring>
    +#include <iostream>
    +#include <chrono>
    +#include <functional>
    +#include <atomic>
    +#include <vector>
    +#include <map>
    +#include <iterator>
    +
    +static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + 
sizeof(std::string) + sizeof(size_t);
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace core {
    +namespace repository {
    +
    +/**
    + * Purpose: Repo value represents an item that will support a move 
operation within an AtomicEntry
    + *
    + * Justification: Since AtomicEntry is a static entry that does not move 
or change, the underlying
    + * RepoValue can be changed to support atomic operations.
    + */
    +template<typename T>
    +class RepoValue {
    + public:
    +
    +  explicit RepoValue() {
    +  }
    +
    +  /**
    +   * Constructor that populates the item allowing for a custom key 
comparator.
    +   * @param key key for this repo value.
    +   * @param ptr buffer
    +   * @param size size buffer
    +   * @param comparator custom comparator.
    +   */
    +  explicit RepoValue(T key, const uint8_t *ptr, size_t size, 
std::function<bool(T, T)> comparator = nullptr)
    +      : key_(key),
    +        comparator_(comparator) {
    +    if (nullptr == ptr) {
    +      size = 0;
    +    }
    +    buffer_.resize(size);
    +    if (size > 0) {
    +      std::memcpy(buffer_.data(), ptr, size);
    +    }
    +  }
    +
    +  /**
    +   * RepoValue that moves the other object into this.
    +   */
    +  explicit RepoValue(RepoValue<T> &&other)
    +noexcept      : key_(std::move(other.key_)),
    +      buffer_(std::move(other.buffer_)),
    +      comparator_(std::move(other.comparator_)) {
    +      }
    +
    +      ~RepoValue()
    +      {
    +      }
    +
    +      T &getKey() {
    +        return key_;
    +      }
    +
    +      /**
    +       * Sets the key, replacing the custom comparator if needed.
    +       */
    +      void setKey(const T key, std::function<bool(T,T)> comparator = 
nullptr) {
    +        key_ = key;
    +        comparator_ = comparator;
    +      }
    +
    +      /**
    +       * Determines if the key is the same using the custom comparator
    +       * @param other object to compare against
    +       * @return result of the comparison
    +       */
    +      inline bool isEqual(RepoValue<T> *other)
    +      {
    +        return comparator_ == nullptr ? key_ == other->key_ : 
comparator_(key_,other->key_);
    +      }
    +
    +      /**
    +       * Determines if the key is the same using the custom comparator
    +       * @param other object to compare against
    +       * @return result of the comparison
    +       */
    +      inline bool isKey(T other)
    +      {
    +        return comparator_ == nullptr ? key_ == other : 
comparator_(key_,other);
    +      }
    +
    +      /**
    +       * Clears the buffer.
    +       */
    +      void clearBuffer() {
    +        buffer_.resize(0);
    +        buffer_.clear();
    +      }
    +
    +      /**
    +       * Return the size of the memory within the key
    +       * buffer, the size of timestamp, and the general
    +       * system word size
    +       */
    +      uint64_t size() {
    +        return buffer_.size();
    +      }
    +
    +      size_t getBufferSize() {
    +        return buffer_.size();
    +      }
    +
    +      const uint8_t *getBuffer()
    +      {
    +        return buffer_.data();
    +      }
    +
    +      /**
    +       * Places the contents of buffer into str
    +       * @param str string into which we are placing the memory contained in 
buffer.
    +       */
    +      void emplace(std::string &str) {
    +        str.insert(0, reinterpret_cast<const char*>(buffer_.data()), 
buffer_.size());
    +      }
    +
    +      /**
    +       * Appends ptr to the end of buffer.
    +       * @param ptr pointer containing data to add to buffer_
    +       */
    +      void append(uint8_t *ptr, size_t size)
    +      {
    +        buffer_.insert(buffer_.end(), ptr, ptr + size);
    +      }
    +
    +      RepoValue<T> &operator=(RepoValue<T> &&other) noexcept {
    +        key_ = std::move(other.key_);
    +        buffer_ = std::move(other.buffer_);
    +        return *this;
    +      }
    +
    +    private:
    +      T key_;
    +      std::function<bool(T,T)> comparator_;
    +      std::vector<uint8_t> buffer_;
    +    };
    +
    +    /**
    +     * Purpose: Atomic Entry allows us to create a statically
    +     * sized ring buffer, with the ability to create
    +     *
    +     **/
    +template<typename T>
    +class AtomicEntry {
    +
    + public:
    +  /**
    +   * Constructor that accepts a max size and an atomic counter for the 
total
    +   * size allowed by this and other atomic entries.
    +   */
    +  explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size)
    +      : write_pending_(false),
    +        has_value_(false),
    +        total_size_(total_size),
    +        max_size_(max_size),
    +        ref_count_(0),
    +        free_required(false) {
    +
    +  }
    +
    +  /**
    +   * Sets the repo value, moving the old value into old_value.
    +   * @param new_value new value to move into value_.
    +   * @param old_value the previous value of value_ will be moved into 
old_value
    +   * @param prev_size size reclaimed.
    +   * @return result of this set. If true old_value will be populated.
    +   */
    +  bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, 
size_t &prev_size) {
    +    // delete the underlying pointer
    +    bool lock = false;
    +    if (!write_pending_.compare_exchange_weak(lock, true))
    +    {
    +      return false;
    +    }
    +    if (has_value_) {
    +      prev_size = value_.size();
    +    }
    +    old_value = std::move(value_);
    +    value_ = std::move(new_value);
    +    has_value_ = true;
    +    try_unlock();
    +    return true;
    +  }
    +  
    +  
    +  AtomicEntry<T> *takeOwnership()
    +  {
    +      bool lock = false;
    +      if (!write_pending_.compare_exchange_weak(lock, true) )
    +   return nullptr;
    +      
    +      ref_count_++;
    +      
    +      try_unlock();
    +      
    +      return this;
    +  }
    +  /**
    +   * A test and set operation, which is used to allow a function to test
    +   * if an item can be released and a function used for reclaiming memory 
associated
    +   * with said object.
    +   * A custom comparator can be provided to augment the key being added 
into value_
    +   */
    +  bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = 
nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> 
comparator = nullptr) {
    +    bool lock = false;
    +
    +    if (!write_pending_.compare_exchange_weak(lock, true) )
    +      return false;
    +
    +    if (has_value_) {
    +      // we either don't have a release test or we cannot release this
    +      // entity
    +      if (releaseTest != nullptr && reclaimer != nullptr && 
releaseTest(value_.getKey()))
    +                                                                        {
    +        reclaimer(value_.getKey());
    +      }
    +      else if (free_required && ref_count_ == 0)
    +      {
    +   size_t bufferSize = value_.getBufferSize();
    +   value_.clearBuffer();
    +   has_value_ = false;
    +   if (total_size_ != nullptr) {
    +     *total_size_ -= bufferSize;
    +   }
    +   free_required = false;  
    +      }
    +      else {
    +        try_unlock();
    +        return false;
    +      }
    +
    +    }
    +    ref_count_=1;
    +    value_.setKey(str, comparator);
    +    has_value_ = true;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  /**
    +   * Moves the value into the argument
    +   * @param value the previous value will be moved into this parameter
    +   * @return  success of get operation based on whether or not this atomic 
entry has a value.
    +   */
    +  bool getValue(RepoValue<T> &value) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    value = std::move(value_);
    +    has_value_ = false;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  /**
    +   * Moves the value into the argument
    +   * @param value the previous value will be moved into this parameter
    +   * @return  success of get operation based on whether or not this atomic 
entry has a value.
    +   */
    +  bool getValue(const T &key, RepoValue<T> &value) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    if (!value_.isKey(key)) {
    +      try_unlock();
    +      return false;
    +    }
    +    value = std::move(value_);
    +    has_value_ = false;
    +    try_unlock();
    +    return true;
    +  }
    +  
    +  void decrementOwnership(){
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return;
    +    }
    +    if (ref_count_ > 0){
    +      ref_count_--;
    +    }
    +    if (ref_count_ == 0 && free_required)
    +    {
    +      size_t bufferSize = value_.getBufferSize();
    +      value_.clearBuffer();
    +      has_value_ = false;
    +      if (total_size_ != nullptr) {
    +   *total_size_ -= bufferSize;
    +      }
    +      free_required = false;
    +    }
    +    else{
    +    }
    +    try_unlock();
    +  }
    +
    +  /**
    +   * Moved the value into the argument
    +   * @param value the previous value will be moved into this parameter
    +   * @return  success of get operation based on whether or not this atomic 
entry has a value.
    +   */
    +  bool getValue(const T &key, RepoValue<T> **value) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    if (!value_.isKey(key)) {
    +      try_unlock();
    +      return false;
    +    }
    +    ref_count_++;
    +    *value = &value_;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  /**
    +   * Operation that will be used to test and free if a release is required 
without
    +   * setting a new object.
    +   * @param releaseTest function that will be used to free the RepoValue 
key from
    +   * this atomic entry.
    +   * @param freedValue informs the caller if an item was freed.
    +   */
    +  T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) {
    +    try_lock();
    +    T ref;
    +    if (!has_value_) {
    +      try_unlock();
    +      return ref;
    +    }
    +
    +    if (releaseTest(value_.getKey())) {
    +      size_t bufferSize = value_.getBufferSize();
    +      value_.clearBuffer();
    +      ref = value_.getKey();
    +      has_value_ = false;
    +      if (total_size_ != nullptr) {
    +        *total_size_ -= bufferSize;
    +      }
    +
    +    }
    +    try_unlock();
    +    return ref;
    +  }
    +  
    +  size_t getLength()
    +  {
    +    size_t size = 0;
    +     try_lock();
    +     size = value_.getBufferSize();
    +     try_unlock();
    +     return size;
    +     
    +  }
    +
    +  /**
    +   * sets has_value to false; however, does not call
    +   * any external entity to further free RepoValue
    +   */
    +  bool freeValue(const T &key) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    if (!value_.isKey(key)) {
    +      try_unlock();
    +      return false;
    +    }
    +    if (ref_count_ > 0)
    +    {
    +       free_required = true;
    +       try_unlock();
    +       return true;
    +    }
    +    size_t bufferSize = value_.getBufferSize();
    +    value_.clearBuffer();
    +    has_value_ = false;
    +    if (total_size_ != nullptr) {
    +      *total_size_ -= bufferSize;
    +    }
    +    free_required = false;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  /**
    +   * Appends buffer onto this atomic entry if key matches
    +   * the current RepoValue's key.
    +   */
    +  bool insert(const T key, uint8_t *buffer, size_t size) {
    +    try_lock();
    +
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +
    +    if (!value_.isKey(key)) {
    +      try_unlock();
    +      return false;
    +    }
    +
    +    if ((total_size_ != nullptr && max_size_ != nullptr) && (*total_size_ 
+ size > *max_size_)) {
    --- End diff --
    
    Is the max size per entry, or global for the whole volatile repo?


> Volatile Provenance Repository
> ------------------------------
>
>                 Key: MINIFI-249
>                 URL: https://issues.apache.org/jira/browse/MINIFI-249
>             Project: Apache NiFi MiNiFi
>          Issue Type: New Feature
>          Components: C++, Core Framework
>            Reporter: Aldrin Piri
>            Assignee: marco polo
>
> It would be helpful to have a volatile implementation for handling 
> provenance.  While the data guarantees of capturing/storing information work 
> in some scenarios, in others, folks may wish to just hold data while the 
> instance is running.  This is predicated on an extensible interface that 
> supports this configurability.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to