This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5af180f  MINIFICPP-1561 - Allow rocksdb encryption
5af180f is described below

commit 5af180fd964b6564045b146df8473602a0ec789c
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Jul 8 10:30:08 2021 +0200

    MINIFICPP-1561 - Allow rocksdb encryption
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #1090
---
 CONFIGURE.md                                       |  21 ++++
 encrypt-config/ConfigFile.h                        |   2 +-
 encrypt-config/ConfigFileEncryptor.h               |   2 +-
 encrypt-config/Utils.h                             |   2 +-
 extensions/http-curl/tests/C2ConfigEncryption.cpp  |   2 +-
 extensions/rocksdb-repos/CMakeLists.txt            |   2 +-
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  16 ++-
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    |  41 +++++--
 extensions/rocksdb-repos/FlowFileRepository.h      |  16 ++-
 .../RocksDbPersistableKeyValueStoreService.cpp     |  13 +-
 .../RocksDbPersistableKeyValueStoreService.h       |   2 +
 extensions/rocksdb-repos/database/OpenRocksDb.cpp  |   9 ++
 extensions/rocksdb-repos/database/OpenRocksDb.h    |   2 +
 .../rocksdb-repos/database/RocksDatabase.cpp       |   2 +-
 extensions/rocksdb-repos/database/RocksDatabase.h  |   2 +-
 .../rocksdb-repos/database/RocksDbInstance.cpp     |  18 ++-
 .../rocksdb-repos/database/RocksDbInstance.h       |   4 +
 extensions/rocksdb-repos/database/RocksDbUtils.h   |  11 +-
 extensions/rocksdb-repos/database/StringAppender.h |  11 +-
 .../encryption/RocksDbEncryptionProvider.cpp       | 127 ++++++++++++++++++++
 .../RocksDbEncryptionProvider.h}                   |  28 ++---
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/properties/Decryptor.h           |   4 +-
 libminifi/include/properties/PropertiesFile.h      |   2 +-
 .../EncryptionManager.h}                           |  29 +++--
 .../utils/{ => crypto}/EncryptionProvider.h        |  15 ++-
 .../include/utils/{ => crypto}/EncryptionUtils.h   |   0
 libminifi/include/utils/crypto/ciphers/Aes256Ecb.h |  78 ++++++++++++
 .../ciphers/XSalsa20.h}                            |  13 +-
 libminifi/include/utils/file/FileSystem.h          |   2 +-
 libminifi/src/utils/EncryptionProvider.cpp         |  58 ---------
 libminifi/src/utils/crypto/EncryptionManager.cpp   |  88 ++++++++++++++
 .../utils/crypto/EncryptionProvider.cpp}           |  31 ++---
 .../src/utils/{ => crypto}/EncryptionUtils.cpp     |   2 +-
 libminifi/src/utils/crypto/ciphers/Aes256Ecb.cpp   | 133 +++++++++++++++++++++
 libminifi/src/utils/file/FileSystem.cpp            |   2 +-
 libminifi/test/TestBase.h                          |   7 ++
 libminifi/test/rocksdb-tests/EncryptionTests.cpp   | 108 +++++++++++++++++
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |   2 +-
 libminifi/test/rocksdb-tests/RocksDBTests.cpp      |  49 +++++++-
 libminifi/test/unit/EncryptionUtilsTests.cpp       |   2 +-
 42 files changed, 788 insertions(+), 174 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index c93352d..13c859d 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -166,6 +166,8 @@ folder. You may specify your own path in place of these 
defaults.
      
nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
         
nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 
+#### Shared database
+
 It is also possible to use a single database to store multiple repositories 
with the `minifidb://` scheme.
 This could help with migration and centralize agent state persistence. In the 
scheme the final path segment designates the
 column family in the repository, while the preceding path indicates the 
directory the rocksdb database is
@@ -189,6 +191,25 @@ Moreover the `"default"` name is restricted and should not 
be used.
         
nifi.state.manangement.provider.local.path=minifidb://${MINIFI_HOME}/agent_state/default
         ^ error: "default" is restricted
 
+### Configuring Repository encryption
+
+It is possible to provide rocksdb-backed repositories a key to request their
+encryption.
+
+    in conf/bootstrap.conf
+    
nifi.flowfile.repository.encryption.key=805D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E
+    nifi.database.content.repository.encryption.key=
+    # nifi.state.management.provider.local.encryption.key=
+
+In the above configuration the first line will cause `FlowFileRepository` to 
use the specified `256` bit key.
+The second line will trigger the generation of a random (`256` bit) key 
persisted back into `conf/bootstrap.conf`, which `DatabaseContentRepository` 
will then use for encryption.
+(This way one can request encryption while not bothering with what key to use.)
+Finally, as the last line is commented out, it will make the state manager use 
plaintext storage, and not trigger encryption.
+
+#### Mixing encryption with shared backend
+
+When multiple repositories use the same directory (as with `minifidb://` 
scheme) they should either be all plaintext or all encrypted with the same key.
+
 ### Configuring Volatile and NO-OP Repositories
 Each of the repositories can be configured to be volatile ( state kept in 
memory and flushed
  upon restart ) or persistent. Currently, the flow file and provenance 
repositories can persist
diff --git a/encrypt-config/ConfigFile.h b/encrypt-config/ConfigFile.h
index 61b7a0f..59cb7ef 100644
--- a/encrypt-config/ConfigFile.h
+++ b/encrypt-config/ConfigFile.h
@@ -20,7 +20,7 @@
 #include <string>
 #include <vector>
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
 #include "properties/PropertiesFile.h"
 
diff --git a/encrypt-config/ConfigFileEncryptor.h 
b/encrypt-config/ConfigFileEncryptor.h
index d58563f..aecd42f 100644
--- a/encrypt-config/ConfigFileEncryptor.h
+++ b/encrypt-config/ConfigFileEncryptor.h
@@ -17,7 +17,7 @@
 #pragma once
 
 #include "ConfigFile.h"
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "Utils.h"
 
 namespace org {
diff --git a/encrypt-config/Utils.h b/encrypt-config/Utils.h
index 23a521e..cf4975e 100644
--- a/encrypt-config/Utils.h
+++ b/encrypt-config/Utils.h
@@ -16,7 +16,7 @@
  */
 #pragma once
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
 
 namespace org {
diff --git a/extensions/http-curl/tests/C2ConfigEncryption.cpp 
b/extensions/http-curl/tests/C2ConfigEncryption.cpp
index d122b81..0560db8 100644
--- a/extensions/http-curl/tests/C2ConfigEncryption.cpp
+++ b/extensions/http-curl/tests/C2ConfigEncryption.cpp
@@ -22,7 +22,7 @@
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 #include "utils/IntegrationTestUtils.h"
-#include "utils/EncryptionProvider.h"
+#include "utils/crypto/EncryptionProvider.h"
 
 int main(int argc, char **argv) {
   const cmd_args args = parse_cmdline_args(argc, argv, "update");
diff --git a/extensions/rocksdb-repos/CMakeLists.txt 
b/extensions/rocksdb-repos/CMakeLists.txt
index f7f97d5..ee5ce4a 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -19,7 +19,7 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-file(GLOB SOURCES  "*.cpp" "controllers/*.cpp" "database/*.cpp")
+file(GLOB SOURCES  "*.cpp" "controllers/*.cpp" "database/*.cpp" 
"encryption/*.cpp")
 
 add_library(minifi-rocksdb-repos STATIC ${SOURCES})
 set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index dc78af7..0eacce8 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-#include "DatabaseContentRepository.h"
-
 #include <memory>
 #include <string>
 #include <utility>
 
+#include "DatabaseContentRepository.h"
+#include "encryption/RocksDbEncryptionProvider.h"
 #include "RocksDbStream.h"
 #include "utils/GeneralUtils.h"
 #include "utils/gsl.h"
@@ -42,14 +42,22 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
   } else {
     directory_ = configuration->getHome() + "/dbcontentrepository";
   }
-  auto set_db_opts = [] (internal::Writable<rocksdb::DBOptions>& db_opts) {
+  const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, 
DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+  logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? 
"encrypted" : "plaintext");
+
+  auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& 
db_opts) {
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
     db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
     db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
     db_opts.set(&rocksdb::DBOptions::error_if_exists, false);
+    if (encrypted_env) {
+      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
+    } else {
+      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+    }
   };
   auto set_cf_opts = [] (internal::Writable<rocksdb::ColumnFamilyOptions>& 
cf_opts){
-    
cf_opts.transform<StringAppender>(&rocksdb::ColumnFamilyOptions::merge_operator);
+    cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<StringAppender>(), StringAppender::Eq{});
     cf_opts.set<size_t>(&rocksdb::ColumnFamilyOptions::max_successive_merges, 
0);
   };
   db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_);
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index d4419d4..6be66c8 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -48,6 +48,8 @@ class DatabaseContentRepository : public 
core::ContentRepository, public core::C
   };
 
  public:
+  static constexpr const char* ENCRYPTION_KEY_NAME = 
"nifi.database.content.repository.encryption.key";
+
   explicit DatabaseContentRepository(const std::string& name = 
getClassName<DatabaseContentRepository>(), const utils::Identifier& uuid = {})
       : core::Connectable(name, uuid),
         is_valid_(false),
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 8e47e3a..5393e89 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -130,16 +130,27 @@ void FlowFileRepository::run() {
 }
 
 void FlowFileRepository::prune_stored_flowfiles() {
-  auto set_db_opts = [] (minifi::internal::Writable<rocksdb::DBOptions>& 
db_opts) {
+  const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, 
DbEncryptionOptions{checkpoint_dir_, ENCRYPTION_KEY_NAME});
+  logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? 
"encrypted" : "plaintext");
+
+  auto set_db_opts = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
     db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
     db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+    if (encrypted_env) {
+      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
+    } else {
+      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+    }
   };
   auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, 
checkpoint_dir_, minifi::internal::RocksDbMode::ReadOnly);
   utils::optional<minifi::internal::OpenRocksDb> opendb;
   if (nullptr != checkpoint_) {
     opendb = checkpointDB->open();
-    if (!opendb) {
+    if (opendb) {
+      logger_->log_trace("Successfully opened checkpoint database at '%s'", 
checkpoint_dir_);
+    } else {
+      logger_->log_error("Couldn't open checkpoint database at '%s' using live 
database", checkpoint_dir_);
       opendb = db_->open();
     }
     if (!opendb) {
@@ -220,18 +231,24 @@ void FlowFileRepository::initialize_repository() {
     logger_->log_trace("Do not need checkpoint");
     return;
   }
-  rocksdb::Checkpoint *checkpoint;
   // delete any previous copy
-  if (utils::file::FileUtils::delete_dir(checkpoint_dir_) >= 0 && 
opendb->NewCheckpoint(&checkpoint).ok()) {
-    if (checkpoint->CreateCheckpoint(checkpoint_dir_).ok()) {
-      checkpoint_ = std::unique_ptr<rocksdb::Checkpoint>(checkpoint);
-      logger_->log_trace("Created checkpoint directory");
-    } else {
-      logger_->log_trace("Could not create checkpoint. Corrupt?");
-    }
-  } else {
-    logger_->log_trace("Could not create checkpoint directory. Not properly 
deleted?");
+  if (utils::file::FileUtils::delete_dir(checkpoint_dir_) < 0) {
+    logger_->log_error("Could not delete existing checkpoint directory '%s'", 
checkpoint_dir_);
+    return;
+  }
+  std::unique_ptr<rocksdb::Checkpoint> checkpoint;
+  rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
+  if (!checkpoint_status.ok()) {
+    logger_->log_error("Could not create checkpoint object: %s", 
checkpoint_status.ToString());
+    return;
+  }
+  checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_);
+  if (!checkpoint_status.ok()) {
+    logger_->log_error("Could not initialize checkpoint: %s", 
checkpoint_status.ToString());
+    return;
   }
+  checkpoint_ = std::move(checkpoint);
+  logger_->log_trace("Created checkpoint in directory '%s'", checkpoint_dir_);
 }
 
 void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index b7ed458..7eb4567 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -33,6 +33,8 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "concurrentqueue.h"
 #include "database/RocksDatabase.h"
+#include "encryption/RocksDbEncryptionProvider.h"
+#include "utils/crypto/EncryptionProvider.h"
 
 namespace org {
 namespace apache {
@@ -59,6 +61,7 @@ namespace repository {
  */
 class FlowFileRepository : public core::Repository, public 
std::enable_shared_from_this<FlowFileRepository> {
  public:
+  static constexpr const char* ENCRYPTION_KEY_NAME = 
"nifi.flowfile.repository.encryption.key";
   // Constructor
 
   FlowFileRepository(const std::string& name, const utils::Identifier& 
/*uuid*/)
@@ -87,6 +90,7 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
 
   // initialize
   virtual bool initialize(const std::shared_ptr<Configure> &configure) {
+    config_ = configure;
     std::string value;
 
     if (configure->get(Configure::nifi_flowfile_repository_directory_default, 
value)) {
@@ -104,10 +108,19 @@ class FlowFileRepository : public core::Repository, 
public std::enable_shared_fr
       }
     }
     logger_->log_debug("NiFi FlowFile Max Storage Time: [%d] ms", 
max_partition_millis_);
-    auto db_options = [] (minifi::internal::Writable<rocksdb::DBOptions>& 
options) {
+
+    const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, 
DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+    logger_->log_info("Using %s FlowFileRepository", encrypted_env ? 
"encrypted" : "plaintext");
+
+    auto db_options = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& options) {
       options.set(&rocksdb::DBOptions::create_if_missing, true);
       options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
       options.set(&rocksdb::DBOptions::use_direct_reads, true);
+      if (encrypted_env) {
+        options.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
+      } else {
+        options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+      }
     };
 
     // Write buffers are used as db operation logs. When they get filled the 
events are merged and serialized.
@@ -222,6 +235,7 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
   std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<minifi::Configure> config_;
 };
 
 } /* namespace repository */
diff --git 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
index 522ce23..9b2dfa9 100644
--- 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -19,7 +19,7 @@
 #include <set>
 
 #include "RocksDbPersistableKeyValueStoreService.h"
-
+#include "../encryption/RocksDbEncryptionProvider.h"
 #include "utils/StringUtils.h"
 
 
@@ -60,10 +60,19 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
   }
 
   db_.reset();
-  auto set_db_opts = [] (internal::Writable<rocksdb::DBOptions>& db_opts) {
+
+  const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()},
 core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+  logger_->log_info("Using %s RocksDbPersistableKeyValueStoreService", 
encrypted_env ? "encrypted" : "plaintext");
+
+  auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& 
db_opts) {
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
     db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
     db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
+    if (encrypted_env) {
+      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
core::repository::EncryptionEq{});
+    } else {
+      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+    }
   };
   // Use the same buffer settings as the FlowFileRepository
   auto set_cf_opts = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
diff --git 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
index b6ba299..bba7765 100644
--- 
a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
+++ 
b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
@@ -38,6 +38,8 @@ namespace controllers {
 
 class RocksDbPersistableKeyValueStoreService : public 
AbstractAutoPersistingKeyValueStoreService {
  public:
+  static constexpr const char* ENCRYPTION_KEY_NAME = 
"nifi.state.management.provider.local.encryption.key";
+
   explicit RocksDbPersistableKeyValueStoreService(const std::string& name, 
const utils::Identifier& uuid = {});
 
   ~RocksDbPersistableKeyValueStoreService() override = default;
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp 
b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index b74e1a7..9bf2746 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -80,6 +80,15 @@ rocksdb::Status 
OpenRocksDb::NewCheckpoint(rocksdb::Checkpoint **checkpoint) {
   return rocksdb::Checkpoint::Create(impl_.get(), checkpoint);
 }
 
+rocksdb::Status 
OpenRocksDb::NewCheckpoint(std::unique_ptr<rocksdb::Checkpoint>& checkpoint) {
+  rocksdb::Checkpoint* checkpoint_ptr = nullptr;
+  rocksdb::Status result = NewCheckpoint(&checkpoint_ptr);
+  if (result.ok()) {
+    checkpoint.reset(checkpoint_ptr);
+  }
+  return result;
+}
+
 rocksdb::Status OpenRocksDb::FlushWAL(bool sync) {
   rocksdb::Status result = impl_->FlushWAL(sync);
   handleResult(result);
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h 
b/extensions/rocksdb-repos/database/OpenRocksDb.h
index 7aed4d5..33d26d5 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -67,6 +67,8 @@ class OpenRocksDb {
 
   rocksdb::Status NewCheckpoint(rocksdb::Checkpoint** checkpoint);
 
+  rocksdb::Status NewCheckpoint(std::unique_ptr<rocksdb::Checkpoint>& 
checkpoint);
+
   rocksdb::Status FlushWAL(bool sync);
 
   rocksdb::DB* get();
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.cpp 
b/extensions/rocksdb-repos/database/RocksDatabase.cpp
index a83cda2..248a632 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.cpp
+++ b/extensions/rocksdb-repos/database/RocksDatabase.cpp
@@ -82,7 +82,7 @@ std::unique_ptr<RocksDatabase> RocksDatabase::create(const 
DBOptionsPatch& db_op
 }
 
 RocksDatabase::RocksDatabase(std::shared_ptr<RocksDbInstance> db, std::string 
column, DBOptionsPatch db_options_patch, ColumnFamilyOptionsPatch 
cf_options_patch)
-  : db_(std::move(db)), column_(std::move(column)), 
db_options_patch_(std::move(db_options_patch)), 
cf_options_patch_(std::move(cf_options_patch)) {}
+  : column_(std::move(column)), 
db_options_patch_(std::move(db_options_patch)), 
cf_options_patch_(std::move(cf_options_patch)), db_(std::move(db)) {}
 
 utils::optional<OpenRocksDb> RocksDatabase::open() {
   return db_->open(column_, db_options_patch_, cf_options_patch_);
diff --git a/extensions/rocksdb-repos/database/RocksDatabase.h 
b/extensions/rocksdb-repos/database/RocksDatabase.h
index fda0d36..6f1ca32 100644
--- a/extensions/rocksdb-repos/database/RocksDatabase.h
+++ b/extensions/rocksdb-repos/database/RocksDatabase.h
@@ -51,10 +51,10 @@ class RocksDatabase {
   utils::optional<OpenRocksDb> open();
 
  private:
-  std::shared_ptr<RocksDbInstance> db_;
   const std::string column_;
   const DBOptionsPatch db_options_patch_;
   const ColumnFamilyOptionsPatch cf_options_patch_;
+  std::shared_ptr<RocksDbInstance> db_;
 
   static std::shared_ptr<core::logging::Logger> logger_;
 };
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.cpp 
b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
index 39bdaa6..da01d45 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.cpp
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.cpp
@@ -48,7 +48,20 @@ utils::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column, co
     rocksdb::DB* db_instance = nullptr;
     rocksdb::Status result;
 
-    rocksdb::ConfigOptions conf_options;
+    rocksdb::ConfigOptions conf_options = [&] {
+      // we have to extract the encryptor environment otherwise
+      // we won't be able to read the options file
+      rocksdb::ConfigOptions result;
+      if (db_options_patch) {
+        rocksdb::DBOptions dummy_opts;
+        Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
+        db_options_patch(db_options_writer);
+        if (dummy_opts.env) {
+          result.env = dummy_opts.env;
+        }
+      }
+      return result;
+    }();
     db_options_ = rocksdb::DBOptions{};
     std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
     rocksdb::Status option_status = rocksdb::LoadLatestOptions(conf_options, 
db_name_, &db_options_, &cf_descriptors);
@@ -100,6 +113,9 @@ utils::optional<OpenRocksDb> RocksDbInstance::open(const 
std::string& column, co
       return utils::nullopt;
     }
     gsl_Expects(db_instance);
+    // the patcher could have internal resources the we need to keep alive
+    // as long as the database is open (e.g. custom environment)
+    db_options_patch_ = db_options_patch;
     impl_.reset(db_instance);
     for (size_t cf_idx{0}; cf_idx < column_handles.size(); ++cf_idx) {
       columns_[column_handles[cf_idx]->GetName()]
diff --git a/extensions/rocksdb-repos/database/RocksDbInstance.h 
b/extensions/rocksdb-repos/database/RocksDbInstance.h
index 63b031a..4e2d8a3 100644
--- a/extensions/rocksdb-repos/database/RocksDbInstance.h
+++ b/extensions/rocksdb-repos/database/RocksDbInstance.h
@@ -63,6 +63,10 @@ class RocksDbInstance {
   std::shared_ptr<rocksdb::DB> impl_;
   std::unordered_map<std::string, std::shared_ptr<ColumnHandle>> columns_;
 
+  // the patcher could have internal resources the we need to keep alive
+  // as long as the database is open (e.g. custom environment)
+  DBOptionsPatch db_options_patch_;
+
   static std::shared_ptr<core::logging::Logger> logger_;
 };
 
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h 
b/extensions/rocksdb-repos/database/RocksDbUtils.h
index ead059c..6d6d5c3 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -39,19 +39,14 @@ class Writable {
  public:
   explicit Writable(T& target) : target_(target) {}
 
-  template<typename F>
-  void set(F T::* member, typename utils::type_identity<F>::type value) {
-    if (!(target_.*member == value)) {
+  template<typename F, typename Comparator = std::equal_to<F>>
+  void set(F T::* member, typename utils::type_identity<F>::type value, const 
Comparator& comparator = Comparator{}) {
+    if (!comparator(target_.*member, value)) {
       target_.*member = value;
       is_modified_ = true;
     }
   }
 
-  template<typename Transformer, typename F>
-  void transform(F T::* member) {
-    set(member, Transformer::transform(target_.*member));
-  }
-
   template<typename F>
   const F& get(F T::* member) {
     return target_.*member;
diff --git a/extensions/rocksdb-repos/database/StringAppender.h 
b/extensions/rocksdb-repos/database/StringAppender.h
index 7172a79..5e71073 100644
--- a/extensions/rocksdb-repos/database/StringAppender.h
+++ b/extensions/rocksdb-repos/database/StringAppender.h
@@ -32,12 +32,13 @@ namespace repository {
 
 class StringAppender : public rocksdb::AssociativeMergeOperator {
  public:
-  static std::shared_ptr<rocksdb::MergeOperator> transform(const 
std::shared_ptr<rocksdb::MergeOperator>& other) {
-    if (other && std::strcmp(other->Name(), "StringAppender") == 0) {
-      return other;
+  struct Eq {
+    bool operator()(const std::shared_ptr<rocksdb::MergeOperator>& lhs, const 
std::shared_ptr<rocksdb::MergeOperator>& rhs) const {
+      if (lhs == rhs) return true;
+      if (!lhs || !rhs) return false;
+      return std::strcmp(lhs->Name(), rhs->Name()) == 0;
     }
-    return std::make_shared<StringAppender>();
-  }
+  };
 
   bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* 
existing_value, const rocksdb::Slice& value, std::string* new_value, 
rocksdb::Logger* /*logger*/) const override;
 
diff --git a/extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.cpp 
b/extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.cpp
new file mode 100644
index 0000000..4429e4c
--- /dev/null
+++ b/extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.cpp
@@ -0,0 +1,127 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "RocksDbEncryptionProvider.h"
+#include "utils/crypto/ciphers/Aes256Ecb.h"
+#include "logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+using utils::crypto::Bytes;
+using utils::crypto::Aes256EcbCipher;
+
+/**
+ * This cipher is used by rocksdb to implement a CTR encryption scheme.
+ */
+class AES256BlockCipher final : public rocksdb::BlockCipher {
+  static std::shared_ptr<logging::Logger> logger_;
+
+ public:
+  AES256BlockCipher(std::string database, Aes256EcbCipher cipher_impl)
+      : database_(std::move(database)),
+        cipher_impl_(std::move(cipher_impl)) {}
+
+  const char *Name() const override {
+    return "AES256BlockCipher";
+  }
+
+  size_t BlockSize() override {
+    return Aes256EcbCipher::BLOCK_SIZE;
+  }
+
+  bool operator==(const AES256BlockCipher& other) const {
+    return cipher_impl_ == other.cipher_impl_;
+  }
+
+  rocksdb::Status Encrypt(char *data) override;
+
+  rocksdb::Status Decrypt(char *data) override;
+
+ private:
+  const std::string database_;
+  const Aes256EcbCipher cipher_impl_;
+};
+
+class EncryptingEnv : public rocksdb::EnvWrapper {
+ public:
+  EncryptingEnv(Env* target, std::shared_ptr<AES256BlockCipher> cipher) : 
EnvWrapper(target), env_(target), cipher_(std::move(cipher)) {}
+
+  bool hasEqualKey(const EncryptingEnv& other) const {
+    return *cipher_ == *other.cipher_;
+  }
+
+ private:
+  std::unique_ptr<Env> env_;
+  std::shared_ptr<AES256BlockCipher> cipher_;
+};
+
+std::shared_ptr<logging::Logger> AES256BlockCipher::logger_ = 
logging::LoggerFactory<AES256BlockCipher>::getLogger();
+
+std::shared_ptr<rocksdb::Env> createEncryptingEnv(const 
utils::crypto::EncryptionManager& manager, const DbEncryptionOptions& options) {
+  auto cipher_impl = 
manager.createAes256EcbCipher(options.encryption_key_name);
+  if (!cipher_impl) {
+    return {};
+  }
+  auto cipher = std::make_shared<AES256BlockCipher>(options.database, 
cipher_impl.value());
+  return std::make_shared<EncryptingEnv>(
+      rocksdb::NewEncryptedEnv(rocksdb::Env::Default(), 
rocksdb::EncryptionProvider::NewCTRProvider(cipher)), cipher);
+}
+
+rocksdb::Status AES256BlockCipher::Encrypt(char *data) {
+  try {
+    cipher_impl_.encrypt({reinterpret_cast<unsigned char*>(data), 
Aes256EcbCipher::BLOCK_SIZE});
+    return rocksdb::Status::OK();
+  } catch (const utils::crypto::CipherError& error) {
+    logger_->log_error("Error while encrypting in database '%s': %s", 
database_, error.what());
+    return rocksdb::Status::IOError();
+  }
+}
+
+rocksdb::Status AES256BlockCipher::Decrypt(char *data) {
+  try {
+    cipher_impl_.decrypt({reinterpret_cast<unsigned char*>(data), 
Aes256EcbCipher::BLOCK_SIZE});
+    return rocksdb::Status::OK();
+  } catch (const utils::crypto::CipherError& error) {
+    logger_->log_error("Error while decrypting in database '%s': %s", 
database_, error.what());
+    return rocksdb::Status::IOError();
+  }
+}
+
+bool EncryptionEq::operator()(const rocksdb::Env* lhs, const rocksdb::Env* 
rhs) const {
+  auto* lhs_enc = dynamic_cast<const EncryptingEnv*>(lhs);
+  auto* rhs_enc = dynamic_cast<const EncryptingEnv*>(rhs);
+  if (lhs_enc == rhs_enc) return true;
+  if (!lhs_enc || !rhs_enc) return false;
+  return lhs_enc->hasEqualKey(*rhs_enc);
+}
+
+}  // namespace repository
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/rocksdb-repos/database/StringAppender.h 
b/extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h
similarity index 62%
copy from extensions/rocksdb-repos/database/StringAppender.h
copy to extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h
index 7172a79..baaafcd 100644
--- a/extensions/rocksdb-repos/database/StringAppender.h
+++ b/extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,11 +18,13 @@
 
 #pragma once
 
-#include <cstring>
 #include <string>
 #include <memory>
-#include <algorithm>
-#include "rocksdb/merge_operator.h"
+
+#include "rocksdb/env_encryption.h"
+#include "utils/crypto/EncryptionUtils.h"
+#include "logging/Logger.h"
+#include "utils/crypto/EncryptionManager.h"
 
 namespace org {
 namespace apache {
@@ -30,20 +33,15 @@ namespace minifi {
 namespace core {
 namespace repository {
 
-class StringAppender : public rocksdb::AssociativeMergeOperator {
- public:
-  static std::shared_ptr<rocksdb::MergeOperator> transform(const 
std::shared_ptr<rocksdb::MergeOperator>& other) {
-    if (other && std::strcmp(other->Name(), "StringAppender") == 0) {
-      return other;
-    }
-    return std::make_shared<StringAppender>();
-  }
+struct DbEncryptionOptions {
+  std::string database;
+  std::string encryption_key_name;
+};
 
-  bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* 
existing_value, const rocksdb::Slice& value, std::string* new_value, 
rocksdb::Logger* /*logger*/) const override;
+std::shared_ptr<rocksdb::Env> createEncryptingEnv(const 
utils::crypto::EncryptionManager& manager, const DbEncryptionOptions& options);
 
-  const char* Name() const override {
-    return "StringAppender";
-  }
+struct EncryptionEq {
+  bool operator()(const rocksdb::Env* lhs, const rocksdb::Env* rhs) const;
 };
 
 }  // namespace repository
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 6550e7f..12bf97e 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -57,7 +57,7 @@ if (NOT OPENSSL_OFF)
        set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" 
"src/sitetosite/*.cpp"  "src/core/logging/*.cpp" 
"src/core/logging/internal/*.cpp"  "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  [...]
+file(GLOB SOURCES "src/properties/*.cpp" "src/utils/file/*.cpp" 
"src/sitetosite/*.cpp"  "src/core/logging/*.cpp" 
"src/core/logging/internal/*.cpp"  "src/core/state/*.cpp" 
"src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} 
"src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
diff --git a/libminifi/include/properties/Decryptor.h 
b/libminifi/include/properties/Decryptor.h
index d344d3c..eda34d1 100644
--- a/libminifi/include/properties/Decryptor.h
+++ b/libminifi/include/properties/Decryptor.h
@@ -19,9 +19,9 @@
 #include <string>
 #include <utility>
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
-#include "utils/EncryptionProvider.h"
+#include "utils/crypto/EncryptionProvider.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/include/properties/PropertiesFile.h 
b/libminifi/include/properties/PropertiesFile.h
index 721d697..6444168 100644
--- a/libminifi/include/properties/PropertiesFile.h
+++ b/libminifi/include/properties/PropertiesFile.h
@@ -20,7 +20,7 @@
 #include <string>
 #include <vector>
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
 
 namespace org {
diff --git a/libminifi/include/utils/EncryptionProvider.h 
b/libminifi/include/utils/crypto/EncryptionManager.h
similarity index 62%
copy from libminifi/include/utils/EncryptionProvider.h
copy to libminifi/include/utils/crypto/EncryptionManager.h
index 33815f1..8772977 100644
--- a/libminifi/include/utils/EncryptionProvider.h
+++ b/libminifi/include/utils/crypto/EncryptionManager.h
@@ -19,8 +19,12 @@
 
 #include <utility>
 #include <string>
-#include "utils/EncryptionUtils.h"
+#include <memory>
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
+#include "utils/crypto/ciphers/XSalsa20.h"
+#include "utils/crypto/ciphers/Aes256Ecb.h"
+#include "core/logging/Logger.h"
 
 namespace org {
 namespace apache {
@@ -29,23 +33,18 @@ namespace minifi {
 namespace utils {
 namespace crypto {
 
-class EncryptionProvider {
+class EncryptionManager {
+  static std::shared_ptr<core::logging::Logger> logger_;
  public:
-  explicit EncryptionProvider(Bytes encryption_key)
-      : encryption_key_(std::move(encryption_key)) {}
-
-  static utils::optional<EncryptionProvider> create(const std::string& 
home_path);
-
-  std::string encrypt(const std::string& data) const {
-    return utils::crypto::encrypt(data, encryption_key_);
-  }
-
-  std::string decrypt(const std::string& data) const {
-    return utils::crypto::decrypt(data, encryption_key_);
-  }
+  explicit EncryptionManager(std::string key_dir) : 
key_dir_(std::move(key_dir)) {}
 
+  utils::optional<XSalsa20Cipher> createXSalsa20Cipher(const std::string& 
key_name) const;
+  utils::optional<Aes256EcbCipher> createAes256EcbCipher(const std::string& 
key_name) const;
  private:
-  const Bytes encryption_key_;
+  utils::optional<Bytes> readKey(const std::string& key_name) const;
+  bool writeKey(const std::string& key_name, const Bytes& key) const;
+
+  std::string key_dir_;
 };
 
 }  // namespace crypto
diff --git a/libminifi/include/utils/EncryptionProvider.h 
b/libminifi/include/utils/crypto/EncryptionProvider.h
similarity index 76%
copy from libminifi/include/utils/EncryptionProvider.h
copy to libminifi/include/utils/crypto/EncryptionProvider.h
index 33815f1..aa26f52 100644
--- a/libminifi/include/utils/EncryptionProvider.h
+++ b/libminifi/include/utils/crypto/EncryptionProvider.h
@@ -19,8 +19,11 @@
 
 #include <utility>
 #include <string>
-#include "utils/EncryptionUtils.h"
+#include <memory>
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/OptionalUtils.h"
+#include "utils/crypto/ciphers/XSalsa20.h"
+#include "core/logging/Logger.h"
 
 namespace org {
 namespace apache {
@@ -31,21 +34,21 @@ namespace crypto {
 
 class EncryptionProvider {
  public:
-  explicit EncryptionProvider(Bytes encryption_key)
-      : encryption_key_(std::move(encryption_key)) {}
+  explicit EncryptionProvider(Bytes key) : cipher_impl_(std::move(key)) {}
+  explicit EncryptionProvider(XSalsa20Cipher cipher_impl) : 
cipher_impl_(std::move(cipher_impl)) {}
 
   static utils::optional<EncryptionProvider> create(const std::string& 
home_path);
 
   std::string encrypt(const std::string& data) const {
-    return utils::crypto::encrypt(data, encryption_key_);
+    return cipher_impl_.encrypt(data);
   }
 
   std::string decrypt(const std::string& data) const {
-    return utils::crypto::decrypt(data, encryption_key_);
+    return cipher_impl_.decrypt(data);
   }
 
  private:
-  const Bytes encryption_key_;
+  const XSalsa20Cipher cipher_impl_;
 };
 
 }  // namespace crypto
diff --git a/libminifi/include/utils/EncryptionUtils.h 
b/libminifi/include/utils/crypto/EncryptionUtils.h
similarity index 100%
rename from libminifi/include/utils/EncryptionUtils.h
rename to libminifi/include/utils/crypto/EncryptionUtils.h
diff --git a/libminifi/include/utils/crypto/ciphers/Aes256Ecb.h 
b/libminifi/include/utils/crypto/ciphers/Aes256Ecb.h
new file mode 100644
index 0000000..665c506
--- /dev/null
+++ b/libminifi/include/utils/crypto/ciphers/Aes256Ecb.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+
+#include "utils/crypto/EncryptionUtils.h"
+#include "Exception.h"
+#include "core/logging/Logger.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace crypto {
+
+class CipherError : public Exception {
+ public:
+  explicit CipherError(const std::string& error_msg) : 
Exception(ExceptionType::GENERAL_EXCEPTION, error_msg) {}
+};
+
+/**
+ * This cipher in itself is unsafe to use to encrypt sensitive data.
+ * Consider this cipher as a building block for more secure modes
+ * of operations (CTR, CBC, etc.)
+ */
+class Aes256EcbCipher {
+  static std::shared_ptr<core::logging::Logger> logger_;
+
+ public:
+  static constexpr size_t BLOCK_SIZE = 16;
+  static constexpr size_t KEY_SIZE = 32;
+
+  explicit Aes256EcbCipher(Bytes encryption_key);
+  void encrypt(gsl::span<unsigned char /*, BLOCK_SIZE*/> data) const;
+  void decrypt(gsl::span<unsigned char /*, BLOCK_SIZE*/> data) const;
+
+  static Bytes generateKey();
+
+  bool operator==(const Aes256EcbCipher& other) const;
+
+ private:
+  template<typename ...Args>
+  static void handleError(const char* format, Args&& ...args) {
+    std::string error_msg = core::logging::format_string(-1, format, 
std::forward<Args>(args)...);
+    logger_->log_error("%s", error_msg);
+    throw CipherError(error_msg);
+  }
+
+  static void handleOpenSSLError(const char* msg);
+
+  const Bytes encryption_key_;
+};
+
+}  // namespace crypto
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/EncryptionProvider.h 
b/libminifi/include/utils/crypto/ciphers/XSalsa20.h
similarity index 83%
copy from libminifi/include/utils/EncryptionProvider.h
copy to libminifi/include/utils/crypto/ciphers/XSalsa20.h
index 33815f1..f11266b 100644
--- a/libminifi/include/utils/EncryptionProvider.h
+++ b/libminifi/include/utils/crypto/ciphers/XSalsa20.h
@@ -17,10 +17,10 @@
 
 #pragma once
 
-#include <utility>
 #include <string>
-#include "utils/EncryptionUtils.h"
-#include "utils/OptionalUtils.h"
+#include <utility>
+
+#include "utils/crypto/EncryptionUtils.h"
 
 namespace org {
 namespace apache {
@@ -29,12 +29,9 @@ namespace minifi {
 namespace utils {
 namespace crypto {
 
-class EncryptionProvider {
+class XSalsa20Cipher {
  public:
-  explicit EncryptionProvider(Bytes encryption_key)
-      : encryption_key_(std::move(encryption_key)) {}
-
-  static utils::optional<EncryptionProvider> create(const std::string& 
home_path);
+  explicit XSalsa20Cipher(Bytes encryption_key) : 
encryption_key_(std::move(encryption_key)) {}
 
   std::string encrypt(const std::string& data) const {
     return utils::crypto::encrypt(data, encryption_key_);
diff --git a/libminifi/include/utils/file/FileSystem.h 
b/libminifi/include/utils/file/FileSystem.h
index d97f2e4..d835c1d 100644
--- a/libminifi/include/utils/file/FileSystem.h
+++ b/libminifi/include/utils/file/FileSystem.h
@@ -20,7 +20,7 @@
 #include <string>
 #include <memory>
 #include "utils/OptionalUtils.h"
-#include "utils/EncryptionProvider.h"
+#include "utils/crypto/EncryptionProvider.h"
 #include "core/logging/LoggerConfiguration.h"
 
 namespace org {
diff --git a/libminifi/src/utils/EncryptionProvider.cpp 
b/libminifi/src/utils/EncryptionProvider.cpp
deleted file mode 100644
index b0ca5f1..0000000
--- a/libminifi/src/utils/EncryptionProvider.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <string>
-#include "utils/EncryptionProvider.h"
-#include "properties/Properties.h"
-#include "utils/OptionalUtils.h"
-#include "utils/StringUtils.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
-namespace crypto {
-
-namespace {
-
-#ifdef WIN32
-constexpr const char* DEFAULT_NIFI_BOOTSTRAP_FILE = "\\conf\\bootstrap.conf";
-#else
-constexpr const char* DEFAULT_NIFI_BOOTSTRAP_FILE = "./conf/bootstrap.conf";
-#endif  // WIN32
-
-constexpr const char* CONFIG_ENCRYPTION_KEY_PROPERTY_NAME = 
"nifi.bootstrap.sensitive.key";
-
-}  // namespace
-
-utils::optional<EncryptionProvider> EncryptionProvider::create(const 
std::string& home_path) {
-  minifi::Properties bootstrap_conf;
-  bootstrap_conf.setHome(home_path);
-  bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE);
-  return bootstrap_conf.getString(CONFIG_ENCRYPTION_KEY_PROPERTY_NAME)
-    | utils::map([](const std::string &encryption_key_hex) { return 
utils::StringUtils::from_hex(encryption_key_hex); })
-    | utils::map(&utils::crypto::stringToBytes)
-    | utils::map([](const utils::crypto::Bytes &encryption_key_bytes) { return 
EncryptionProvider{encryption_key_bytes}; });
-}
-
-}  // namespace crypto
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
diff --git a/libminifi/src/utils/crypto/EncryptionManager.cpp 
b/libminifi/src/utils/crypto/EncryptionManager.cpp
new file mode 100644
index 0000000..8778bec
--- /dev/null
+++ b/libminifi/src/utils/crypto/EncryptionManager.cpp
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <memory>
+#include "utils/crypto/EncryptionManager.h"
+#include "properties/Properties.h"
+#include "utils/OptionalUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/crypto/ciphers/XSalsa20.h"
+#include "utils/crypto/ciphers/Aes256Ecb.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace crypto {
+
+#ifdef WIN32
+constexpr const char* DEFAULT_NIFI_BOOTSTRAP_FILE = "\\conf\\bootstrap.conf";
+#else
+constexpr const char* DEFAULT_NIFI_BOOTSTRAP_FILE = "./conf/bootstrap.conf";
+#endif  // WIN32
+
+std::shared_ptr<core::logging::Logger> 
EncryptionManager::logger_{core::logging::LoggerFactory<EncryptionManager>::getLogger()};
+
+utils::optional<XSalsa20Cipher> EncryptionManager::createXSalsa20Cipher(const 
std::string &key_name) const {
+  return readKey(key_name)
+         | utils::map([] (const Bytes& key) {return XSalsa20Cipher{key};});
+}
+
+utils::optional<Aes256EcbCipher> 
EncryptionManager::createAes256EcbCipher(const std::string &key_name) const {
+  utils::optional<Bytes> key = readKey(key_name);
+  if (!key) {
+    logger_->log_info("No encryption key found for '%s'", key_name);
+    return {};
+  }
+  if (key->empty()) {
+    // generate new key
+    logger_->log_info("Generating encryption key '%s'", key_name);
+    key = Aes256EcbCipher::generateKey();
+    writeKey(key_name, key.value());
+  } else {
+    logger_->log_info("Using existing encryption key '%s'", key_name);
+  }
+  return Aes256EcbCipher{key.value()};
+}
+
+
+utils::optional<Bytes> EncryptionManager::readKey(const std::string& key_name) 
const {
+  minifi::Properties bootstrap_conf;
+  bootstrap_conf.setHome(key_dir_);
+  bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE);
+  return bootstrap_conf.getString(key_name)
+         | utils::map([](const std::string &encryption_key_hex) { return 
utils::StringUtils::from_hex(encryption_key_hex); })
+         | utils::map(&utils::crypto::stringToBytes);
+}
+
+bool EncryptionManager::writeKey(const std::string &key_name, const Bytes& 
key) const {
+  minifi::Properties bootstrap_conf;
+  bootstrap_conf.setHome(key_dir_);
+  bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE);
+  bootstrap_conf.set(key_name, utils::StringUtils::to_hex(key));
+  return bootstrap_conf.persistProperties();
+}
+
+}  // namespace crypto
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/EncryptionProvider.h 
b/libminifi/src/utils/crypto/EncryptionProvider.cpp
similarity index 64%
rename from libminifi/include/utils/EncryptionProvider.h
rename to libminifi/src/utils/crypto/EncryptionProvider.cpp
index 33815f1..ecc7b48 100644
--- a/libminifi/include/utils/EncryptionProvider.h
+++ b/libminifi/src/utils/crypto/EncryptionProvider.cpp
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-#pragma once
-
-#include <utility>
-#include <string>
-#include "utils/EncryptionUtils.h"
+#include <memory>
+#include "utils/crypto/EncryptionProvider.h"
 #include "utils/OptionalUtils.h"
+#include "utils/crypto/EncryptionManager.h"
+#include "core/logging/LoggerConfiguration.h"
 
 namespace org {
 namespace apache {
@@ -29,24 +28,12 @@ namespace minifi {
 namespace utils {
 namespace crypto {
 
-class EncryptionProvider {
- public:
-  explicit EncryptionProvider(Bytes encryption_key)
-      : encryption_key_(std::move(encryption_key)) {}
-
-  static utils::optional<EncryptionProvider> create(const std::string& 
home_path);
-
-  std::string encrypt(const std::string& data) const {
-    return utils::crypto::encrypt(data, encryption_key_);
-  }
-
-  std::string decrypt(const std::string& data) const {
-    return utils::crypto::decrypt(data, encryption_key_);
-  }
+constexpr const char* CONFIG_ENCRYPTION_KEY_PROPERTY_NAME = 
"nifi.bootstrap.sensitive.key";
 
- private:
-  const Bytes encryption_key_;
-};
+utils::optional<EncryptionProvider> EncryptionProvider::create(const 
std::string& home_path) {
+  return 
EncryptionManager{home_path}.createXSalsa20Cipher(CONFIG_ENCRYPTION_KEY_PROPERTY_NAME)
+    | utils::map([] (const XSalsa20Cipher& cipher) {return 
EncryptionProvider{cipher};});
+}
 
 }  // namespace crypto
 }  // namespace utils
diff --git a/libminifi/src/utils/EncryptionUtils.cpp 
b/libminifi/src/utils/crypto/EncryptionUtils.cpp
similarity index 99%
rename from libminifi/src/utils/EncryptionUtils.cpp
rename to libminifi/src/utils/crypto/EncryptionUtils.cpp
index 9dd174f..069d514 100644
--- a/libminifi/src/utils/EncryptionUtils.cpp
+++ b/libminifi/src/utils/crypto/EncryptionUtils.cpp
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 
 #include <sodium.h>
 
diff --git a/libminifi/src/utils/crypto/ciphers/Aes256Ecb.cpp 
b/libminifi/src/utils/crypto/ciphers/Aes256Ecb.cpp
new file mode 100644
index 0000000..b78020a
--- /dev/null
+++ b/libminifi/src/utils/crypto/ciphers/Aes256Ecb.cpp
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/crypto/ciphers/Aes256Ecb.h"
+#include "openssl/conf.h"
+#include "openssl/evp.h"
+#include "openssl/err.h"
+#include "openssl/rand.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace crypto {
+
+using EVP_CIPHER_CTX_ptr = std::unique_ptr<EVP_CIPHER_CTX, 
decltype(&EVP_CIPHER_CTX_free)>;
+
+std::shared_ptr<core::logging::Logger> 
Aes256EcbCipher::logger_{core::logging::LoggerFactory<Aes256EcbCipher>::getLogger()};
+
+Aes256EcbCipher::Aes256EcbCipher(Bytes encryption_key) : 
encryption_key_(std::move(encryption_key)) {
+  if (encryption_key_.size() != KEY_SIZE) {
+    handleError("Invalid key length %zu bytes, expected %zu bytes", 
encryption_key_.size(), static_cast<size_t>(KEY_SIZE));
+  }
+}
+
+void Aes256EcbCipher::handleOpenSSLError(const char* msg) {
+  std::array<char, 128> errmsg = {0};
+  const auto errcode = ERR_peek_last_error();
+  if (!errcode) {
+    handleError("%s: %s", msg, "Unknown OpenSSL error");
+  }
+  ERR_error_string_n(errcode, errmsg.data(), errmsg.size());
+  handleError("%s: %s", msg, errmsg.data());
+}
+
+Bytes Aes256EcbCipher::generateKey() {
+  return utils::crypto::randomBytes(KEY_SIZE);
+}
+
+void Aes256EcbCipher::encrypt(gsl::span<unsigned char /*, BLOCK_SIZE*/> data) 
const {
+  gsl_Expects(data.size() == BLOCK_SIZE);
+  EVP_CIPHER_CTX_ptr ctx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
+  if (!ctx) {
+    handleOpenSSLError("Could not create cipher context");
+  }
+
+  if (1 != EVP_EncryptInit_ex(ctx.get(), EVP_aes_256_ecb(), nullptr, 
encryption_key_.data(), nullptr)) {
+    handleOpenSSLError("Could not initialize encryption cipher context");
+  }
+
+  // EVP_EncryptFinal_ex pads the data even if there is none thus data that
+  // is exactly BLOCK_SIZE long would result in 2*BLOCK_SIZE ciphertext
+  if (1 != EVP_CIPHER_CTX_set_padding(ctx.get(), 0)) {
+    handleOpenSSLError("Could not disable padding for cipher");
+  }
+
+  int ciphertext_len = 0;
+  int len;
+
+  if (1 != EVP_EncryptUpdate(ctx.get(), data.begin(), &len, data.begin(), 
data.size())) {
+    handleOpenSSLError("Could not update cipher content");
+  }
+  ciphertext_len += len;
+
+  if (1 != EVP_EncryptFinal_ex(ctx.get(), data.begin() + len, &len)) {
+    handleOpenSSLError("Could not finalize encryption");
+  }
+  ciphertext_len += len;
+
+  gsl_Expects(ciphertext_len == BLOCK_SIZE);
+}
+
+void Aes256EcbCipher::decrypt(gsl::span<unsigned char /*, BLOCK_SIZE*/> data) 
const {
+  gsl_Expects(data.size() == BLOCK_SIZE);
+  EVP_CIPHER_CTX_ptr ctx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
+  if (!ctx) {
+    handleOpenSSLError("Could not create cipher context");
+  }
+
+  if (1 != EVP_DecryptInit_ex(ctx.get(), EVP_aes_256_ecb(), nullptr, 
encryption_key_.data(), nullptr)) {
+    handleOpenSSLError("Could not initialize decryption cipher context");
+  }
+
+  // as we did not use padding during encryption
+  if (1 != EVP_CIPHER_CTX_set_padding(ctx.get(), 0)) {
+    handleOpenSSLError("Could not disable padding for cipher");
+  }
+
+  int plaintext_len = 0;
+  int len;
+
+  if (1 != EVP_DecryptUpdate(ctx.get(), data.begin(), &len, data.begin(), 
data.size())) {
+    handleOpenSSLError("Could not update cipher content");
+  }
+  plaintext_len += len;
+
+  if (1 != EVP_DecryptFinal_ex(ctx.get(), data.begin() + len, &len)) {
+    handleOpenSSLError("Could not finalize decryption");
+  }
+  plaintext_len += len;
+
+  gsl_Expects(plaintext_len == BLOCK_SIZE);
+}
+
+bool Aes256EcbCipher::operator==(const Aes256EcbCipher &other) const {
+  gsl_Expects(encryption_key_.size() == KEY_SIZE);
+  if (encryption_key_.size() != other.encryption_key_.size()) return false;
+  return CRYPTO_memcmp(encryption_key_.data(), other.encryption_key_.data(), 
KEY_SIZE) == 0;
+}
+
+}  // namespace crypto
+}  // namespace utils
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/utils/file/FileSystem.cpp 
b/libminifi/src/utils/file/FileSystem.cpp
index 6776182..cbb5f79 100644
--- a/libminifi/src/utils/file/FileSystem.cpp
+++ b/libminifi/src/utils/file/FileSystem.cpp
@@ -19,7 +19,7 @@
 #include <fstream>
 #include "utils/file/FileSystem.h"
 #include "utils/OptionalUtils.h"
-#include "utils/EncryptionProvider.h"
+#include "utils/crypto/EncryptionProvider.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 68edd27..fc254bb 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -445,6 +445,13 @@ class TestController {
     return dir;
   }
 
+  template<size_t N>
+  utils::Path createTempDirectory(const char (&format)[N]) {
+    char buffer[N];
+    std::memcpy(buffer, format, N);
+    return utils::Path{createTempDirectory(static_cast<char*>(buffer))};
+  }
+
  protected:
   std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
   LogTestController &log;
diff --git a/libminifi/test/rocksdb-tests/EncryptionTests.cpp 
b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
new file mode 100644
index 0000000..9ce4258
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "utils/TestUtils.h"
+#include "FlowFileRepository.h"
+#include "utils/IntegrationTestUtils.h"
+
+using utils::Path;
+using core::repository::FlowFileRepository;
+
+class FFRepoFixture : public TestController {
+ public:
+  FFRepoFixture() {
+    LogTestController::getInstance().setDebug<minifi::FlowFileRecord>();
+    LogTestController::getInstance().setDebug<minifi::Connection>();
+    LogTestController::getInstance().setTrace<FlowFileRepository>();
+    home_ = createTempDirectory("/var/tmp/testRepo.XXXXXX");
+    repo_dir_ = home_ / "flowfile_repo";
+    checkpoint_dir_ = home_ / "checkpoint_dir";
+    config_ = std::make_shared<minifi::Configure>();
+    config_->setHome(home_.str());
+    container_ = std::make_shared<minifi::Connection>(nullptr, nullptr, 
"container");
+    content_repo_ = 
std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo_->initialize(config_);
+  }
+
+  static void putFlowFile(const std::shared_ptr<minifi::FlowFileRecord>& 
flowfile, const std::shared_ptr<core::repository::FlowFileRepository>& repo) {
+    minifi::io::BufferStream buffer;
+    flowfile->Serialize(buffer);
+    REQUIRE(repo->Put(flowfile->getUUIDStr(), buffer.getBuffer(), 
buffer.size()));
+  }
+
+  template<typename Fn>
+  void runWithNewRepository(Fn&& fn) {
+    auto repository = std::make_shared<FlowFileRepository>("ff", 
checkpoint_dir_.str(), repo_dir_.str());
+    repository->initialize(config_);
+    std::map<std::string, std::shared_ptr<core::Connectable>> container_map;
+    container_map[container_->getUUIDStr()] = container_;
+    repository->setContainers(container_map);
+    repository->loadComponent(content_repo_);
+    repository->start();
+    std::forward<Fn>(fn)(repository);
+    repository->stop();
+  }
+
+ protected:
+  std::shared_ptr<minifi::Connection> container_;
+  Path home_;
+  Path repo_dir_;
+  Path checkpoint_dir_;
+  std::shared_ptr<minifi::Configure> config_;
+  std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
+};
+
+TEST_CASE_METHOD(FFRepoFixture, "FlowFileRepository creates checkpoint and 
loads flowfiles") {
+  SECTION("Without encryption") {
+    // pass
+  }
+  SECTION("With encryption") {
+    utils::file::FileUtils::create_dir((home_ / "conf").str());
+    std::ofstream{(home_ / "conf" / "bootstrap.conf").str()}
+      << static_cast<const char*>(FlowFileRepository::ENCRYPTION_KEY_NAME) << 
"="
+      << "805D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E"
+      << "\n";
+  }
+
+
+  runWithNewRepository([&] (const 
std::shared_ptr<core::repository::FlowFileRepository>& repo) {
+    auto flowfile = std::make_shared<minifi::FlowFileRecord>();
+    flowfile->setAttribute("my little pony", "my horse is amazing");
+    flowfile->setConnection(container_);
+    putFlowFile(flowfile, repo);
+  });
+
+  REQUIRE(container_->isEmpty());
+
+  runWithNewRepository([&] (const 
std::shared_ptr<core::repository::FlowFileRepository>& /*repo*/) {
+    // wait for the flowfiles to be loaded from the checkpoint
+    bool success = 
utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] {
+      return !container_->isEmpty();
+    });
+    REQUIRE(success);
+    REQUIRE(utils::verifyLogLinePresenceInPollTime(
+        std::chrono::seconds{5},
+        "Successfully opened checkpoint database at '" + checkpoint_dir_.str() 
+ "'"));
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    auto flowfile = container_->poll(expired);
+    REQUIRE(expired.empty());
+    REQUIRE(flowfile);
+    REQUIRE(flowfile->getAttribute("my little pony") == "my horse is amazing");
+  });
+}
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index b3ab99c..e8f9de2 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -32,7 +32,7 @@ class RocksDBStreamTest : TestController {
       db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
     };
     auto set_cf_opts = [] 
(minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& cf_opts) {
-      
cf_opts.transform<core::repository::StringAppender>(&rocksdb::ColumnFamilyOptions::merge_operator);
+      cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<core::repository::StringAppender>(), 
core::repository::StringAppender::Eq{});
     };
     db = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
dbPath);
     REQUIRE(db->open());
diff --git a/libminifi/test/rocksdb-tests/RocksDBTests.cpp 
b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
index b04c994..178c705 100644
--- a/libminifi/test/rocksdb-tests/RocksDBTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBTests.cpp
@@ -21,6 +21,7 @@
 #include "../../extensions/rocksdb-repos/database/ColumnHandle.h"
 #include "IntegrationTestUtils.h"
 #include "database/StringAppender.h"
+#include 
"../../extensions/rocksdb-repos/encryption/RocksDbEncryptionProvider.h"
 
 #undef NDEBUG
 
@@ -180,7 +181,7 @@ TEST_CASE_METHOD(RocksDBTest, "Sanity check: merge fails 
without merge_operator"
 
 TEST_CASE_METHOD(RocksDBTest, "Column options are applied", "[rocksDBTest9]") {
   auto cf_opts = [] (minifi::internal::Writable<rocksdb::ColumnFamilyOptions>& 
cf_opts) {
-    
cf_opts.transform<StringAppender>(&rocksdb::ColumnFamilyOptions::merge_operator);
+    cf_opts.set(&rocksdb::ColumnFamilyOptions::merge_operator, 
std::make_shared<StringAppender>(), StringAppender::Eq{});
   };
   std::string db_uri;
   SECTION("Named column") {
@@ -204,3 +205,49 @@ TEST_CASE_METHOD(RocksDBTest, "Column options are 
applied", "[rocksDBTest9]") {
   REQUIRE(opendb->Get({}, "a", &value).ok());
   REQUIRE(value == "firstsecond");
 }
+
+minifi::internal::DBOptionsPatch createEncrSetter(const utils::Path& home_dir, 
const std::string& db_name, const std::string& key_name) {
+  auto env = 
core::repository::createEncryptingEnv(utils::crypto::EncryptionManager{home_dir.str()},
 core::repository::DbEncryptionOptions{db_name, key_name});
+  REQUIRE(env);
+  return [env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
+    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
+    db_opts.set(&rocksdb::DBOptions::env, env.get(), 
core::repository::EncryptionEq{});
+  };
+}
+
+void withDefaultEnv(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
+  db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+}
+
+TEST_CASE_METHOD(RocksDBTest, "Error is logged if different encryption keys 
are used", "[rocksDBTest10]") {
+  utils::Path home_dir = createTempDirectory("/var/tmp/test.XXXXXX");
+  utils::file::FileUtils::create_dir((home_dir / "conf").str());
+  std::ofstream{(home_dir / "conf" / "bootstrap.conf").str()}
+    << "encryption.key.one=" << 
"805D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E" << "\n"
+    << "encryption.key.two=" << 
"905D7B95EF44DC27C87FFBC4DFDE376DAE604D55DB2C5496DEEF5236362DE62E" << "\n";
+
+  auto db_opt_1 = createEncrSetter(home_dir, "one", "encryption.key.one");
+  auto col_1 = minifi::internal::RocksDatabase::create(db_opt_1, {}, 
"minifidb://" + db_dir + "/column_one");
+  REQUIRE(col_1->open());
+
+  SECTION("Using the same encryption key is OK") {
+    auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.one");
+    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two");
+    REQUIRE(col_2->open());
+  }
+
+  SECTION("Using different encryption key") {
+    auto db_opt_2 = createEncrSetter(home_dir, "two", "encryption.key.two");
+    auto col_2 = minifi::internal::RocksDatabase::create(db_opt_2, {}, 
"minifidb://" + db_dir + "/column_two");
+    REQUIRE_FALSE(col_2->open());
+    REQUIRE(utils::verifyLogLinePresenceInPollTime(
+        std::chrono::seconds{1}, "Database '" + db_dir + "' has already been 
opened using a different configuration"));
+  }
+
+  SECTION("Using no encryption key") {
+    auto col_2 = minifi::internal::RocksDatabase::create(withDefaultEnv, {}, 
"minifidb://" + db_dir + "/column_two");
+    REQUIRE_FALSE(col_2->open());
+    REQUIRE(utils::verifyLogLinePresenceInPollTime(
+        std::chrono::seconds{1}, "Database '" + db_dir + "' has already been 
opened using a different configuration"));
+  }
+}
diff --git a/libminifi/test/unit/EncryptionUtilsTests.cpp 
b/libminifi/test/unit/EncryptionUtilsTests.cpp
index 934b889..8a58132 100644
--- a/libminifi/test/unit/EncryptionUtilsTests.cpp
+++ b/libminifi/test/unit/EncryptionUtilsTests.cpp
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#include "utils/EncryptionUtils.h"
+#include "utils/crypto/EncryptionUtils.h"
 #include "utils/StringUtils.h"
 #include "../TestBase.h"
 

Reply via email to