szaszm commented on code in PR #1499:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1499#discussion_r1295482217
##########
libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp:
##########
@@ -103,16 +105,35 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
dir.string());
- REQUIRE(content_repo->initialize(configuration));
- content_repo->remove(*claim);
+ std::string readstr;
- auto read_stream = content_repo->read(*claim);
+ SECTION("Sync") {
+
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period,
"0 s");
+ REQUIRE(content_repo->initialize(configuration));
- std::string readstr;
+ content_repo->remove(*claim);
- // error tells us we have an invalid stream
- REQUIRE(minifi::io::isError(read_stream->read(readstr)));
+ auto read_stream = content_repo->read(*claim);
+
+ // error tells us we have an invalid stream
+ REQUIRE(minifi::io::isError(read_stream->read(readstr)));
+ }
+
+ SECTION("Async") {
+
configuration->set(minifi::Configure::nifi_dbcontent_repository_purge_period,
"100 ms");
+ REQUIRE(content_repo->initialize(configuration));
+ content_repo->start();
+
+ content_repo->remove(*claim);
+
+ // an immediate read will still be able to access the content
+
REQUIRE_FALSE(minifi::io::isError(content_repo->read(*claim)->read(readstr)));
Review Comment:
Does this work reliably even under high concurrency and repetition? I want to
avoid a flaky test case if possible.
##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -198,27 +212,63 @@ bool DatabaseContentRepository::exists(const
minifi::ResourceClaim &streamId) {
}
bool DatabaseContentRepository::removeKey(const std::string& content_path) {
- if (!is_valid_ || !db_) {
- logger_->log_error("DB is not valid, could not delete %s", content_path);
- return false;
- }
- auto opendb = db_->open();
- if (!opendb) {
- logger_->log_error("Could not open DB, did not delete %s", content_path);
- return false;
- }
- rocksdb::Status status;
- status = opendb->Delete(rocksdb::WriteOptions(), content_path);
- if (status.ok()) {
- logger_->log_debug("Deleting resource %s", content_path);
- return true;
- } else if (status.IsNotFound()) {
- logger_->log_debug("Resource %s was not found", content_path);
- return true;
- } else {
- logger_->log_error("Attempted, but could not delete %s", content_path);
- return false;
+ if (purge_period_ == std::chrono::seconds(0)) {
+ if (!is_valid_ || !db_)
+ return false;
+ // synchronous deletion
+ auto opendb = db_->open();
+ if (!opendb) {
+ return false;
+ }
+ rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(),
content_path);
+ if (status.ok()) {
+ logger_->log_debug("Deleting resource %s", content_path);
+ return true;
+ } else if (status.IsNotFound()) {
+ logger_->log_debug("Resource %s was not found", content_path);
+ return true;
+ } else {
+ logger_->log_debug("Attempted, but could not delete %s", content_path);
+ return false;
+ }
}
+ // asynchronous deletion
+ std::lock_guard guard(keys_mtx_);
+ logger_->log_debug("Staging resource for deletion %s", content_path);
+ keys_to_delete_.push_back(content_path);
+ return true;
+}
+
+void DatabaseContentRepository::runGc() {
+ do {
+ auto opendb = db_->open();
+ if (!opendb) {
+ continue;
+ }
+ std::vector<std::string> keys;
+ {
+ std::lock_guard guard(keys_mtx_);
+ keys = std::exchange(keys_to_delete_, std::vector<std::string>{});
+ }
+ auto batch = opendb->createWriteBatch();
+ for (auto& key : keys) {
+ batch.Delete(key);
+ }
+ rocksdb::Status status;
+ status = opendb->Write(rocksdb::WriteOptions(), &batch);
+ if (status.ok()) {
Review Comment:
We should handle the case where an interruption (kill, shutdown,
etc.) occurs between adding the keys to the vector and actually deleting
them — including an interruption between these two steps: emptying
`keys_to_delete_` and performing the actual deletion. For the latter, doing the
delete before emptying the list, while still holding the mutex, might do the
trick, though it may not be ideal from a performance standpoint. What do you think?
Do we store `keys_to_delete_` on disk?
##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -107,10 +115,15 @@ void DatabaseContentRepository::start() {
return;
}
if (compaction_period_.count() != 0) {
- compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+ compaction_thread_ = std::make_unique<utils::StoppableThread>([this] {
runCompaction();
});
}
+ if (purge_period_.count() != 0) {
+ gc_thread_ = std::make_unique<utils::StoppableThread>([this] {
+ runGc();
+ });
Review Comment:
Could we do these two on the same thread?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]