adamdebreceni commented on code in PR #1509:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1509#discussion_r1106938339


##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -191,62 +178,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
       keys_to_delete.enqueue(key);
     }
   }
-}
-
-bool FlowFileRepository::ExecuteWithRetry(const 
std::function<rocksdb::Status()>& operation) {
-  std::chrono::milliseconds waitTime = 0ms;
-  for (int i=0; i < 3; ++i) {
-    auto status = operation();
-    if (status.ok()) {
-      logger_->log_trace("Rocksdb operation executed successfully");
-      return true;
-    }
-    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
-    waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
-    std::this_thread::sleep_for(waitTime);
-  }
-  return false;
-}
-
-/**
- * Returns True if there is data to interrogate.
- * @return true if our db has data stored.
- */
-bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& 
opendb) {
-  auto it = opendb.NewIterator(rocksdb::ReadOptions());
-  it->SeekToFirst();
-  return it->Valid();
-}
-void FlowFileRepository::initialize_repository() {
-  checkpoint_.reset();
-  auto opendb = db_->open();
-  if (!opendb) {
-    logger_->log_trace("Couldn't open database, no way to checkpoint");
-    return;
-  }
-  // first we need to establish a checkpoint iff it is needed.
-  if (!need_checkpoint(*opendb)) {
-    logger_->log_trace("Do not need checkpoint");
-    return;
-  }
-  // delete any previous copy
-  if (utils::file::delete_dir(checkpoint_dir_) < 0) {
-    logger_->log_error("Could not delete existing checkpoint directory '%s'", 
checkpoint_dir_.string());
-    return;
-  }
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint;
-  rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not create checkpoint object: %s", 
checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string());
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not initialize checkpoint: %s", 
checkpoint_status.ToString());
-    return;
+  flush();
+  if (content_repo_->clearOrphans()) {

Review Comment:
   changed it to void, added some extra logs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to