This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 033ef9d0a119ba60c7698b762fc763bc9542e16e Author: Gabor Gyimesi <[email protected]> AuthorDate: Sat Mar 4 23:36:19 2023 +0100 MINIFICPP-2059 Handle content repository remove failures Closes #1519 Signed-off-by: Marton Szasz <[email protected]> --- .../rocksdb-repos/DatabaseContentRepository.cpp | 21 ++++- .../rocksdb-repos/DatabaseContentRepository.h | 3 +- libminifi/include/core/ContentRepository.h | 8 ++ .../include/core/repository/FileSystemRepository.h | 4 +- .../core/repository/VolatileContentRepository.h | 11 ++- libminifi/src/core/ContentRepository.cpp | 42 +++++++-- .../src/core/repository/FileSystemRepository.cpp | 40 ++++++-- .../core/repository/VolatileContentRepository.cpp | 24 +++-- libminifi/test/unit/FileSystemRepositoryTests.cpp | 104 ++++++++++++++++++++- 9 files changed, 212 insertions(+), 45 deletions(-) diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 020c60fca..187776778 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -197,20 +197,26 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) { } } -bool DatabaseContentRepository::remove(const minifi::ResourceClaim &claim) { - if (!is_valid_ || !db_) +bool DatabaseContentRepository::removeKey(const std::string& content_path) { + if (!is_valid_ || !db_) { + logger_->log_error("DB is not valid, could not delete %s", content_path); return false; + } auto opendb = db_->open(); if (!opendb) { + logger_->log_error("Could not open DB, did not delete %s", content_path); return false; } rocksdb::Status status; - status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath()); + status = opendb->Delete(rocksdb::WriteOptions(), content_path); if (status.ok()) { - logger_->log_debug("Deleting resource %s", claim.getContentFullPath()); + logger_->log_debug("Deleting resource %s", content_path); + return true; + } else if (status.IsNotFound()) { + logger_->log_debug("Resource %s was not found", content_path); return true; } else { - logger_->log_debug("Attempted, but could not delete %s", claim.getContentFullPath()); + logger_->log_error("Attempted, but could not delete %s", content_path); return false; } } @@ -238,6 +244,7 @@ void DatabaseContentRepository::clearOrphans() { auto it = opendb->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { auto key = it->key().ToString(); + std::lock_guard<std::mutex> lock(count_map_mutex_); auto claim_it = count_map_.find(key); if (claim_it == count_map_.end() || claim_it->second == 0) { logger_->log_error("Deleting orphan resource %s", key); @@ -253,6 +260,10 @@ void DatabaseContentRepository::clearOrphans() { if (!status.ok()) { logger_->log_error("Could not delete orphan contents from rocksdb database: %s", status.ToString()); + std::lock_guard<std::mutex> lock(purge_list_mutex_); + for (const auto& key : keys_to_be_deleted) { + purge_list_.push_back(key); + } } } diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index 7e1bac61c..1305a7c2d 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -67,7 +67,6 @@ class DatabaseContentRepository : public core::ContentRepository { return remove(claim); } - bool remove(const minifi::ResourceClaim &claim) override; bool exists(const minifi::ResourceClaim &streamId) override; void clearOrphans() override; @@ -76,6 +75,8 @@ class DatabaseContentRepository : public core::ContentRepository { void stop() override; protected: + bool removeKey(const std::string& content_path) override; + std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch); void runCompaction(); diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index ce5a0756c..17762af26 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <utility> +#include <list> #include "properties/Configure.h" #include "ResourceClaim.h" @@ -56,10 +57,17 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut virtual void start() {} virtual void stop() {} + bool remove(const minifi::ResourceClaim &streamId) final; + protected: + void removeFromPurgeList(); + virtual bool removeKey(const std::string& content_path) = 0; + std::string directory_; std::mutex count_map_mutex_; + std::mutex purge_list_mutex_; std::map<std::string, uint32_t> count_map_; + std::list<std::string> purge_list_; }; } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h index 01926f5bd..b0c3900cd 100644 --- a/libminifi/include/core/repository/FileSystemRepository.h +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -46,11 +46,13 @@ class FileSystemRepository : public core::ContentRepository { return remove(claim); } - bool remove(const minifi::ResourceClaim& claim) override; std::shared_ptr<ContentSession> createSession() override; void clearOrphans() override; + protected: + bool removeKey(const std::string& content_path) override; + private: std::shared_ptr<logging::Logger> logger_; }; diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h index 62e9fd80b..a5f6027e9 100644 --- a/libminifi/include/core/repository/VolatileContentRepository.h +++ b/libminifi/include/core/repository/VolatileContentRepository.h @@ -88,15 +88,16 @@ class VolatileContentRepository : public core::ContentRepository { return remove(claim); } + void clearOrphans() override { + // there are no persisted orphans to delete + } + + protected: /** * Closes the claim. * @return whether or not the claim is associated with content stored in volatile memory. */ - bool remove(const minifi::ResourceClaim &claim) override; - - void clearOrphans() override { - // there are no persisted orphans to delete - } + bool removeKey(const std::string& content_path) override; private: VolatileRepositoryData repo_data_; diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp index a13032c7b..19e9ba2ae 100644 --- a/libminifi/src/core/ContentRepository.cpp +++ b/libminifi/src/core/ContentRepository.cpp @@ -60,18 +60,42 @@ void ContentRepository::incrementStreamCount(const minifi::ResourceClaim &stream } } +void ContentRepository::removeFromPurgeList() { + std::lock_guard<std::mutex> lock(purge_list_mutex_); + for (auto it = purge_list_.begin(); it != purge_list_.end();) { + if (removeKey(*it)) { + purge_list_.erase(it++); + } else { + ++it; + } + } +} + ContentRepository::StreamState ContentRepository::decrementStreamCount(const minifi::ResourceClaim &streamId) { - std::lock_guard<std::mutex> lock(count_map_mutex_); - const std::string str = streamId.getContentFullPath(); - auto count = count_map_.find(str); - if (count != count_map_.end() && count->second > 1) { - count_map_[str] = count->second - 1; - return StreamState::Alive; - } else { + { + std::lock_guard<std::mutex> lock(count_map_mutex_); + const std::string str = streamId.getContentFullPath(); + auto count = count_map_.find(str); + if (count != count_map_.end() && count->second > 1) { + count_map_[str] = count->second - 1; + return StreamState::Alive; + } + count_map_.erase(str); - remove(streamId); - return StreamState::Deleted; } + + remove(streamId); + return StreamState::Deleted; +} + +bool ContentRepository::remove(const minifi::ResourceClaim &streamId) { + removeFromPurgeList(); + if (!removeKey(streamId.getContentFullPath())) { + std::lock_guard<std::mutex> lock(purge_list_mutex_); + purge_list_.push_back(streamId.getContentFullPath()); + return false; + } + return true; } } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index 034e9c36a..5c6648092 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -19,6 +19,7 @@ #include "core/repository/FileSystemRepository.h" #include <memory> #include <string> +#include <filesystem> #include "io/FileStream.h" #include "utils/file/FileUtils.h" #include "core/ForwardingContentSession.h" @@ -49,9 +50,23 @@ std::shared_ptr<io::BaseStream> FileSystemRepository::read(const minifi::Resourc return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, false); } -bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) { - logger_->log_debug("Deleting resource %s", claim.getContentFullPath()); - std::remove(claim.getContentFullPath().c_str()); +bool FileSystemRepository::removeKey(const std::string& content_path) { + logger_->log_debug("Deleting resource %s", content_path); + std::error_code ec; + auto result = std::filesystem::exists(content_path, ec); + if (ec) { + logger_->log_error("Deleting %s from content repository failed with the following error: %s", content_path, ec.message()); + return false; + } + if (!result) { + logger_->log_debug("Content path %s does not exist, no need to delete it", content_path); + return true; + } + ec.clear(); + if (!std::filesystem::remove(content_path, ec)) { + logger_->log_error("Deleting %s from content repository failed with the following error: %s", content_path, ec.message()); + return false; + } return true; } @@ -60,13 +75,24 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() { } void FileSystemRepository::clearOrphans() { - std::lock_guard<std::mutex> lock(count_map_mutex_); utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) { auto path = directory_ + "/" + filename.string(); - auto it = count_map_.find(path); - if (it == count_map_.end() || it->second == 0) { + bool is_orphan = false; + { + std::lock_guard<std::mutex> lock(count_map_mutex_); + auto it = count_map_.find(path); + is_orphan = it == count_map_.end() || it->second == 0; + } + if (is_orphan) { logger_->log_debug("Deleting orphan resource %s", path); - std::remove(path.c_str()); + std::error_code ec; + if (!std::filesystem::remove(path, ec)) { + { + std::lock_guard<std::mutex> lock(purge_list_mutex_); + purge_list_.push_back(path); + } + logger_->log_error("Deleting %s from content repository failed with the following error: %s", path, ec.message()); + } } return true; }, logger_, false); diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 822433bd8..a47732119 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -122,40 +122,38 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::Re return nullptr; } -bool VolatileContentRepository::remove(const minifi::ResourceClaim &claim) { +bool VolatileContentRepository::removeKey(const std::string& content_path) { if (LIKELY(minimize_locking_ == true)) { std::lock_guard<std::mutex> lock(map_mutex_); - auto ent = master_list_.find(claim.getContentFullPath()); + auto ent = master_list_.find(content_path); if (ent != master_list_.end()) { auto ptr = ent->second; // if we cannot remove the entry we will let the owner's destructor // decrement the reference count and free it - master_list_.erase(claim.getContentFullPath()); + master_list_.erase(content_path); // because of the test and set we need to decrement ownership ptr->decrementOwnership(); - if (ptr->freeValue(claim.getContentFullPath())) { - logger_->log_info("Deleting resource %s", claim.getContentFullPath()); - return true; + if (ptr->freeValue(content_path)) { + logger_->log_info("Deleting resource %s", content_path); } else { - logger_->log_info("free failed for %s", claim.getContentFullPath()); + logger_->log_info("free failed for %s", content_path); } } else { - logger_->log_info("Could not remove %s", claim.getContentFullPath()); + logger_->log_info("Could not remove %s", content_path); } } else { std::lock_guard<std::mutex> lock(map_mutex_); - auto claim_item = master_list_.find(claim.getContentFullPath()); + auto claim_item = master_list_.find(content_path); if (claim_item != master_list_.end()) { auto size = claim_item->second->getLength(); delete claim_item->second; - master_list_.erase(claim.getContentFullPath()); + master_list_.erase(content_path); repo_data_.current_size -= size; } - return true; } - logger_->log_info("Could not remove %s, may not exist", claim.getContentFullPath()); - return false; + logger_->log_info("Could not remove %s, may not exist", content_path); + return true; } } // namespace org::apache::nifi::minifi::core::repository diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp index c61579dd9..5aa652e54 100644 --- a/libminifi/test/unit/FileSystemRepositoryTests.cpp +++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp @@ -20,7 +20,11 @@ // as we measure the absolute memory usage that would fail this test #define EXTENSION_LIST "" +#ifdef WIN32 +#include <Windows.h> +#endif #include <cstring> +#include <list> #include "utils/gsl.h" #include "utils/OsUtils.h" @@ -29,9 +33,20 @@ #include "utils/Literals.h" #include "core/repository/FileSystemRepository.h" #include "utils/IntegrationTestUtils.h" +#include "utils/file/FileUtils.h" using namespace std::literals::chrono_literals; +namespace org::apache::nifi::minifi::test { + +class TestFileSystemRepository : public minifi::core::repository::FileSystemRepository { + public: + using FileSystemRepository::FileSystemRepository; + std::list<std::string> getPurgeList() const { + return purge_list_; + } +}; + TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") { TestController controller; auto dir = controller.createTempDirectory(); @@ -39,7 +54,7 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") { auto config = std::make_shared<minifi::Configure>(); config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); REQUIRE(fs_repo->initialize(config)); - const auto start_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage(); + const auto start_memory = minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage(); REQUIRE(start_memory > 0); auto content_session = fs_repo->createSession(); @@ -53,7 +68,7 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") { using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; CHECK(verifyEventHappenedInPollTime(5s, [&] { - const auto end_memory = utils::OsUtils::getCurrentProcessPhysicalMemoryUsage(); + const auto end_memory = minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage(); REQUIRE(end_memory > 0); return end_memory < start_memory + int64_t{5_MB}; }, 100ms)); @@ -74,11 +89,92 @@ TEST_CASE("FileSystemRepository can clear orphan entries") { content_repo->incrementStreamCount(claim); } - REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 1); + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1); auto content_repo = std::make_shared<core::repository::FileSystemRepository>(); REQUIRE(content_repo->initialize(configuration)); content_repo->clearOrphans(); - REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty()); + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty()); +} + +TEST_CASE("FileSystemRepository can retry removing entry that previously failed to be removed") { + TestController testController; + auto dir = testController.createTempDirectory(); + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); + + auto content_repo = std::make_shared<TestFileSystemRepository>(); + REQUIRE(content_repo->initialize(configuration)); + std::string filename; + { + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + auto files = minifi::utils::file::list_dir_all(dir, testController.getLogger()); + REQUIRE(files.size() == 1); + // ensure that the content is not deleted during resource claim destruction + filename = (files[0].first / files[0].second).string(); +#ifdef WIN32 + REQUIRE(SetFileAttributes(filename.c_str(), FILE_ATTRIBUTE_READONLY)); +#else + minifi::utils::file::set_permissions(dir, 0555); +#endif + } + +#ifdef WIN32 + REQUIRE(SetFileAttributes(filename.c_str(), GetFileAttributes(filename.c_str()) & ~FILE_ATTRIBUTE_READONLY)); +#else + minifi::utils::file::set_permissions(dir, 0777); +#endif + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1); + { + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 2); + } + + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty()); + REQUIRE(content_repo->getPurgeList().empty()); } + +TEST_CASE("FileSystemRepository removes non-existing resource file from purge list") { + TestController testController; + auto dir = testController.createTempDirectory(); + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string()); + + auto content_repo = std::make_shared<TestFileSystemRepository>(); + REQUIRE(content_repo->initialize(configuration)); + std::string filename; + { + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + auto files = minifi::utils::file::list_dir_all(dir, testController.getLogger()); + REQUIRE(files.size() == 1); + // ensure that the content is not deleted during resource claim destruction + filename = (files[0].first / files[0].second).string(); +#ifdef WIN32 + REQUIRE(SetFileAttributes(filename.c_str(), FILE_ATTRIBUTE_READONLY)); +#else + minifi::utils::file::set_permissions(dir, 0555); +#endif + } + +#ifdef WIN32 + REQUIRE(SetFileAttributes(filename.c_str(), GetFileAttributes(filename.c_str()) & ~FILE_ATTRIBUTE_READONLY)); +#else + minifi::utils::file::set_permissions(dir, 0777); +#endif + REQUIRE(std::filesystem::remove(filename)); + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty()); + { + minifi::ResourceClaim claim(content_repo); + content_repo->write(claim)->write("hi"); + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).size() == 1); + } + + REQUIRE(minifi::utils::file::list_dir_all(dir, testController.getLogger()).empty()); + REQUIRE(content_repo->getPurgeList().empty()); +} + +} // namespace org::apache::nifi::minifi::test
