[
https://issues.apache.org/jira/browse/MINIFI-37?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014406#comment-16014406
]
ASF GitHub Bot commented on MINIFI-37:
--------------------------------------
Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/98#discussion_r117002738
--- Diff: libminifi/include/core/repository/VolatileRepository.h ---
@@ -0,0 +1,350 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileRepository_H_
+
+#include "core/Repository.h"
+#include <chrono>
+#include <vector>
+#include <map>
+#include "core/Core.h"
+#include "Connection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+
+static uint16_t accounting_size = sizeof(void*) * 2 + sizeof(uint64_t)
+ + sizeof(size_t);
+
+class RepoValue {
+ public:
+
+ explicit RepoValue()
+ {
+ }
+
+ explicit RepoValue(std::string key, uint8_t *ptr, size_t size)
+ :
+ key_(key)
+ {
+ buffer_.resize(size);
+ std::memcpy(buffer_.data(), ptr, size);
+ }
+
+ explicit RepoValue(RepoValue &&other) noexcept
+ : key_(std::move(other.key_)),
+ buffer_(std::move(other.buffer_)){
+ }
+
+ ~RepoValue()
+ {
+ }
+
+
+
+ std::string &getKey() {
+ return key_;
+ }
+
+ /**
+ * Return the size of the memory within the key
+ * buffer, the size of timestamp, and the general
+ * system word size
+ */
+ uint64_t size() {
+ return buffer_.size() + accounting_size;
+ }
+
+ size_t bufferSize() {
+ return buffer_.size();
+ }
+
+ void emplace(std::string &str) {
+ str.insert(0, reinterpret_cast<const char*>(buffer_.data()),
buffer_.size());
+ }
+
+ RepoValue &operator=(RepoValue &&other) noexcept {
+ key_ = std::move(other.key_);
+ buffer_ = std::move(other.buffer_);
+ other.buffer_.clear();
+ return *this;
+ }
+
+ private:
+ std::string key_;
+ std::vector<uint8_t> buffer_;
+};
+
+class AtomicEntry {
+
+ public:
+ AtomicEntry()
+ : write_pending_(false),
+ has_value_(false) {
+
+ }
+
+ bool setRepoValue(RepoValue &new_value, size_t &prev_size) {
+ // delete the underlying pointer
+ bool lock = false;
+ if (!write_pending_.compare_exchange_weak(lock, true) && !lock)
+ return false;
+ if (has_value_)
+ {
+ prev_size = value_.size();
+ }
+ value_ = std::move(new_value);
+ has_value_ = true;
+ try_unlock();
+ return true;
+ }
+
+ bool getValue(RepoValue &value) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ value = std::move(value_);
+ has_value_ = false;
+ try_unlock();
+ return true;
+ }
+
+ bool getValue(const std::string &key, RepoValue &value) {
+ try_lock();
+ if (!has_value_) {
+ try_unlock();
+ return false;
+ }
+ if (value_.getKey() != key) {
+ try_unlock();
+ return false;
+ }
+ value = std::move(value_);
+ has_value_ = false;
+ try_unlock();
+ return true;
+ }
+
+ private:
+
+ inline void try_lock() {
+ bool lock = false;
+ while (!write_pending_.compare_exchange_weak(lock, true) && !lock) {
+ // attempt again
+ }
+ }
+
+ inline void try_unlock() {
+ bool lock = true;
+ while (!write_pending_.compare_exchange_weak(lock, false) && lock) {
+ // attempt again
+ }
+ }
+
+ std::atomic<bool> write_pending_;
+ std::atomic<bool> has_value_;
+ RepoValue value_;
+};
+
+/**
+ * Flow File repository
+ * Design: Extends Repository and implements the run function, using
LevelDB as the primary substrate.
+ */
+class VolatileRepository : public core::Repository,
+ public std::enable_shared_from_this<VolatileRepository> {
+ public:
+ // Constructor
+
+ VolatileRepository(std::string dir = REPOSITORY_DIRECTORY,
+ int64_t maxPartitionMillis =
MAX_REPOSITORY_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes =
+ MAX_REPOSITORY_STORAGE_SIZE,
+ uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+ : Repository(core::getClassName<VolatileRepository>(), "",
+ maxPartitionMillis, maxPartitionBytes, purgePeriod),
+ max_size_(maxPartitionBytes * 0.75),
+ current_index_(0),
+ max_count_(10000)
+
+ {
+
+ }
+
+ // Destructor
+ ~VolatileRepository() {
+ for (auto ent : value_vector_) {
+ delete ent;
+ }
+ }
+
+ // initialize
+ virtual bool initialize(const std::shared_ptr<Configure> &configure) {
+// configure->get("nifi.volatile.repository.max_size")
--- End diff --
not currently configurable and would imagine this would be taken care of in
the ctor
> Native Volatile Content Repository implementation
> -------------------------------------------------
>
> Key: MINIFI-37
> URL: https://issues.apache.org/jira/browse/MINIFI-37
> Project: Apache NiFi MiNiFi
> Issue Type: Task
> Components: C++, Core Framework
> Reporter: Aldrin Piri
> Assignee: marco polo
> Priority: Minor
> Labels: native
>
> Given the constrained environments in which MiNiFi could operate, it would be
> beneficial to provide a content repository that is strictly in memory for
> those environments where disk storage may be limited or non-existent.
> This implementation should consider configuration options around its
> footprint such as number of entries held and/or sheer capacity.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)