szaszm commented on code in PR #1499:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1499#discussion_r1310172337
##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -198,27 +212,63 @@ bool DatabaseContentRepository::exists(const
minifi::ResourceClaim &streamId) {
}
bool DatabaseContentRepository::removeKey(const std::string& content_path) {
- if (!is_valid_ || !db_) {
- logger_->log_error("DB is not valid, could not delete %s", content_path);
- return false;
- }
- auto opendb = db_->open();
- if (!opendb) {
- logger_->log_error("Could not open DB, did not delete %s", content_path);
- return false;
- }
- rocksdb::Status status;
- status = opendb->Delete(rocksdb::WriteOptions(), content_path);
- if (status.ok()) {
- logger_->log_debug("Deleting resource %s", content_path);
- return true;
- } else if (status.IsNotFound()) {
- logger_->log_debug("Resource %s was not found", content_path);
- return true;
- } else {
- logger_->log_error("Attempted, but could not delete %s", content_path);
- return false;
+ if (purge_period_ == std::chrono::seconds(0)) {
+ if (!is_valid_ || !db_)
+ return false;
+ // synchronous deletion
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(),
content_path);
+ if (status.ok()) {
+ logger_->log_debug("Deleting resource %s", content_path);
+ return true;
+ } else if (status.IsNotFound()) {
+ logger_->log_debug("Resource %s was not found", content_path);
+ return true;
+ } else {
+ logger_->log_debug("Attempted, but could not delete %s", content_path);
+ return false;
+ }
}
+ // asynchronous deletion
+ std::lock_guard guard(keys_mtx_);
+ logger_->log_debug("Staging resource for deletion %s", content_path);
+ keys_to_delete_.push_back(content_path);
+ return true;
+}
+
+void DatabaseContentRepository::runGc() {
+ do {
+ auto opendb = db_->open();
+ if (!opendb) {
+ continue;
+ }
+ std::vector<std::string> keys;
+ {
+ std::lock_guard guard(keys_mtx_);
+ keys = std::exchange(keys_to_delete_, std::vector<std::string>{});
+ }
+ auto batch = opendb->createWriteBatch();
+ for (auto& key : keys) {
+ batch.Delete(key);
+ }
+ rocksdb::Status status;
+ status = opendb->Write(rocksdb::WriteOptions(), &batch);
+ if (status.ok()) {
Review Comment:
Could you add a code comment explaining this, including mentioning
`clearOrphans`? So that if a future reader comes across this code in isolation,
they won't think that this is a bug, or at least not for this reason.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]