lordgamez commented on code in PR #1509:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1509#discussion_r1106020797


##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -179,6 +180,37 @@ std::shared_ptr<io::BaseStream> 
DatabaseContentRepository::write(const minifi::R
   return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), 
gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
 }
 
+bool DatabaseContentRepository::clearOrphans() {
+  if (!is_valid_ || !db_)
+    return false;
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  std::vector<std::string> keys;

Review Comment:
   This should have a more specific name, such as `keys_to_be_deleted`.



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -126,38 +123,28 @@ void FlowFileRepository::run() {
   flush();
 }
 
-void FlowFileRepository::prune_stored_flowfiles() {
-  const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, 
DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME});
-  logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? 
"encrypted" : "plaintext");
-
-  auto set_db_opts = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
-    if (encrypted_env) {
-      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
-    } else {
-      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
-    }
-  };
-  auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, 
checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly);
-  std::optional<minifi::internal::OpenRocksDb> opendb;
-  if (nullptr != checkpoint_) {
-    opendb = checkpointDB->open();
-    if (opendb) {
-      logger_->log_trace("Successfully opened checkpoint database at '%s'", 
checkpoint_dir_.string());
-    } else {
-      logger_->log_error("Couldn't open checkpoint database at '%s' using live 
database", checkpoint_dir_.string());
-      opendb = db_->open();
-    }
-    if (!opendb) {
-      logger_->log_trace("Could not open neither the checkpoint nor the live 
database.");
-      return;
+bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
+  std::chrono::milliseconds waitTime = 0ms;

Review Comment:
   Please rename this variable to `wait_time` (snake_case, consistent with the other local variables).



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -126,38 +123,28 @@ void FlowFileRepository::run() {
   flush();
 }
 
-void FlowFileRepository::prune_stored_flowfiles() {
-  const auto encrypted_env = 
createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, 
DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME});
-  logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? 
"encrypted" : "plaintext");
-
-  auto set_db_opts = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, 
true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
-    if (encrypted_env) {
-      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
-    } else {
-      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
-    }
-  };
-  auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, 
checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly);
-  std::optional<minifi::internal::OpenRocksDb> opendb;
-  if (nullptr != checkpoint_) {
-    opendb = checkpointDB->open();
-    if (opendb) {
-      logger_->log_trace("Successfully opened checkpoint database at '%s'", 
checkpoint_dir_.string());
-    } else {
-      logger_->log_error("Couldn't open checkpoint database at '%s' using live 
database", checkpoint_dir_.string());
-      opendb = db_->open();
-    }
-    if (!opendb) {
-      logger_->log_trace("Could not open neither the checkpoint nor the live 
database.");
-      return;
+bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
+  std::chrono::milliseconds waitTime = 0ms;
+  for (int i=0; i < 3; ++i) {

Review Comment:
   The retry count should be extracted into a named constant, or possibly made configurable.



##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -179,6 +180,37 @@ std::shared_ptr<io::BaseStream> 
DatabaseContentRepository::write(const minifi::R
   return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), 
gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
 }
 
+bool DatabaseContentRepository::clearOrphans() {
+  if (!is_valid_ || !db_)
+    return false;
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  std::vector<std::string> keys;
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    auto key = it->key().ToString();
+    auto claim_it = count_map_.find(key);
+    if (claim_it == count_map_.end() || claim_it->second == 0) {
+      logger_->log_debug("Deleting orphan resource %s", key);
+      keys.push_back(key);
+    }
+  }
+  auto batch = opendb->createWriteBatch();
+  for (auto& key : keys) {
+    batch.Delete(key);
+  }
+
+  rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
+
+  if (!status.ok()) {
+    logger_->log_debug("Could not delete orphan contents from rocksdb 
database: %s", status.ToString());

Review Comment:
   This could be logged at error level instead of debug level.



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -191,62 +178,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
       keys_to_delete.enqueue(key);
     }
   }
-}
-
-bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
-  std::chrono::milliseconds waitTime = 0ms;
-  for (int i=0; i < 3; ++i) {
-    auto status = operation();
-    if (status.ok()) {
-      logger_->log_trace("Rocksdb operation executed successfully");
-      return true;
-    }
-    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
-    waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
-    std::this_thread::sleep_for(waitTime);
-  }
-  return false;
-}
-
-/**
- * Returns True if there is data to interrogate.
- * @return true if our db has data stored.
- */
-bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& 
opendb) {
-  auto it = opendb.NewIterator(rocksdb::ReadOptions());
-  it->SeekToFirst();
-  return it->Valid();
-}
-void FlowFileRepository::initialize_repository() {
-  checkpoint_.reset();
-  auto opendb = db_->open();
-  if (!opendb) {
-    logger_->log_trace("Couldn't open database, no way to checkpoint");
-    return;
-  }
-  // first we need to establish a checkpoint iff it is needed.
-  if (!need_checkpoint(*opendb)) {
-    logger_->log_trace("Do not need checkpoint");
-    return;
-  }
-  // delete any previous copy
-  if (utils::file::delete_dir(checkpoint_dir_) < 0) {
-    logger_->log_error("Could not delete existing checkpoint directory '%s'", 
checkpoint_dir_.string());
-    return;
-  }
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint;
-  rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not create checkpoint object: %s", 
checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string());
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not initialize checkpoint: %s", 
checkpoint_status.ToString());
-    return;
+  flush();
+  if (content_repo_->clearOrphans()) {

Review Comment:
   It may be better to remove the bool return value and do all the logging 
inside the `clearOrphans` function. Logging there is more specific, and there may 
be cases where a `false` return value does not indicate an error (for example, 
when the repository is not valid or has no database).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to