This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 5d6efa8a3 MINIFICPP-1937 Rework rocksdb handling, use
OptimizeForPointLookup
5d6efa8a3 is described below
commit 5d6efa8a3a792d03e9994ad6cbc98e5156fdb6e2
Author: Adam Debreceni <[email protected]>
AuthorDate: Wed Nov 9 15:57:29 2022 +0100
MINIFICPP-1937 Rework rocksdb handling, use OptimizeForPointLookup
Closes #1431
Signed-off-by: Marton Szasz <[email protected]>
---
.../rocksdb-repos/DatabaseContentRepository.cpp | 7 +-
extensions/rocksdb-repos/FlowFileRepository.h | 9 +-
.../RocksDbPersistableKeyValueStoreService.cpp | 6 +-
extensions/rocksdb-repos/database/ColumnHandle.cpp | 12 +-
extensions/rocksdb-repos/database/ColumnHandle.h | 20 +-
.../database/{ColumnHandle.cpp => DbHandle.cpp} | 20 +-
.../database/{ColumnHandle.h => DbHandle.h} | 28 ++-
.../rocksdb-repos/database/RocksDatabase.cpp | 24 ++-
extensions/rocksdb-repos/database/RocksDatabase.h | 23 +--
.../rocksdb-repos/database/RocksDbInstance.cpp | 201 ++++++++++++++-------
.../rocksdb-repos/database/RocksDbInstance.h | 37 ++--
extensions/rocksdb-repos/database/RocksDbUtils.h | 2 +-
libminifi/include/utils/crypto/EncryptionManager.h | 6 +-
.../test/rocksdb-tests/RocksDBStreamTests.cpp | 4 +-
libminifi/test/rocksdb-tests/RocksDBTests.cpp | 51 +++++-
15 files changed, 258 insertions(+), 192 deletions(-)
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 008deda72..ccd71c995 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -52,9 +52,10 @@ bool DatabaseContentRepository::initialize(const
std::shared_ptr<minifi::Configu
db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
}
};
- auto set_cf_opts = []
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts){
- cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator,
std::make_shared<StringAppender>(), StringAppender::Eq{});
- cf_opts.set<size_t>(&rocksdb::ColumnFamilyOptions::max_successive_merges,
0);
+ auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts){
+ cf_opts.OptimizeForPointLookup(4);
+ cf_opts.merge_operator = std::make_shared<StringAppender>();
+ cf_opts.max_successive_merges = 0;
};
db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts,
directory_);
if (db_->open()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h
b/extensions/rocksdb-repos/FlowFileRepository.h
index 86e9d652e..b774fcba6 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -125,10 +125,11 @@ class FlowFileRepository : public ThreadedRepository,
public SwapManager {
// To avoid DB write issues during heavy load it's recommended to have
high number of buffer.
// Rocksdb's stall feature can also trigger in case the number of buffers
is >= 3.
// The more buffers we have the more memory rocksdb can utilize without
significant memory consumption under low load.
- auto cf_options = []
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
- cf_opts.set(&rocksdb::ColumnFamilyOptions::write_buffer_size, 8ULL <<
20U);
- cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::max_write_buffer_number,
20);
-
cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::min_write_buffer_number_to_merge,
1);
+ auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.OptimizeForPointLookup(4);
+ cf_opts.write_buffer_size = 8ULL << 20U;
+ cf_opts.max_write_buffer_number = 20;
+ cf_opts.min_write_buffer_number_to_merge = 1;
};
db_ = minifi::internal::RocksDatabase::create(db_options, cf_options,
directory_);
if (db_->open()) {
diff --git
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
index 202785c54..11afc6ec1 100644
---
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -86,9 +86,9 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
}
};
// Use the same buffer settings as the FlowFileRepository
- auto set_cf_opts = []
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
- cf_opts.set(&rocksdb::ColumnFamilyOptions::write_buffer_size, 8ULL << 20U);
-
cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::min_write_buffer_number_to_merge,
1);
+ auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.write_buffer_size = 8ULL << 20U;
+ cf_opts.min_write_buffer_number_to_merge = 1;
};
db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts,
directory_);
if (db_->open()) {
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.cpp
b/extensions/rocksdb-repos/database/ColumnHandle.cpp
index 5de54a21b..df4f65713 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.cpp
+++ b/extensions/rocksdb-repos/database/ColumnHandle.cpp
@@ -19,19 +19,11 @@
#include "ColumnHandle.h"
#include "logging/LoggerConfiguration.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
ColumnHandle::~ColumnHandle() {
static auto logger = core::logging::LoggerFactory<ColumnHandle>::getLogger();
logger->log_trace("Closing column handle '%s'", handle->GetName());
}
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.h
b/extensions/rocksdb-repos/database/ColumnHandle.h
index e73e79f26..1203480fb 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.h
+++ b/extensions/rocksdb-repos/database/ColumnHandle.h
@@ -18,26 +18,20 @@
#pragma once
+#include <optional>
#include <memory>
#include <utility>
#include "rocksdb/db.h"
+#include "RocksDbUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
struct ColumnHandle {
- explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle,
rocksdb::ColumnFamilyOptions options)
- : handle(std::move(handle)), options(options) {}
+ explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle,
ColumnFamilyOptionsPatch cfo_patch)
+ : cfo_patch(cfo_patch), handle(std::move(handle)) {}
~ColumnHandle();
+ ColumnFamilyOptionsPatch cfo_patch;
std::unique_ptr<rocksdb::ColumnFamilyHandle> handle;
- rocksdb::ColumnFamilyOptions options;
};
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.cpp
b/extensions/rocksdb-repos/database/DbHandle.cpp
similarity index 67%
copy from extensions/rocksdb-repos/database/ColumnHandle.cpp
copy to extensions/rocksdb-repos/database/DbHandle.cpp
index 5de54a21b..70c4bc637 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.cpp
+++ b/extensions/rocksdb-repos/database/DbHandle.cpp
@@ -16,22 +16,14 @@
* limitations under the License.
*/
-#include "ColumnHandle.h"
+#include "DbHandle.h"
#include "logging/LoggerConfiguration.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
-ColumnHandle::~ColumnHandle() {
- static auto logger = core::logging::LoggerFactory<ColumnHandle>::getLogger();
- logger->log_trace("Closing column handle '%s'", handle->GetName());
+DbHandle::~DbHandle() {
+ static auto logger = core::logging::LoggerFactory<DbHandle>::getLogger();
+ logger->log_trace("Closing database handle '%s'", handle->GetName());
}
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.h
b/extensions/rocksdb-repos/database/DbHandle.h
similarity index 62%
copy from extensions/rocksdb-repos/database/ColumnHandle.h
copy to extensions/rocksdb-repos/database/DbHandle.h
index e73e79f26..cbf590169 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.h
+++ b/extensions/rocksdb-repos/database/DbHandle.h
@@ -20,24 +20,20 @@
#include <memory>
#include <utility>
+#include <vector>
#include "rocksdb/db.h"
+#include "RocksDbUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
-struct ColumnHandle {
- explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle,
rocksdb::ColumnFamilyOptions options)
- : handle(std::move(handle)), options(options) {}
- ~ColumnHandle();
- std::unique_ptr<rocksdb::ColumnFamilyHandle> handle;
- rocksdb::ColumnFamilyOptions options;
+struct DbHandle {
+ explicit DbHandle(std::unique_ptr<rocksdb::DB> handle,
std::vector<DBOptionsPatch> dbo_patches)
+ : dbo_patches(dbo_patches), handle(std::move(handle)) {}
+ ~DbHandle();
+ // we need to keep the patch object alive as the DB handle could
+ // reference patcher-owned resources
+ std::vector<DBOptionsPatch> dbo_patches;
+ std::unique_ptr<rocksdb::DB> handle;
};
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.cpp
b/extensions/rocksdb-repos/database/RocksDatabase.cpp
index fa52a1d1f..330f1398e 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.cpp
+++ b/extensions/rocksdb-repos/database/RocksDatabase.cpp
@@ -25,11 +25,7 @@
#include "utils/StringUtils.h"
#include "RocksDbInstance.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ =
core::logging::LoggerFactory<RocksDatabase>::getLogger();
@@ -81,15 +77,17 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const
DBOptionsPatch& db_op
return std::make_unique<RocksDatabase>(instance, db_column,
db_options_patch, cf_options_patch);
}
-RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string
column, DBOptionsPatch db_options_patch, ColumnFamilyOptionsPatch
cf_options_patch)
- : column_(std::move(column)),
db_options_patch_(std::move(db_options_patch)),
cf_options_patch_(std::move(cf_options_patch)), db_(std::move(db)) {}
+RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string
column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch&
cf_options_patch)
+ : column_(std::move(column)), db_(std::move(db)) {
+ db_->registerColumnConfig(column_, db_options_patch, cf_options_patch);
+}
+
+RocksDatabase::~RocksDatabase() {
+ db_->unregisterColumnConfig(column_);
+}
std::optional<OpenRocksDb> RocksDatabase::open() {
- return db_->open(column_, db_options_patch_, cf_options_patch_);
+ return db_->open(column_);
}
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.h
b/extensions/rocksdb-repos/database/RocksDatabase.h
index a53c5cf07..acaa3a6d4 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.h
+++ b/extensions/rocksdb-repos/database/RocksDatabase.h
@@ -27,11 +27,7 @@
#include "RocksDbUtils.h"
#include "OpenRocksDb.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
class RocksDbInstance;
@@ -46,21 +42,22 @@ class RocksDatabase {
const std::string& uri,
RocksDbMode mode =
RocksDbMode::ReadWrite);
- RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column,
DBOptionsPatch db_options_patch, ColumnFamilyOptionsPatch cf_options_patch);
+ RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch&
cf_options_patch);
+
+ RocksDatabase(const RocksDatabase&) = delete;
+ RocksDatabase(RocksDatabase&&) = delete;
+ RocksDatabase& operator=(const RocksDatabase&) = delete;
+ RocksDatabase& operator=(RocksDatabase&&) = delete;
std::optional<OpenRocksDb> open();
+ ~RocksDatabase();
+
private:
const std::string column_;
- const DBOptionsPatch db_options_patch_;
- const ColumnFamilyOptionsPatch cf_options_patch_;
std::shared_ptr<RocksDbInstance> db_;
static std::shared_ptr<core::logging::Logger> logger_;
};
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.cpp
b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
index 8f86fc5cd..f570934d2 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.cpp
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
@@ -18,29 +18,79 @@
#include "RocksDbInstance.h"
#include <vector>
+#include <utility>
#include "logging/LoggerConfiguration.h"
#include "rocksdb/utilities/options_util.h"
#include "OpenRocksDb.h"
#include "ColumnHandle.h"
+#include "DbHandle.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
std::shared_ptr<core::logging::Logger> RocksDbInstance::logger_ =
core::logging::LoggerFactory<RocksDbInstance>::getLogger();
-RocksDbInstance::RocksDbInstance(const std::string& path, RocksDbMode mode) :
db_name_(path), mode_(mode) {}
+RocksDbInstance::RocksDbInstance(std::string path, RocksDbMode mode) :
db_name_(std::move(path)), mode_(mode) {}
void RocksDbInstance::invalidate() {
std::lock_guard<std::mutex> db_guard{mtx_};
+ invalidate(db_guard);
+}
+
+void RocksDbInstance::invalidate(const std::lock_guard<std::mutex>&) {
// discard our own instance
columns_.clear();
impl_.reset();
}
-std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column,
const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch&
cf_options_patch) {
+void RocksDbInstance::registerColumnConfig(const std::string& column, const
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch&
cf_options_patch) {
+ std::lock_guard<std::mutex> db_guard{mtx_};
+ logger_->log_trace("Registering column '%s' in database '%s'", column,
db_name_);
+ auto [_, inserted] = column_configs_.insert({column, ColumnConfig{.dbo_patch
= db_options_patch, .cfo_patch = cf_options_patch}});
+ if (!inserted) {
+ throw std::runtime_error("Configuration is already registered for column
'" + column + "'");
+ }
+
+ bool need_reopen = [&] {
+ if (!impl_) {
+ logger_->log_trace("Database is already scheduled to be reopened");
+ return false;
+ }
+ {
+ rocksdb::DBOptions db_opts_copy = db_options_;
+ Writable<rocksdb::DBOptions> db_opts_writer(db_opts_copy);
+ if (db_options_patch) {
+ db_options_patch(db_opts_writer);
+ if (db_opts_writer.isModified()) {
+ logger_->log_trace("Requested a difference DBOptions than the one
that was used to open the database");
+ return true;
+ }
+ }
+ }
+ if (!columns_.contains(column)) {
+ logger_->log_trace("Previously unspecified column, will dynamically
create the column");
+ return false;
+ }
+ if (!cf_options_patch) {
+ logger_->log_trace("No explicit ColumnFamilyOptions was requested");
+ return false;
+ }
+ logger_->log_trace("Could not determine if we definitely need to reopen or
we are definitely safe, requesting reopen");
+ return true;
+ }();
+ if (need_reopen) {
+ // reset impl_, for the database to be reopened on the next
RocksDbInstance::open call
+ invalidate(db_guard);
+ }
+}
+
+void RocksDbInstance::unregisterColumnConfig(const std::string& column) {
+ std::lock_guard<std::mutex> db_guard{mtx_};
+ if (column_configs_.erase(column) == 0) {
+ throw std::runtime_error("Could not find column configuration for column
'" + column + "'");
+ }
+}
+
+std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) {
std::lock_guard<std::mutex> db_guard{mtx_};
if (!impl_) {
gsl_Expects(columns_.empty());
@@ -48,26 +98,55 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const
std::string& column, cons
rocksdb::DB* db_instance = nullptr;
rocksdb::Status result;
- rocksdb::ConfigOptions conf_options = [&] {
+ std::vector<DBOptionsPatch> dbo_patches;
+ rocksdb::ConfigOptions conf_options;
+ conf_options.sanity_level =
rocksdb::ConfigOptions::kSanityLevelLooselyCompatible;
+ {
// we have to extract the encryptor environment otherwise
// we won't be able to read the options file
- rocksdb::ConfigOptions result;
- if (db_options_patch) {
- rocksdb::DBOptions dummy_opts;
- Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
- db_options_patch(db_options_writer);
- if (dummy_opts.env) {
- result.env = dummy_opts.env;
+ rocksdb::DBOptions dummy_opts;
+ dummy_opts.env = nullptr; // manually clear it, for the patcher to
explicitly set it
+ for (auto& [col_name, config] : column_configs_) {
+ if (auto& dbo_patch = config.dbo_patch) {
+ dbo_patches.push_back(dbo_patch);
+ Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
+ dbo_patch(db_options_writer);
+ if (dummy_opts.env) {
+ conf_options.env = dummy_opts.env;
+ }
+ }
+ }
+ // we need to reapply the DBOptions changes to check for conflicts
+ for (auto& [col_name, config] : column_configs_) {
+ if (auto& dbo_patch = config.dbo_patch) {
+ Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
+ dbo_patch(db_options_writer);
+ if (db_options_writer.isModified()) {
+ logger_->log_error("Conflicting database options requested for
'%s'", db_name_);
+ return std::nullopt;
+ }
}
}
- return result;
- }();
+ }
db_options_ = rocksdb::DBOptions{};
std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
rocksdb::Status option_status = rocksdb::LoadLatestOptions(conf_options,
db_name_, &db_options_, &cf_descriptors);
- Writable<rocksdb::DBOptions> db_options_writer(db_options_);
- if (db_options_patch) {
- db_options_patch(db_options_writer);
+ {
+ // apply the database options patchers
+ Writable<rocksdb::DBOptions> db_options_writer(db_options_);
+ for (auto& [col_name, config] : column_configs_) {
+ if (auto& dbo_patch = config.dbo_patch) {
+ dbo_patch(db_options_writer);
+ }
+ }
+ }
+ // apply requested ColumnFamilyOptions for each already existing
ColumnFamily
+ for (auto& cf_descr : cf_descriptors) {
+ if (auto it = column_configs_.find(cf_descr.name); it !=
column_configs_.end()) {
+ if (auto& cfo_patch = it->second.cfo_patch) {
+ cfo_patch(cf_descr.options);
+ }
+ }
}
if (option_status.ok()) {
logger_->log_trace("Found existing database '%s', checking
compatibility", db_name_);
@@ -78,17 +157,13 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const
std::string& column, cons
}
} else if (option_status.IsNotFound()) {
logger_->log_trace("Database at '%s' not found, creating", db_name_);
- if (column == "default") {
- rocksdb::ColumnFamilyOptions default_cf_options;
- Writable<rocksdb::ColumnFamilyOptions> cf_writer(default_cf_options);
- if (cf_options_patch) {
- cf_options_patch(cf_writer);
+ rocksdb::ColumnFamilyOptions default_cf_options;
+ if (auto it = column_configs_.find("default"); it !=
column_configs_.end()) {
+ if (auto& cfo_patch = it->second.cfo_patch) {
+ cfo_patch(default_cf_options);
}
- cf_descriptors.emplace_back("default", default_cf_options);
- } else {
- // we must create the "default" column, using default options
- cf_descriptors.emplace_back("default", rocksdb::ColumnFamilyOptions{});
}
+ cf_descriptors.emplace_back("default", default_cf_options);
} else if (!option_status.ok()) {
logger_->log_error("Couldn't query database '%s' for options: '%s'",
db_name_, option_status.ToString());
return std::nullopt;
@@ -113,50 +188,37 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const
std::string& column, cons
return std::nullopt;
}
gsl_Expects(db_instance);
- // the patcher could have internal resources the we need to keep alive
+ // the patches could have internal resources that we need to keep alive
// as long as the database is open (e.g. custom environment)
- db_options_patch_ = db_options_patch;
- impl_.reset(db_instance);
+ impl_ =
std::make_shared<DbHandle>(std::unique_ptr<rocksdb::DB>(db_instance),
std::move(dbo_patches));
for (size_t cf_idx{0}; cf_idx < column_handles.size(); ++cf_idx) {
- columns_[column_handles[cf_idx]->GetName()]
- =
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]),
cf_descriptors[cf_idx].options);
- }
- } else {
- logger_->log_trace("Checking if the already open database is compatible
with the requested options");
- if (db_options_patch) {
- rocksdb::DBOptions db_options_copy = db_options_;
- Writable<rocksdb::DBOptions> writer(db_options_copy);
- db_options_patch(writer);
- if (writer.isModified()) {
- logger_->log_error("Database '%s' has already been opened using a
different configuration", db_name_);
- return std::nullopt;
+ ColumnFamilyOptionsPatch cfo_patch;
+ if (auto it = column_configs_.find(column_handles[cf_idx]->GetName());
it != column_configs_.end()) {
+ cfo_patch = it->second.cfo_patch;
}
+ columns_[column_handles[cf_idx]->GetName()]
+ =
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]),
cfo_patch);
}
}
- std::shared_ptr<ColumnHandle> column_handle =
getOrCreateColumnFamily(column, cf_options_patch, db_guard);
+ std::shared_ptr<ColumnHandle> column_handle =
getOrCreateColumnFamily(column, db_guard);
if (!column_handle) {
// error is already logged by the method
return std::nullopt;
}
return OpenRocksDb(
*this,
- gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(impl_),
+
gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(std::shared_ptr<rocksdb::DB>(impl_,
impl_->handle.get())),
gsl::make_not_null<std::shared_ptr<ColumnHandle>>(column_handle));
}
-std::shared_ptr<ColumnHandle> RocksDbInstance::getOrCreateColumnFamily(const
std::string& column, const ColumnFamilyOptionsPatch& cf_options_patch, const
std::lock_guard<std::mutex>& /*guard*/) {
+std::shared_ptr<ColumnHandle> RocksDbInstance::getOrCreateColumnFamily(const
std::string& column, const std::lock_guard<std::mutex>& /*guard*/) {
gsl_Expects(impl_);
- auto it = columns_.find(column);
- if (it != columns_.end()) {
- logger_->log_trace("Column '%s' already exists in database '%s'", column,
impl_->GetName());
- Writable<rocksdb::ColumnFamilyOptions> writer(it->second->options);
- if (cf_options_patch) {
- cf_options_patch(writer);
- }
- if (writer.isModified()) {
- logger_->log_error("Requested column '%s' has already been opened using
a different configuration", column);
- return nullptr;
- }
+ if (!column_configs_.contains(column)) {
+ logger_->log_error("Trying to access column '%s' in database '%s' without
configuration", column, impl_->handle->GetName());
+ return nullptr;
+ }
+ if (auto it = columns_.find(column); it != columns_.end()) {
+ logger_->log_trace("Column '%s' already exists in database '%s'", column,
impl_->handle->GetName());
return it->second;
}
if (mode_ == RocksDbMode::ReadOnly) {
@@ -165,22 +227,21 @@ std::shared_ptr<ColumnHandle>
RocksDbInstance::getOrCreateColumnFamily(const std
}
rocksdb::ColumnFamilyHandle* raw_handle{nullptr};
rocksdb::ColumnFamilyOptions cf_options;
- Writable<rocksdb::ColumnFamilyOptions> writer(cf_options);
- if (cf_options_patch) {
- cf_options_patch(writer);
+ ColumnFamilyOptionsPatch cfo_patch;
+ if (auto it = column_configs_.find(column); it != column_configs_.end()) {
+ cfo_patch = it->second.cfo_patch;
+ if (cfo_patch) {
+ cfo_patch(cf_options);
+ }
}
- auto status = impl_->CreateColumnFamily(cf_options, column, &raw_handle);
+ auto status = impl_->handle->CreateColumnFamily(cf_options, column,
&raw_handle);
if (!status.ok()) {
- logger_->log_error("Failed to create column '%s' in database '%s'",
column, impl_->GetName());
+ logger_->log_error("Failed to create column '%s' in database '%s'",
column, impl_->handle->GetName());
return nullptr;
}
- logger_->log_trace("Successfully created column '%s' in database '%s'",
column, impl_->GetName());
- columns_[column] =
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(raw_handle),
cf_options);
+ logger_->log_trace("Successfully created column '%s' in database '%s'",
column, impl_->handle->GetName());
+ columns_[column] =
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(raw_handle),
cfo_patch);
return columns_[column];
}
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.h
b/extensions/rocksdb-repos/database/RocksDbInstance.h
index fdc3e2421..182324958 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.h
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.h
@@ -26,52 +26,55 @@
#include "rocksdb/db.h"
#include "logging/Logger.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
+class RocksDatabase;
class OpenRocksDb;
struct ColumnHandle;
+struct DbHandle;
/**
* Purpose: represents a single rocksdb instance backed by a directory
*/
class RocksDbInstance {
friend class OpenRocksDb;
+ friend class RocksDatabase;
+
+ struct ColumnConfig {
+ DBOptionsPatch dbo_patch;
+ ColumnFamilyOptionsPatch cfo_patch;
+ };
public:
- explicit RocksDbInstance(const std::string& path, RocksDbMode mode =
RocksDbMode::ReadWrite);
+ explicit RocksDbInstance(std::string path, RocksDbMode mode =
RocksDbMode::ReadWrite);
- std::optional<OpenRocksDb> open(const std::string& column, const
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch&
cf_options_patch);
+ std::optional<OpenRocksDb> open(const std::string& column);
protected:
// caller must hold the mtx_ mutex
- std::shared_ptr<ColumnHandle> getOrCreateColumnFamily(const std::string&
column, const ColumnFamilyOptionsPatch& cf_options_patch, const
std::lock_guard<std::mutex>& guard);
+ std::shared_ptr<ColumnHandle> getOrCreateColumnFamily(const std::string&
column, const std::lock_guard<std::mutex>& guard);
/*
* notify RocksDatabase that the next open should check if they can reopen
the database
* until a successful reopen no more open is possible
*/
void invalidate();
+ void invalidate(const std::lock_guard<std::mutex>&);
+
+ void registerColumnConfig(const std::string& column, const DBOptionsPatch&
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch);
+ void unregisterColumnConfig(const std::string& column);
+
rocksdb::DBOptions db_options_;
const std::string db_name_;
const RocksDbMode mode_;
std::mutex mtx_;
- std::shared_ptr<rocksdb::DB> impl_;
+ std::shared_ptr<DbHandle> impl_;
std::unordered_map<std::string, std::shared_ptr<ColumnHandle>> columns_;
- // the patcher could have internal resources the we need to keep alive
- // as long as the database is open (e.g. custom environment)
- DBOptionsPatch db_options_patch_;
+ std::unordered_map<std::string, ColumnConfig> column_configs_;
static std::shared_ptr<core::logging::Logger> logger_;
};
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h
b/extensions/rocksdb-repos/database/RocksDbUtils.h
index 6d6d5c30a..d42afce6a 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -67,7 +67,7 @@ class Writable {
* options of the existing database, apply a "patch" and then check for
compatibility.
*/
using DBOptionsPatch = std::function<void(Writable<rocksdb::DBOptions>&)>;
-using ColumnFamilyOptionsPatch =
std::function<void(Writable<rocksdb::ColumnFamilyOptions>&)>;
+using ColumnFamilyOptionsPatch =
std::function<void(rocksdb::ColumnFamilyOptions&)>;
} // namespace internal
} // namespace minifi
diff --git a/libminifi/include/utils/crypto/EncryptionManager.h
b/libminifi/include/utils/crypto/EncryptionManager.h
index a21dda8df..f538471ff 100644
--- a/libminifi/include/utils/crypto/EncryptionManager.h
+++ b/libminifi/include/utils/crypto/EncryptionManager.h
@@ -40,9 +40,9 @@ class EncryptionManager {
[[nodiscard]] std::optional<XSalsa20Cipher> createXSalsa20Cipher(const
std::string& key_name) const;
[[nodiscard]] std::optional<Aes256EcbCipher> createAes256EcbCipher(const
std::string& key_name) const;
- private:
- [[nodiscard]] std::optional<Bytes> readKey(const std::string& key_name)
const;
- [[nodiscard]] bool writeKey(const std::string& key_name, const Bytes& key)
const;
+ protected:
+ [[nodiscard]] virtual std::optional<Bytes> readKey(const std::string&
key_name) const;
+ [[nodiscard]] virtual bool writeKey(const std::string& key_name, const
Bytes& key) const;
std::string key_dir_;
};
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index d0a7ffa51..0ce216ecd 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -31,8 +31,8 @@ class RocksDBStreamTest : TestController {
db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction,
true);
db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
};
- auto set_cf_opts = []
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
- cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator,
std::make_shared<core::repository::StringAppender>(),
core::repository::StringAppender::Eq{});
+ auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.merge_operator =
std::make_shared<core::repository::StringAppender>();
};
db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts,
dbPath);
REQUIRE(db->open());
diff --git a/libminifi/test/rocksdb-tests/RocksDBTests.cpp
b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
index 6e6e93514..45c920362 100644
--- a/libminifi/test/rocksdb-tests/RocksDBTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
@@ -22,6 +22,7 @@
#include "../../extensions/rocksdb-repos/database/RocksDatabase.h"
#include "../../extensions/rocksdb-repos/database/RocksDbInstance.h"
#include "../../extensions/rocksdb-repos/database/ColumnHandle.h"
+#include "../../extensions/rocksdb-repos/database/DbHandle.h"
#include "IntegrationTestUtils.h"
#include "database/StringAppender.h"
#include
"../../extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h"
@@ -41,6 +42,7 @@ struct RocksDBTest : TestController {
LogTestController::getInstance().setTrace<minifi::internal::RocksDatabase>();
LogTestController::getInstance().setTrace<minifi::internal::RocksDbInstance>();
LogTestController::getInstance().setTrace<minifi::internal::ColumnHandle>();
+ LogTestController::getInstance().setTrace<minifi::internal::DbHandle>();
db_dir = createTempDirectory();
}
@@ -107,9 +109,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to two specific
columns at once", "[roc
{
auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {},
"minifidb://" + db_dir + "/column_one");
auto opendb1 = db1->open();
+ REQUIRE(opendb1);
opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {},
"minifidb://" + db_dir + "/column_two");
auto opendb2 = db2->open();
+ REQUIRE(opendb2);
opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
}
@@ -127,9 +131,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default
and a specific column at
{
auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {},
"minifidb://" + db_dir + "/column_one");
auto opendb1 = db1->open();
+ REQUIRE(opendb1);
opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {},
db_dir);
auto opendb2 = db2->open();
+ REQUIRE(opendb2);
opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
}
@@ -143,17 +149,17 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default
and a specific column at
REQUIRE(value == "penguin");
}
-TEST_CASE_METHOD(RocksDBTest, "Error is logged if the options are incompatible
with an existing column family", "[rocksDBTest6]") {
+TEST_CASE_METHOD(RocksDBTest, "Logged if the options are incompatible with an
existing column family", "[rocksDBTest6]") {
auto db = minifi::internal::RocksDatabase::create(new_db_opts, {},
"minifidb://" + db_dir + "/column_one");
REQUIRE(db->open());
// implicitly created the "default" column family, but with the default options
- auto cf_opts = [] (minifi::internal::Writable<rocksdb::ColumnFamilyOptions>&
cf_opts) {
- cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator,
std::make_shared<minifi::core::repository::StringAppender>());
+ auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.merge_operator =
std::make_shared<minifi::core::repository::StringAppender>();
};
auto default_db = minifi::internal::RocksDatabase::create(new_db_opts,
cf_opts, "minifidb://" + db_dir + "/default");
- REQUIRE_FALSE(default_db->open());
+ REQUIRE(default_db->open());
REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1}, "Requested column 'default' has already been
opened using a different configuration"));
+ std::chrono::seconds{1}, "Could not determine if we definitely need to
reopen or we are definitely safe, requesting reopen"));
}
TEST_CASE_METHOD(RocksDBTest, "Error is logged if different DBOptions are
used", "[rocksDBTest7]") {
@@ -170,7 +176,7 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different
DBOptions are used",
auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {},
"minifidb://" + db_dir + "/column_two");
REQUIRE_FALSE(col_2->open());
REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1}, "Database '" + db_dir + "' has already been
opened using a different configuration"));
+ std::chrono::seconds{1}, "Conflicting database options requested for '"
+ db_dir + "'"));
}
TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails without
merge_operator", "[rocksDBTest8]") {
@@ -178,14 +184,15 @@ TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails
without merge_operator"
REQUIRE(db);
auto opendb = db->open();
+ REQUIRE(opendb);
REQUIRE(opendb->Put({}, "a", "first").ok());
REQUIRE_FALSE(opendb->Merge({}, "a", "second").ok());
}
TEST_CASE_METHOD(RocksDBTest, "Column options are applied", "[rocksDBTest9]") {
- auto cf_opts = [] (minifi::internal::Writable<rocksdb::ColumnFamilyOptions>&
cf_opts) {
- cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator,
std::make_shared<StringAppender>(), StringAppender::Eq{});
+ auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.merge_operator = std::make_shared<StringAppender>();
};
std::string db_uri;
SECTION("Named column") {
@@ -201,6 +208,7 @@ TEST_CASE_METHOD(RocksDBTest, "Column options are applied",
"[rocksDBTest9]") {
REQUIRE(db);
auto opendb = db->open();
+ REQUIRE(opendb);
REQUIRE(opendb->Put({}, "a", "first").ok());
REQUIRE(opendb->Merge({}, "a", "second").ok());
@@ -245,13 +253,36 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if
different encryption keys are
auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {},
"minifidb://" + db_dir + "/column_two");
REQUIRE_FALSE(col_2->open());
REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1}, "Database '" + db_dir + "' has already been
opened using a different configuration"));
+ std::chrono::seconds{1}, "Conflicting database options requested for
'" + db_dir + "'"));
}
SECTION("Using no encryption key") {
auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {},
"minifidb://" + db_dir + "/column_two");
REQUIRE_FALSE(col_2->open());
REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1}, "Database '" + db_dir + "' has already been
opened using a different configuration"));
+ std::chrono::seconds{1}, "Conflicting database options requested for
'" + db_dir + "'"));
+ }
+}
+
+TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but
compatible) ColumnFamilyOptions change", "[rocksDBTest11]") {
+ {
+ auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {},
db_dir);
+ auto opendb = col1->open();
+ REQUIRE(opendb);
+ REQUIRE(opendb->Put({}, "fruit", "apple").ok());
+ }
+
+ auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+ cf_opts.OptimizeForPointLookup(4);
+ };
+
+ auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts,
db_dir);
+ REQUIRE(col2);
+ {
+ auto opendb = col2->open();
+ REQUIRE(opendb);
+ std::string value;
+ REQUIRE(opendb->Get({}, "fruit", &value).ok());
+ REQUIRE(value == "apple");
}
}