This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d6efa8a3 MINIFICPP-1937 Rework rocksdb handling, use OptimizeForPointLookup
5d6efa8a3 is described below

commit 5d6efa8a3a792d03e9994ad6cbc98e5156fdb6e2
Author: Adam Debreceni <[email protected]>
AuthorDate: Wed Nov 9 15:57:29 2022 +0100

    MINIFICPP-1937 Rework rocksdb handling, use OptimizeForPointLookup
    
    Closes #1431
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../rocksdb-repos/DatabaseContentRepository.cpp    |   7 +-
 extensions/rocksdb-repos/FlowFileRepository.h      |   9 +-
 .../RocksDbPersistableKeyValueStoreService.cpp     |   6 +-
 extensions/rocksdb-repos/database/ColumnHandle.cpp |  12 +-
 extensions/rocksdb-repos/database/ColumnHandle.h   |  20 +-
 .../database/{ColumnHandle.cpp => DbHandle.cpp}    |  20 +-
 .../database/{ColumnHandle.h => DbHandle.h}        |  28 ++-
 .../rocksdb-repos/database/RocksDatabase.cpp       |  24 ++-
 extensions/rocksdb-repos/database/RocksDatabase.h  |  23 +--
 .../rocksdb-repos/database/RocksDbInstance.cpp     | 201 ++++++++++++++-------
 .../rocksdb-repos/database/RocksDbInstance.h       |  37 ++--
 extensions/rocksdb-repos/database/RocksDbUtils.h   |   2 +-
 libminifi/include/utils/crypto/EncryptionManager.h |   6 +-
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |   4 +-
 libminifi/test/rocksdb-tests/RocksDBTests.cpp      |  51 +++++-
 15 files changed, 258 insertions(+), 192 deletions(-)

diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 008deda72..ccd71c995 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -52,9 +52,10 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
       db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
     }
   };
-  auto set_cf_opts = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts){
-    cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<StringAppender>(), StringAppender::Eq{});
-    cf_opts.set<size_t>(&rocksdb::ColumnFamilyOptions::max_successive_merges, 
0);
+  auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts){
+    cf_opts.OptimizeForPointLookup(4);
+    cf_opts.merge_operator = std::make_shared<StringAppender>();
+    cf_opts.max_successive_merges = 0;
   };
   db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_);
   if (db_->open()) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index 86e9d652e..b774fcba6 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -125,10 +125,11 @@ class FlowFileRepository : public ThreadedRepository, 
public SwapManager {
     // To avoid DB write issues during heavy load it's recommended to have 
high number of buffer.
     // Rocksdb's stall feature can also trigger in case the number of buffers 
is >= 3.
     // The more buffers we have the more memory rocksdb can utilize without 
significant memory consumption under low load.
-    auto cf_options = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
-      cf_opts.set(&rocksdb::ColumnFamilyOptions::write_buffer_size, 8ULL << 
20U);
-      cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::max_write_buffer_number, 
20);
-      
cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::min_write_buffer_number_to_merge,
 1);
+    auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+      cf_opts.OptimizeForPointLookup(4);
+      cf_opts.write_buffer_size = 8ULL << 20U;
+      cf_opts.max_write_buffer_number = 20;
+      cf_opts.min_write_buffer_number_to_merge = 1;
     };
     db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, 
directory_);
     if (db_->open()) {
diff --git 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
index 202785c54..11afc6ec1 100644
--- 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -86,9 +86,9 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
     }
   };
   // Use the same buffer settings as the FlowFileRepository
-  auto set_cf_opts = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
-    cf_opts.set(&rocksdb::ColumnFamilyOptions::write_buffer_size, 8ULL << 20U);
-    
cf_opts.set<int>(&rocksdb::ColumnFamilyOptions::min_write_buffer_number_to_merge,
 1);
+  auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    cf_opts.write_buffer_size = 8ULL << 20U;
+    cf_opts.min_write_buffer_number_to_merge = 1;
   };
   db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_);
   if (db_->open()) {
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.cpp 
b/extensions/rocksdb-repos/database/ColumnHandle.cpp
index 5de54a21b..df4f65713 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.cpp
+++ b/extensions/rocksdb-repos/database/ColumnHandle.cpp
@@ -19,19 +19,11 @@
 #include "ColumnHandle.h"
 #include "logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 ColumnHandle::~ColumnHandle() {
   static auto logger = core::logging::LoggerFactory<ColumnHandle>::getLogger();
   logger->log_trace("Closing column handle '%s'", handle->GetName());
 }
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.h 
b/extensions/rocksdb-repos/database/ColumnHandle.h
index e73e79f26..1203480fb 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.h
+++ b/extensions/rocksdb-repos/database/ColumnHandle.h
@@ -18,26 +18,20 @@
 
 #pragma once
 
+#include <optional>
 #include <memory>
 #include <utility>
 #include "rocksdb/db.h"
+#include "RocksDbUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 struct ColumnHandle {
-  explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle, 
rocksdb::ColumnFamilyOptions options)
-      : handle(std::move(handle)), options(options) {}
+  explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle, 
ColumnFamilyOptionsPatch cfo_patch)
+      : cfo_patch(cfo_patch), handle(std::move(handle)) {}
   ~ColumnHandle();
+  ColumnFamilyOptionsPatch cfo_patch;
   std::unique_ptr<rocksdb::ColumnFamilyHandle> handle;
-  rocksdb::ColumnFamilyOptions options;
 };
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.cpp 
b/extensions/rocksdb-repos/database/DbHandle.cpp
similarity index 67%
copy from extensions/rocksdb-repos/database/ColumnHandle.cpp
copy to extensions/rocksdb-repos/database/DbHandle.cpp
index 5de54a21b..70c4bc637 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.cpp
+++ b/extensions/rocksdb-repos/database/DbHandle.cpp
@@ -16,22 +16,14 @@
  * limitations under the License.
  */
 
-#include "ColumnHandle.h"
+#include "DbHandle.h"
 #include "logging/LoggerConfiguration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
-ColumnHandle::~ColumnHandle() {
-  static auto logger = core::logging::LoggerFactory<ColumnHandle>::getLogger();
-  logger->log_trace("Closing column handle '%s'", handle->GetName());
+DbHandle::~DbHandle() {
+  static auto logger = core::logging::LoggerFactory<DbHandle>::getLogger();
+  logger->log_trace("Closing database handle '%s'", handle->GetName());
 }
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/ColumnHandle.h 
b/extensions/rocksdb-repos/database/DbHandle.h
similarity index 62%
copy from extensions/rocksdb-repos/database/ColumnHandle.h
copy to extensions/rocksdb-repos/database/DbHandle.h
index e73e79f26..cbf590169 100644
--- a/extensions/rocksdb-repos/database/ColumnHandle.h
+++ b/extensions/rocksdb-repos/database/DbHandle.h
@@ -20,24 +20,20 @@
 
 #include <memory>
 #include <utility>
+#include <vector>
 #include "rocksdb/db.h"
+#include "RocksDbUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
-struct ColumnHandle {
-  explicit ColumnHandle(std::unique_ptr<rocksdb::ColumnFamilyHandle> handle, 
rocksdb::ColumnFamilyOptions options)
-      : handle(std::move(handle)), options(options) {}
-  ~ColumnHandle();
-  std::unique_ptr<rocksdb::ColumnFamilyHandle> handle;
-  rocksdb::ColumnFamilyOptions options;
+struct DbHandle {
+  explicit DbHandle(std::unique_ptr<rocksdb::DB> handle, 
std::vector<DBOptionsPatch> dbo_patches)
+      : dbo_patches(dbo_patches), handle(std::move(handle)) {}
+  ~DbHandle();
+  // we need to keep the patch object alive as the DB handle could
+  // reference patcher-owned resources
+  std::vector<DBOptionsPatch> dbo_patches;
+  std::unique_ptr<rocksdb::DB> handle;
 };
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.cpp 
b/extensions/rocksdb-repos/database/RocksDatabase.cpp
index fa52a1d1f..330f1398e 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.cpp
+++ b/extensions/rocksdb-repos/database/RocksDatabase.cpp
@@ -25,11 +25,7 @@
 #include "utils/StringUtils.h"
 #include "RocksDbInstance.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 std::shared_ptr<core::logging::Logger> RocksDatabase::logger_ = 
core::logging::LoggerFactory<RocksDatabase>::getLogger();
 
@@ -81,15 +77,17 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const 
DBOptionsPatch& db_op
   return std::make_unique<RocksDatabase>(instance, db_column, 
db_options_patch, cf_options_patch);
 }
 
-RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string 
column, DBOptionsPatch db_options_patch, ColumnFamilyOptionsPatch 
cf_options_patch)
-  : column_(std::move(column)), 
db_options_patch_(std::move(db_options_patch)), 
cf_options_patch_(std::move(cf_options_patch)), db_(std::move(db)) {}
+RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string 
column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch)
+    : column_(std::move(column)), db_(std::move(db)) {
+  db_->registerColumnConfig(column_, db_options_patch, cf_options_patch);
+}
+
+RocksDatabase::~RocksDatabase() {
+  db_->unregisterColumnConfig(column_);
+}
 
 std::optional<OpenRocksDb> RocksDatabase::open() {
-  return db_->open(column_, db_options_patch_, cf_options_patch_);
+  return db_->open(column_);
 }
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.h 
b/extensions/rocksdb-repos/database/RocksDatabase.h
index a53c5cf07..acaa3a6d4 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.h
+++ b/extensions/rocksdb-repos/database/RocksDatabase.h
@@ -27,11 +27,7 @@
 #include "RocksDbUtils.h"
 #include "OpenRocksDb.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 class RocksDbInstance;
 
@@ -46,21 +42,22 @@ class RocksDatabase {
                                                const std::string& uri,
                                                RocksDbMode mode = 
RocksDbMode::ReadWrite);
 
-  RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, 
DBOptionsPatch db_options_patch, ColumnFamilyOptionsPatch cf_options_patch);
+  RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch);
+
+  RocksDatabase(const RocksDatabase&) = delete;
+  RocksDatabase(RocksDatabase&&) = delete;
+  RocksDatabase& operator=(const RocksDatabase&) = delete;
+  RocksDatabase& operator=(RocksDatabase&&) = delete;
 
   std::optional<OpenRocksDb> open();
 
+  ~RocksDatabase();
+
  private:
   const std::string column_;
-  const DBOptionsPatch db_options_patch_;
-  const ColumnFamilyOptionsPatch cf_options_patch_;
   std::shared_ptr<RocksDbInstance> db_;
 
   static std::shared_ptr<core::logging::Logger> logger_;
 };
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.cpp 
b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
index 8f86fc5cd..f570934d2 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.cpp
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
@@ -18,29 +18,79 @@
 
 #include "RocksDbInstance.h"
 #include <vector>
+#include <utility>
 #include "logging/LoggerConfiguration.h"
 #include "rocksdb/utilities/options_util.h"
 #include "OpenRocksDb.h"
 #include "ColumnHandle.h"
+#include "DbHandle.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 std::shared_ptr<core::logging::Logger> RocksDbInstance::logger_ = 
core::logging::LoggerFactory<RocksDbInstance>::getLogger();
 
-RocksDbInstance::RocksDbInstance(const std::string& path, RocksDbMode mode) : 
db_name_(path), mode_(mode) {}
+RocksDbInstance::RocksDbInstance(std::string path, RocksDbMode mode) : 
db_name_(std::move(path)), mode_(mode) {}
 
 void RocksDbInstance::invalidate() {
   std::lock_guard<std::mutex> db_guard{mtx_};
+  invalidate(db_guard);
+}
+
+void RocksDbInstance::invalidate(const std::lock_guard<std::mutex>&) {
   // discard our own instance
   columns_.clear();
   impl_.reset();
 }
 
-std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column, 
const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch) {
+void RocksDbInstance::registerColumnConfig(const std::string& column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch) {
+  std::lock_guard<std::mutex> db_guard{mtx_};
+  logger_->log_trace("Registering column '%s' in database '%s'", column, 
db_name_);
+  auto [_, inserted] = column_configs_.insert({column, ColumnConfig{.dbo_patch 
= db_options_patch, .cfo_patch = cf_options_patch}});
+  if (!inserted) {
+    throw std::runtime_error("Configuration is already registered for column 
'" + column + "'");
+  }
+
+  bool need_reopen = [&] {
+    if (!impl_) {
+      logger_->log_trace("Database is already scheduled to be reopened");
+      return false;
+    }
+    {
+      rocksdb::DBOptions db_opts_copy = db_options_;
+      Writable<rocksdb::DBOptions> db_opts_writer(db_opts_copy);
+      if (db_options_patch) {
+        db_options_patch(db_opts_writer);
+        if (db_opts_writer.isModified()) {
+          logger_->log_trace("Requested a difference DBOptions than the one 
that was used to open the database");
+          return true;
+        }
+      }
+    }
+    if (!columns_.contains(column)) {
+      logger_->log_trace("Previously unspecified column, will dynamically 
create the column");
+      return false;
+    }
+    if (!cf_options_patch) {
+      logger_->log_trace("No explicit ColumnFamilyOptions was requested");
+      return false;
+    }
+    logger_->log_trace("Could not determine if we definitely need to reopen or 
we are definitely safe, requesting reopen");
+    return true;
+  }();
+  if (need_reopen) {
+    // reset impl_, for the database to be reopened on the next 
RocksDbInstance::open call
+    invalidate(db_guard);
+  }
+}
+
+void RocksDbInstance::unregisterColumnConfig(const std::string& column) {
+  std::lock_guard<std::mutex> db_guard{mtx_};
+  if (column_configs_.erase(column) == 0) {
+    throw std::runtime_error("Could not find column configuration for column 
'" + column + "'");
+  }
+}
+
+std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) {
   std::lock_guard<std::mutex> db_guard{mtx_};
   if (!impl_) {
     gsl_Expects(columns_.empty());
@@ -48,26 +98,55 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column, cons
     rocksdb::DB* db_instance = nullptr;
     rocksdb::Status result;
 
-    rocksdb::ConfigOptions conf_options = [&] {
+    std::vector<DBOptionsPatch> dbo_patches;
+    rocksdb::ConfigOptions conf_options;
+    conf_options.sanity_level = 
rocksdb::ConfigOptions::kSanityLevelLooselyCompatible;
+    {
       // we have to extract the encryptor environment otherwise
       // we won't be able to read the options file
-      rocksdb::ConfigOptions result;
-      if (db_options_patch) {
-        rocksdb::DBOptions dummy_opts;
-        Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
-        db_options_patch(db_options_writer);
-        if (dummy_opts.env) {
-          result.env = dummy_opts.env;
+      rocksdb::DBOptions dummy_opts;
+      dummy_opts.env = nullptr;  // manually clear it, for the patcher to 
explicitly set it
+      for (auto& [col_name, config] : column_configs_) {
+        if (auto& dbo_patch = config.dbo_patch) {
+          dbo_patches.push_back(dbo_patch);
+          Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
+          dbo_patch(db_options_writer);
+          if (dummy_opts.env) {
+            conf_options.env = dummy_opts.env;
+          }
+        }
+      }
+      // we need to reapply the DBOptions changes to check for conflicts
+      for (auto& [col_name, config] : column_configs_) {
+        if (auto& dbo_patch = config.dbo_patch) {
+          Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
+          dbo_patch(db_options_writer);
+          if (db_options_writer.isModified()) {
+            logger_->log_error("Conflicting database options requested for 
'%s'", db_name_);
+            return std::nullopt;
+          }
         }
       }
-      return result;
-    }();
+    }
     db_options_ = rocksdb::DBOptions{};
     std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
     rocksdb::Status option_status = rocksdb::LoadLatestOptions(conf_options, 
db_name_, &db_options_, &cf_descriptors);
-    Writable<rocksdb::DBOptions> db_options_writer(db_options_);
-    if (db_options_patch) {
-      db_options_patch(db_options_writer);
+    {
+      // apply the database options patchers
+      Writable<rocksdb::DBOptions> db_options_writer(db_options_);
+      for (auto& [col_name, config] : column_configs_) {
+        if (auto& dbo_patch = config.dbo_patch) {
+          dbo_patch(db_options_writer);
+        }
+      }
+    }
+    // apply requested ColumnFamilyOptions for each already existing 
ColumnFamily
+    for (auto& cf_descr : cf_descriptors) {
+      if (auto it = column_configs_.find(cf_descr.name); it != 
column_configs_.end()) {
+        if (auto& cfo_patch = it->second.cfo_patch) {
+          cfo_patch(cf_descr.options);
+        }
+      }
     }
     if (option_status.ok()) {
       logger_->log_trace("Found existing database '%s', checking 
compatibility", db_name_);
@@ -78,17 +157,13 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column, cons
       }
     } else if (option_status.IsNotFound()) {
       logger_->log_trace("Database at '%s' not found, creating", db_name_);
-      if (column == "default") {
-        rocksdb::ColumnFamilyOptions default_cf_options;
-        Writable<rocksdb::ColumnFamilyOptions> cf_writer(default_cf_options);
-        if (cf_options_patch) {
-          cf_options_patch(cf_writer);
+      rocksdb::ColumnFamilyOptions default_cf_options;
+      if (auto it = column_configs_.find("default"); it != 
column_configs_.end()) {
+        if (auto& cfo_patch = it->second.cfo_patch) {
+          cfo_patch(default_cf_options);
         }
-        cf_descriptors.emplace_back("default", default_cf_options);
-      } else {
-        // we must create the "default" column, using default options
-        cf_descriptors.emplace_back("default", rocksdb::ColumnFamilyOptions{});
       }
+      cf_descriptors.emplace_back("default", default_cf_options);
     } else if (!option_status.ok()) {
       logger_->log_error("Couldn't query database '%s' for options: '%s'", 
db_name_, option_status.ToString());
       return std::nullopt;
@@ -113,50 +188,37 @@ std::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column, cons
       return std::nullopt;
     }
     gsl_Expects(db_instance);
-    // the patcher could have internal resources the we need to keep alive
+    // the patches could have internal resources that we need to keep alive
     // as long as the database is open (e.g. custom environment)
-    db_options_patch_ = db_options_patch;
-    impl_.reset(db_instance);
+    impl_ = 
std::make_shared<DbHandle>(std::unique_ptr<rocksdb::DB>(db_instance), 
std::move(dbo_patches));
     for (size_t cf_idx{0}; cf_idx < column_handles.size(); ++cf_idx) {
-      columns_[column_handles[cf_idx]->GetName()]
-          = 
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]),
 cf_descriptors[cf_idx].options);
-    }
-  } else {
-    logger_->log_trace("Checking if the already open database is compatible 
with the requested options");
-    if (db_options_patch) {
-      rocksdb::DBOptions db_options_copy = db_options_;
-      Writable<rocksdb::DBOptions> writer(db_options_copy);
-      db_options_patch(writer);
-      if (writer.isModified()) {
-        logger_->log_error("Database '%s' has already been opened using a 
different configuration", db_name_);
-        return std::nullopt;
+      ColumnFamilyOptionsPatch cfo_patch;
+      if (auto it = column_configs_.find(column_handles[cf_idx]->GetName()); 
it != column_configs_.end()) {
+        cfo_patch = it->second.cfo_patch;
       }
+      columns_[column_handles[cf_idx]->GetName()]
+          = 
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]),
 cfo_patch);
     }
   }
-  std::shared_ptr<ColumnHandle> column_handle = 
getOrCreateColumnFamily(column, cf_options_patch, db_guard);
+  std::shared_ptr<ColumnHandle> column_handle = 
getOrCreateColumnFamily(column, db_guard);
   if (!column_handle) {
     // error is already logged by the method
     return std::nullopt;
   }
   return OpenRocksDb(
       *this,
-      gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(impl_),
+      
gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(std::shared_ptr<rocksdb::DB>(impl_,
 impl_->handle.get())),
       gsl::make_not_null<std::shared_ptr<ColumnHandle>>(column_handle));
 }
 
-std::shared_ptr<ColumnHandle> RocksDbInstance::getOrCreateColumnFamily(const 
std::string& column, const ColumnFamilyOptionsPatch& cf_options_patch, const 
std::lock_guard<std::mutex>& /*guard*/) {
+std::shared_ptr<ColumnHandle> RocksDbInstance::getOrCreateColumnFamily(const 
std::string& column, const std::lock_guard<std::mutex>& /*guard*/) {
   gsl_Expects(impl_);
-  auto it = columns_.find(column);
-  if (it != columns_.end()) {
-    logger_->log_trace("Column '%s' already exists in database '%s'", column, 
impl_->GetName());
-    Writable<rocksdb::ColumnFamilyOptions> writer(it->second->options);
-    if (cf_options_patch) {
-      cf_options_patch(writer);
-    }
-    if (writer.isModified()) {
-      logger_->log_error("Requested column '%s' has already been opened using 
a different configuration", column);
-      return nullptr;
-    }
+  if (!column_configs_.contains(column)) {
+    logger_->log_error("Trying to access column '%s' in database '%s' without 
configuration", column, impl_->handle->GetName());
+    return nullptr;
+  }
+  if (auto it = columns_.find(column); it != columns_.end()) {
+    logger_->log_trace("Column '%s' already exists in database '%s'", column, 
impl_->handle->GetName());
     return it->second;
   }
   if (mode_ == RocksDbMode::ReadOnly) {
@@ -165,22 +227,21 @@ std::shared_ptr<ColumnHandle> 
RocksDbInstance::getOrCreateColumnFamily(const std
   }
   rocksdb::ColumnFamilyHandle* raw_handle{nullptr};
   rocksdb::ColumnFamilyOptions cf_options;
-  Writable<rocksdb::ColumnFamilyOptions> writer(cf_options);
-  if (cf_options_patch) {
-    cf_options_patch(writer);
+  ColumnFamilyOptionsPatch cfo_patch;
+  if (auto it = column_configs_.find(column); it != column_configs_.end()) {
+    cfo_patch = it->second.cfo_patch;
+    if (cfo_patch) {
+      cfo_patch(cf_options);
+    }
   }
-  auto status = impl_->CreateColumnFamily(cf_options, column, &raw_handle);
+  auto status = impl_->handle->CreateColumnFamily(cf_options, column, 
&raw_handle);
   if (!status.ok()) {
-    logger_->log_error("Failed to create column '%s' in database '%s'", 
column, impl_->GetName());
+    logger_->log_error("Failed to create column '%s' in database '%s'", 
column, impl_->handle->GetName());
     return nullptr;
   }
-  logger_->log_trace("Successfully created column '%s' in database '%s'", 
column, impl_->GetName());
-  columns_[column] = 
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(raw_handle),
 cf_options);
+  logger_->log_trace("Successfully created column '%s' in database '%s'", 
column, impl_->handle->GetName());
+  columns_[column] = 
std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(raw_handle),
 cfo_patch);
   return columns_[column];
 }
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.h 
b/extensions/rocksdb-repos/database/RocksDbInstance.h
index fdc3e2421..182324958 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.h
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.h
@@ -26,52 +26,55 @@
 #include "rocksdb/db.h"
 #include "logging/Logger.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
+class RocksDatabase;
 class OpenRocksDb;
 struct ColumnHandle;
+struct DbHandle;
 
 /**
  * Purpose: represents a single rocksdb instance backed by a directory
  */
 class RocksDbInstance {
   friend class OpenRocksDb;
+  friend class RocksDatabase;
+
+  struct ColumnConfig {
+    DBOptionsPatch dbo_patch;
+    ColumnFamilyOptionsPatch cfo_patch;
+  };
 
  public:
-  explicit RocksDbInstance(const std::string& path, RocksDbMode mode = 
RocksDbMode::ReadWrite);
+  explicit RocksDbInstance(std::string path, RocksDbMode mode = 
RocksDbMode::ReadWrite);
 
-  std::optional<OpenRocksDb> open(const std::string& column, const 
DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& 
cf_options_patch);
+  std::optional<OpenRocksDb> open(const std::string& column);
 
  protected:
   // caller must hold the mtx_ mutex
-  std::shared_ptr<ColumnHandle> getOrCreateColumnFamily(const std::string& 
column, const ColumnFamilyOptionsPatch& cf_options_patch, const 
std::lock_guard<std::mutex>& guard);
+  std::shared_ptr<ColumnHandle> getOrCreateColumnFamily(const std::string& 
column, const std::lock_guard<std::mutex>& guard);
   /*
    * notify RocksDatabase that the next open should check if they can reopen 
the database
    * until a successful reopen no more open is possible
    */
   void invalidate();
 
+  void invalidate(const std::lock_guard<std::mutex>&);
+
+  void registerColumnConfig(const std::string& column, const DBOptionsPatch& 
db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch);
+  void unregisterColumnConfig(const std::string& column);
+
   rocksdb::DBOptions db_options_;
   const std::string db_name_;
   const RocksDbMode mode_;
 
   std::mutex mtx_;
-  std::shared_ptr<rocksdb::DB> impl_;
+  std::shared_ptr<DbHandle> impl_;
   std::unordered_map<std::string, std::shared_ptr<ColumnHandle>> columns_;
 
-  // the patcher could have internal resources the we need to keep alive
-  // as long as the database is open (e.g. custom environment)
-  DBOptionsPatch db_options_patch_;
+  std::unordered_map<std::string, ColumnConfig> column_configs_;
 
   static std::shared_ptr<core::logging::Logger> logger_;
 };
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h 
b/extensions/rocksdb-repos/database/RocksDbUtils.h
index 6d6d5c30a..d42afce6a 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -67,7 +67,7 @@ class Writable {
  * options of the existing database, apply a "patch" and then check for 
compatibility.
  */
 using DBOptionsPatch = std::function<void(Writable<rocksdb::DBOptions>&)>;
-using ColumnFamilyOptionsPatch = 
std::function<void(Writable<rocksdb::ColumnFamilyOptions>&)>;
+using ColumnFamilyOptionsPatch = 
std::function<void(rocksdb::ColumnFamilyOptions&)>;
 
 }  // namespace internal
 }  // namespace minifi
diff --git a/libminifi/include/utils/crypto/EncryptionManager.h 
b/libminifi/include/utils/crypto/EncryptionManager.h
index a21dda8df..f538471ff 100644
--- a/libminifi/include/utils/crypto/EncryptionManager.h
+++ b/libminifi/include/utils/crypto/EncryptionManager.h
@@ -40,9 +40,9 @@ class EncryptionManager {
 
   [[nodiscard]] std::optional<XSalsa20Cipher> createXSalsa20Cipher(const 
std::string& key_name) const;
   [[nodiscard]] std::optional<Aes256EcbCipher> createAes256EcbCipher(const 
std::string& key_name) const;
- private:
-  [[nodiscard]] std::optional<Bytes> readKey(const std::string& key_name) 
const;
-  [[nodiscard]] bool writeKey(const std::string& key_name, const Bytes& key) 
const;
+ protected:
+  [[nodiscard]] virtual std::optional<Bytes> readKey(const std::string& 
key_name) const;
+  [[nodiscard]] virtual bool writeKey(const std::string& key_name, const 
Bytes& key) const;
 
   std::string key_dir_;
 };
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index d0a7ffa51..0ce216ecd 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -31,8 +31,8 @@ class RocksDBStreamTest : TestController {
       db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
       db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
     };
-    auto set_cf_opts = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
-      cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<core::repository::StringAppender>(), 
core::repository::StringAppender::Eq{});
+    auto set_cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+      cf_opts.merge_operator = 
std::make_shared<core::repository::StringAppender>();
     };
     db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
dbPath);
     REQUIRE(db->open());
diff --git a/libminifi/test/rocksdb-tests/RocksDBTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
index 6e6e93514..45c920362 100644
--- a/libminifi/test/rocksdb-tests/RocksDBTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
@@ -22,6 +22,7 @@
 #include "../../extensions/rocksdb-repos/database/RocksDatabase.h"
 #include "../../extensions/rocksdb-repos/database/RocksDbInstance.h"
 #include "../../extensions/rocksdb-repos/database/ColumnHandle.h"
+#include "../../extensions/rocksdb-repos/database/DbHandle.h"
 #include "IntegrationTestUtils.h"
 #include "database/StringAppender.h"
 #include 
"../../extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h"
@@ -41,6 +42,7 @@ struct RocksDBTest : TestController {
     
LogTestController::getInstance().setTrace<minifi::internal::RocksDatabase>();
     
LogTestController::getInstance().setTrace<minifi::internal::RocksDbInstance>();
     
LogTestController::getInstance().setTrace<minifi::internal::ColumnHandle>();
+    LogTestController::getInstance().setTrace<minifi::internal::DbHandle>();
     db_dir = createTempDirectory();
   }
 
@@ -107,9 +109,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to two specific 
columns at once", "[roc
   {
     auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
     auto opendb1 = db1->open();
+    REQUIRE(opendb1);
     opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
     auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_two");
     auto opendb2 = db2->open();
+    REQUIRE(opendb2);
     opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
   }
 
@@ -127,9 +131,11 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default 
and a specific column at
   {
     auto db1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
     auto opendb1 = db1->open();
+    REQUIRE(opendb1);
     opendb1->Put(rocksdb::WriteOptions{}, "fruit", "apple");
     auto db2 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
db_dir);
     auto opendb2 = db2->open();
+    REQUIRE(opendb2);
     opendb2->Put(rocksdb::WriteOptions{}, "animal", "penguin");
   }
 
@@ -143,17 +149,17 @@ TEST_CASE_METHOD(RocksDBTest, "Can write to the default 
and a specific column at
   REQUIRE(value == "penguin");
 }
 
-TEST_CASE_METHOD(RocksDBTest, "Error is logged if the options are incompatible 
with an existing column family", "[rocksDBTest6]") {
+TEST_CASE_METHOD(RocksDBTest, "Logged if the options are incompatible with an 
existing column family", "[rocksDBTest6]") {
   auto db = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
"minifidb://" + db_dir + "/column_one");
   REQUIRE(db->open());
   // implicitly created the "default" column family, but with the default 
options
-  auto cf_opts = [] (minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& 
cf_opts) {
-    cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<minifi::core::repository::StringAppender>());
+  auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    cf_opts.merge_operator = 
std::make_shared<minifi::core::repository::StringAppender>();
   };
   auto default_db = minifi::internal::RocksDatabase::create(new_db_opts, 
cf_opts, "minifidb://" + db_dir + "/default");
-  REQUIRE_FALSE(default_db->open());
+  REQUIRE(default_db->open());
   REQUIRE(utils::verifyLogLinePresenceInPollTime(
-      std::chrono::seconds{1}, "Requested column 'default' has already been 
opened using a different configuration"));
+      std::chrono::seconds{1}, "Could not determine if we definitely need to 
reopen or we are definitely safe, requesting reopen"));
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Error is logged if different DBOptions are 
used", "[rocksDBTest7]") {
@@ -170,7 +176,7 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if different 
DBOptions are used",
   auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two");
   REQUIRE_FALSE(col_2->open());
   REQUIRE(utils::verifyLogLinePresenceInPollTime(
-      std::chrono::seconds{1}, "Database '" + db_dir + "' has already been 
opened using a different configuration"));
+      std::chrono::seconds{1}, "Conflicting database options requested for '" 
+ db_dir + "'"));
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails without 
merge_operator", "[rocksDBTest8]") {
@@ -178,14 +184,15 @@ TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails 
without merge_operator"
   REQUIRE(db);
 
   auto opendb = db->open();
+  REQUIRE(opendb);
 
   REQUIRE(opendb->Put({}, "a", "first").ok());
   REQUIRE_FALSE(opendb->Merge({}, "a", "second").ok());
 }
 
 TEST_CASE_METHOD(RocksDBTest, "Column options are applied", "[rocksDBTest9]") {
-  auto cf_opts = [] (minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& 
cf_opts) {
-    cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<StringAppender>(), StringAppender::Eq{});
+  auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    cf_opts.merge_operator = std::make_shared<StringAppender>();
   };
   std::string db_uri;
   SECTION("Named column") {
@@ -201,6 +208,7 @@ TEST_CASE_METHOD(RocksDBTest, "Column options are applied", 
"[rocksDBTest9]") {
   REQUIRE(db);
 
   auto opendb = db->open();
+  REQUIRE(opendb);
 
   REQUIRE(opendb->Put({}, "a", "first").ok());
   REQUIRE(opendb->Merge({}, "a", "second").ok());
@@ -245,13 +253,36 @@ TEST_CASE_METHOD(RocksDBTest, "Error is logged if 
different encryption keys are
     auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two");
     REQUIRE_FALSE(col_2->open());
     REQUIRE(utils::verifyLogLinePresenceInPollTime(
-        std::chrono::seconds{1}, "Database '" + db_dir + "' has already been 
opened using a different configuration"));
+        std::chrono::seconds{1}, "Conflicting database options requested for 
'" + db_dir + "'"));
   }
 
   SECTION("Using no encryption key") {
     auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, 
"minifidb://" + db_dir + "/column_two");
     REQUIRE_FALSE(col_2->open());
     REQUIRE(utils::verifyLogLinePresenceInPollTime(
-        std::chrono::seconds{1}, "Database '" + db_dir + "' has already been 
opened using a different configuration"));
+        std::chrono::seconds{1}, "Conflicting database options requested for 
'" + db_dir + "'"));
+  }
+}
+
+TEST_CASE_METHOD(RocksDBTest, "RocksDb works correctly on changed (but 
compatible) ColumnFamilyOptions change", "[rocksDBTest11]") {
+  {
+    auto col1 = minifi::internal::RocksDatabase::create(new_db_opts, {}, 
db_dir);
+    auto opendb = col1->open();
+    REQUIRE(opendb);
+    REQUIRE(opendb->Put({}, "fruit", "apple").ok());
+  }
+
+  auto cf_opts = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    cf_opts.OptimizeForPointLookup(4);
+  };
+
+  auto col2 = minifi::internal::RocksDatabase::create(new_db_opts, cf_opts, 
db_dir);
+  REQUIRE(col2);
+  {
+    auto opendb = col2->open();
+    REQUIRE(opendb);
+    std::string value;
+    REQUIRE(opendb->Get({}, "fruit", &value).ok());
+    REQUIRE(value == "apple");
   }
 }


Reply via email to