[ 
https://issues.apache.org/jira/browse/MINIFI-37?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014406#comment-16014406
 ] 

ASF GitHub Bot commented on MINIFI-37:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/98#discussion_r117002738
  
    --- Diff: libminifi/include/core/repository/VolatileRepository.h ---
    @@ -0,0 +1,350 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
    +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
    +
    +#include "core/Repository.h"
    +#include <chrono>
    +#include <vector>
    +#include <map>
    +#include "core/Core.h"
    +#include "Connection.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace core {
    +namespace repository {
    +
    +
    +static uint16_t accounting_size = sizeof(void*) * 2 + sizeof(uint64_t)
    +    + sizeof(size_t);
    +
    +class RepoValue {
    + public:
    +
    +  explicit RepoValue()
    +  {
    +  }
    +
    +  explicit RepoValue(std::string key, uint8_t *ptr, size_t size)
    +      :
    +        key_(key)
    +   {
    +     buffer_.resize(size);
    +    std::memcpy(buffer_.data(), ptr, size);
    +  }
    +
    +  explicit RepoValue(RepoValue &&other) noexcept
    +      : key_(std::move(other.key_)),
    +        buffer_(std::move(other.buffer_)){
    +  }
    +  
    +  ~RepoValue()
    +  {
    +  }
    +
    + 
    +
    +  std::string &getKey() {
    +    return key_;
    +  }
    +
    +  /**
    +   * Return the size of the memory within the key
    +   * buffer, the size of timestamp, and the general
    +   * system word size
    +   */
    +  uint64_t size() {
    +    return buffer_.size() + accounting_size;
    +  }
    +
    +  size_t bufferSize() {
    +    return buffer_.size();
    +  }
    +
    +  void emplace(std::string &str) {
    +    str.insert(0, reinterpret_cast<const char*>(buffer_.data()), 
buffer_.size());
    +  }
    +
    +  RepoValue &operator=(RepoValue &&other) noexcept {
    +    key_ = std::move(other.key_);
    +    buffer_ = std::move(other.buffer_);
    +    other.buffer_.clear();
    +    return *this;
    +  }
    +
    + private:
    +  std::string key_;
    +  std::vector<uint8_t> buffer_;
    +};
    +
    +class AtomicEntry {
    +
    + public:
    +  AtomicEntry()
    +      : write_pending_(false),
    +        has_value_(false) {
    +
    +  }
    +
    +  bool setRepoValue(RepoValue &new_value, size_t &prev_size) {
    +    // delete the underlying pointer
    +    bool lock = false;
    +    if (!write_pending_.compare_exchange_weak(lock, true) && !lock)
    +      return false;
    +    if (has_value_)
    +    {
    +      prev_size = value_.size();
    +    }
    +    value_ = std::move(new_value);
    +    has_value_ = true;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  bool getValue(RepoValue &value) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    value = std::move(value_);
    +    has_value_ = false;
    +    try_unlock();
    +    return true;
    +  }
    +
    +  bool getValue(const std::string &key, RepoValue &value) {
    +    try_lock();
    +    if (!has_value_) {
    +      try_unlock();
    +      return false;
    +    }
    +    if (value_.getKey() != key) {
    +      try_unlock();
    +      return false;
    +    }
    +    value = std::move(value_);
    +    has_value_ = false;
    +    try_unlock();
    +    return true;
    +  }
    +
    + private:
    +
    +  inline void try_lock() {
    +    bool lock = false;
    +    while (!write_pending_.compare_exchange_weak(lock, true) && !lock) {
    +      // attempt again
    +    }
    +  }
    +
    +  inline void try_unlock() {
    +    bool lock = true;
    +    while (!write_pending_.compare_exchange_weak(lock, false) && lock) {
    +      // attempt again
    +    }
    +  }
    +
    +  std::atomic<bool> write_pending_;
    +  std::atomic<bool> has_value_;
    +  RepoValue value_;
    +};
    +
    +/**
    + * Flow File repository
    + * Design: Extends Repository and implements the run function, using LevelDB as the primary substrate.
    + */
    +class VolatileRepository : public core::Repository,
    +    public std::enable_shared_from_this<VolatileRepository> {
    + public:
    +  // Constructor
    +
    +  VolatileRepository(std::string dir = REPOSITORY_DIRECTORY,
    +                     int64_t maxPartitionMillis = 
MAX_REPOSITORY_ENTRY_LIFE_TIME,
    +                     int64_t maxPartitionBytes =
    +                     MAX_REPOSITORY_STORAGE_SIZE,
    +                     uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
    +      : Repository(core::getClassName<VolatileRepository>(), "",
    +                   maxPartitionMillis, maxPartitionBytes, purgePeriod),
    +        max_size_(maxPartitionBytes * 0.75),
    +        current_index_(0),
    +        max_count_(10000)
    +
    +  {
    +      
    +  }
    +
    +  // Destructor
    +  ~VolatileRepository() {
    +    for (auto ent : value_vector_) {
    +      delete ent;
    +    }
    +  }
    +
    +  // initialize
    +  virtual bool initialize(const std::shared_ptr<Configure> &configure) {
    +//    configure->get("nifi.volatile.repository.max_size")
    --- End diff --
    
    The `nifi.volatile.repository.max_size` property is not currently configurable, and I would imagine this would be taken care of in the constructor.


> Native Volatile Content Repository implementation
> -------------------------------------------------
>
>                 Key: MINIFI-37
>                 URL: https://issues.apache.org/jira/browse/MINIFI-37
>             Project: Apache NiFi MiNiFi
>          Issue Type: Task
>          Components: C++, Core Framework
>            Reporter: Aldrin Piri
>            Assignee: marco polo
>            Priority: Minor
>              Labels: native
>
> Given the constrained environments in which MiNiFi could operate, it would be 
> beneficial to provide a content repository that is strictly in memory for 
> those environments where disk storage may be limited or non-existent. 
> This implementation should consider configuration options around its 
> footprint such as number of entries held and/or sheer capacity.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to