This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 033ef9d0a119ba60c7698b762fc763bc9542e16e
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Sat Mar 4 23:36:19 2023 +0100

    MINIFICPP-2059 Handle content repository remove failures
    
    Closes #1519
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  21 ++++-
 .../rocksdb-repos/DatabaseContentRepository.h      |   3 +-
 libminifi/include/core/ContentRepository.h         |   8 ++
 .../include/core/repository/FileSystemRepository.h |   4 +-
 .../core/repository/VolatileContentRepository.h    |  11 ++-
 libminifi/src/core/ContentRepository.cpp           |  42 +++++++--
 .../src/core/repository/FileSystemRepository.cpp   |  40 ++++++--
 .../core/repository/VolatileContentRepository.cpp  |  24 +++--
 libminifi/test/unit/FileSystemRepositoryTests.cpp  | 104 ++++++++++++++++++++-
 9 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 020c60fca..187776778 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -197,20 +197,26 @@ bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
   }
 }
 
-bool DatabaseContentRepository::remove(const minifi::ResourceClaim &claim) {
-  if (!is_valid_ || !db_)
+bool DatabaseContentRepository::removeKey(const std::string& content_path) {
+  if (!is_valid_ || !db_) {
+    logger_->log_error("DB is not valid, could not delete %s", content_path);
     return false;
+  }
   auto opendb = db_->open();
   if (!opendb) {
+    logger_->log_error("Could not open DB, did not delete %s", content_path);
     return false;
   }
   rocksdb::Status status;
-  status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath());
+  status = opendb->Delete(rocksdb::WriteOptions(), content_path);
   if (status.ok()) {
-    logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
+    logger_->log_debug("Deleting resource %s", content_path);
+    return true;
+  } else if (status.IsNotFound()) {
+    logger_->log_debug("Resource %s was not found", content_path);
     return true;
   } else {
-    logger_->log_debug("Attempted, but could not delete %s", 
claim.getContentFullPath());
+    logger_->log_error("Attempted, but could not delete %s", content_path);
     return false;
   }
 }
@@ -238,6 +244,7 @@ void DatabaseContentRepository::clearOrphans() {
   auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     auto key = it->key().ToString();
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
     auto claim_it = count_map_.find(key);
     if (claim_it == count_map_.end() || claim_it->second == 0) {
       logger_->log_error("Deleting orphan resource %s", key);
@@ -253,6 +260,10 @@ void DatabaseContentRepository::clearOrphans() {
 
   if (!status.ok()) {
     logger_->log_error("Could not delete orphan contents from rocksdb 
database: %s", status.ToString());
+    std::lock_guard<std::mutex> lock(purge_list_mutex_);
+    for (const auto& key : keys_to_be_deleted) {
+      purge_list_.push_back(key);
+    }
   }
 }
 
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 7e1bac61c..1305a7c2d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -67,7 +67,6 @@ class DatabaseContentRepository : public core::ContentRepository {
     return remove(claim);
   }
 
-  bool remove(const minifi::ResourceClaim &claim) override;
   bool exists(const minifi::ResourceClaim &streamId) override;
 
   void clearOrphans() override;
@@ -76,6 +75,8 @@ class DatabaseContentRepository : public core::ContentRepository {
   void stop() override;
 
  protected:
+  bool removeKey(const std::string& content_path) override;
+
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, 
bool append, minifi::internal::WriteBatch* batch);
 
   void runCompaction();
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index ce5a0756c..17762af26 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <list>
 
 #include "properties/Configure.h"
 #include "ResourceClaim.h"
@@ -56,10 +57,17 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
   virtual void start() {}
   virtual void stop() {}
 
+  bool remove(const minifi::ResourceClaim &streamId) final;
+
  protected:
+  void removeFromPurgeList();
+  virtual bool removeKey(const std::string& content_path) = 0;
+
   std::string directory_;
   std::mutex count_map_mutex_;
+  std::mutex purge_list_mutex_;
   std::map<std::string, uint32_t> count_map_;
+  std::list<std::string> purge_list_;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 01926f5bd..b0c3900cd 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -46,11 +46,13 @@ class FileSystemRepository : public core::ContentRepository {
     return remove(claim);
   }
 
-  bool remove(const minifi::ResourceClaim& claim) override;
   std::shared_ptr<ContentSession> createSession() override;
 
   void clearOrphans() override;
 
+ protected:
+  bool removeKey(const std::string& content_path) override;
+
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 62e9fd80b..a5f6027e9 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -88,15 +88,16 @@ class VolatileContentRepository : public core::ContentRepository {
     return remove(claim);
   }
 
+  void clearOrphans() override {
+    // there are no persisted orphans to delete
+  }
+
+ protected:
   /**
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in 
volatile memory.
    */
-  bool remove(const minifi::ResourceClaim &claim) override;
-
-  void clearOrphans() override {
-    // there are no persisted orphans to delete
-  }
+  bool removeKey(const std::string& content_path) override;
 
  private:
   VolatileRepositoryData repo_data_;
diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp
index a13032c7b..19e9ba2ae 100644
--- a/libminifi/src/core/ContentRepository.cpp
+++ b/libminifi/src/core/ContentRepository.cpp
@@ -60,18 +60,42 @@ void ContentRepository::incrementStreamCount(const minifi::ResourceClaim &stream
   }
 }
 
+void ContentRepository::removeFromPurgeList() {
+  std::lock_guard<std::mutex> lock(purge_list_mutex_);
+  for (auto it = purge_list_.begin(); it != purge_list_.end();) {
+    if (removeKey(*it)) {
+      purge_list_.erase(it++);
+    } else {
+      ++it;
+    }
+  }
+}
+
 ContentRepository::StreamState ContentRepository::decrementStreamCount(const 
minifi::ResourceClaim &streamId) {
-  std::lock_guard<std::mutex> lock(count_map_mutex_);
-  const std::string str = streamId.getContentFullPath();
-  auto count = count_map_.find(str);
-  if (count != count_map_.end() && count->second > 1) {
-    count_map_[str] = count->second - 1;
-    return StreamState::Alive;
-  } else {
+  {
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    const std::string str = streamId.getContentFullPath();
+    auto count = count_map_.find(str);
+    if (count != count_map_.end() && count->second > 1) {
+      count_map_[str] = count->second - 1;
+      return StreamState::Alive;
+    }
+
     count_map_.erase(str);
-    remove(streamId);
-    return StreamState::Deleted;
   }
+
+  remove(streamId);
+  return StreamState::Deleted;
+}
+
+bool ContentRepository::remove(const minifi::ResourceClaim &streamId) {
+  removeFromPurgeList();
+  if (!removeKey(streamId.getContentFullPath())) {
+    std::lock_guard<std::mutex> lock(purge_list_mutex_);
+    purge_list_.push_back(streamId.getContentFullPath());
+    return false;
+  }
+  return true;
 }
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 034e9c36a..5c6648092 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -19,6 +19,7 @@
 #include "core/repository/FileSystemRepository.h"
 #include <memory>
 #include <string>
+#include <filesystem>
 #include "io/FileStream.h"
 #include "utils/file/FileUtils.h"
 #include "core/ForwardingContentSession.h"
@@ -49,9 +50,23 @@ std::shared_ptr<io::BaseStream> FileSystemRepository::read(const minifi::Resourc
   return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, 
false);
 }
 
-bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) {
-  logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
-  std::remove(claim.getContentFullPath().c_str());
+bool FileSystemRepository::removeKey(const std::string& content_path) {
+  logger_->log_debug("Deleting resource %s", content_path);
+  std::error_code ec;
+  auto result = std::filesystem::exists(content_path, ec);
+  if (ec) {
+    logger_->log_error("Deleting %s from content repository failed with the 
following error: %s", content_path, ec.message());
+    return false;
+  }
+  if (!result) {
+    logger_->log_debug("Content path %s does not exist, no need to delete it", 
content_path);
+    return true;
+  }
+  ec.clear();
+  if (!std::filesystem::remove(content_path, ec)) {
+    logger_->log_error("Deleting %s from content repository failed with the 
following error: %s", content_path, ec.message());
+    return false;
+  }
   return true;
 }
 
@@ -60,13 +75,24 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
 }
 
 void FileSystemRepository::clearOrphans() {
-  std::lock_guard<std::mutex> lock(count_map_mutex_);
   utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) {
     auto path = directory_ +  "/" + filename.string();
-    auto it = count_map_.find(path);
-    if (it == count_map_.end() || it->second == 0) {
+    bool is_orphan = false;
+    {
+      std::lock_guard<std::mutex> lock(count_map_mutex_);
+      auto it = count_map_.find(path);
+      is_orphan = it == count_map_.end() || it->second == 0;
+    }
+    if (is_orphan) {
       logger_->log_debug("Deleting orphan resource %s", path);
-      std::remove(path.c_str());
+      std::error_code ec;
+      if (!std::filesystem::remove(path, ec)) {
+        {
+          std::lock_guard<std::mutex> lock(purge_list_mutex_);
+          purge_list_.push_back(path);
+        }
+        logger_->log_error("Deleting %s from content repository failed with 
the following error: %s", path, ec.message());
+      }
     }
     return true;
   }, logger_, false);
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 822433bd8..a47732119 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -122,40 +122,38 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::Re
   return nullptr;
 }
 
-bool VolatileContentRepository::remove(const minifi::ResourceClaim &claim) {
+bool VolatileContentRepository::removeKey(const std::string& content_path) {
   if (LIKELY(minimize_locking_ == true)) {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto ent = master_list_.find(claim.getContentFullPath());
+    auto ent = master_list_.find(content_path);
     if (ent != master_list_.end()) {
       auto ptr = ent->second;
       // if we cannot remove the entry we will let the owner's destructor
       // decrement the reference count and free it
-      master_list_.erase(claim.getContentFullPath());
+      master_list_.erase(content_path);
       // because of the test and set we need to decrement ownership
       ptr->decrementOwnership();
-      if (ptr->freeValue(claim.getContentFullPath())) {
-        logger_->log_info("Deleting resource %s", claim.getContentFullPath());
-        return true;
+      if (ptr->freeValue(content_path)) {
+        logger_->log_info("Deleting resource %s", content_path);
       } else {
-        logger_->log_info("free failed for %s", claim.getContentFullPath());
+        logger_->log_info("free failed for %s", content_path);
       }
     } else {
-      logger_->log_info("Could not remove %s", claim.getContentFullPath());
+      logger_->log_info("Could not remove %s", content_path);
     }
   } else {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_item = master_list_.find(claim.getContentFullPath());
+    auto claim_item = master_list_.find(content_path);
     if (claim_item != master_list_.end()) {
       auto size = claim_item->second->getLength();
       delete claim_item->second;
-      master_list_.erase(claim.getContentFullPath());
+      master_list_.erase(content_path);
       repo_data_.current_size -= size;
     }
-    return true;
   }
 
-  logger_->log_info("Could not remove %s, may not exist", 
claim.getContentFullPath());
-  return false;
+  logger_->log_info("Could not remove %s, may not exist", content_path);
+  return true;
 }
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
index c61579dd9..5aa652e54 100644
--- a/libminifi/test/unit/FileSystemRepositoryTests.cpp
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -20,7 +20,11 @@
 // as we measure the absolute memory usage that would fail this test
 #define EXTENSION_LIST ""
 
+#ifdef WIN32
+#include <Windows.h>
+#endif
 #include <cstring>
+#include <list>
 
 #include "utils/gsl.h"
 #include "utils/OsUtils.h"
@@ -29,9 +33,20 @@
 #include "utils/Literals.h"
 #include "core/repository/FileSystemRepository.h"
 #include "utils/IntegrationTestUtils.h"
+#include "utils/file/FileUtils.h"
 
 using namespace std::literals::chrono_literals;
 
+namespace org::apache::nifi::minifi::test {
+
+class TestFileSystemRepository : public 
minifi::core::repository::FileSystemRepository {
+ public:
+  using FileSystemRepository::FileSystemRepository;
+  std::list<std::string> getPurgeList() const {
+    return purge_list_;
+  }
+};
+
 TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
   TestController controller;
   auto dir = controller.createTempDirectory();
@@ -39,7 +54,7 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
   auto config = std::make_shared<minifi::Configure>();
   config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, 
dir.string());
   REQUIRE(fs_repo->initialize(config));
-  const auto start_memory = 
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+  const auto start_memory = 
minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
   REQUIRE(start_memory > 0);
 
   auto content_session = fs_repo->createSession();
@@ -53,7 +68,7 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
 
   using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime;
   CHECK(verifyEventHappenedInPollTime(5s, [&] {
-      const auto end_memory = 
utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
+      const auto end_memory = 
minifi::utils::OsUtils::getCurrentProcessPhysicalMemoryUsage();
       REQUIRE(end_memory > 0);
       return end_memory < start_memory + int64_t{5_MB};
     }, 100ms));
@@ -74,11 +89,92 @@ TEST_CASE("FileSystemRepository can clear orphan entries") {
     content_repo->incrementStreamCount(claim);
   }
 
-  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 
1);
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).size() == 1);
 
   auto content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
   REQUIRE(content_repo->initialize(configuration));
   content_repo->clearOrphans();
 
-  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty());
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).empty());
+}
+
+TEST_CASE("FileSystemRepository can retry removing entry that previously 
failed to be removed") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = 
std::make_shared<org::apache::nifi::minifi::Configure>();
+  
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 dir.string());
+
+  auto content_repo = std::make_shared<TestFileSystemRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+  std::string filename;
+  {
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    auto files = minifi::utils::file::list_dir_all(dir, 
testController.getLogger());
+    REQUIRE(files.size() == 1);
+    // ensure that the content is not deleted during resource claim destruction
+    filename = (files[0].first / files[0].second).string();
+#ifdef WIN32
+    REQUIRE(SetFileAttributes(filename.c_str(), FILE_ATTRIBUTE_READONLY));
+#else
+    minifi::utils::file::set_permissions(dir, 0555);
+#endif
+  }
+
+#ifdef WIN32
+  REQUIRE(SetFileAttributes(filename.c_str(), 
GetFileAttributes(filename.c_str()) & ~FILE_ATTRIBUTE_READONLY));
+#else
+  minifi::utils::file::set_permissions(dir, 0777);
+#endif
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).size() == 1);
+  {
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).size() == 2);
+  }
+
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).empty());
+  REQUIRE(content_repo->getPurgeList().empty());
 }
+
+TEST_CASE("FileSystemRepository removes non-existing resource file from purge 
list") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = 
std::make_shared<org::apache::nifi::minifi::Configure>();
+  
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
 dir.string());
+
+  auto content_repo = std::make_shared<TestFileSystemRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+  std::string filename;
+  {
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    auto files = minifi::utils::file::list_dir_all(dir, 
testController.getLogger());
+    REQUIRE(files.size() == 1);
+    // ensure that the content is not deleted during resource claim destruction
+    filename = (files[0].first / files[0].second).string();
+#ifdef WIN32
+    REQUIRE(SetFileAttributes(filename.c_str(), FILE_ATTRIBUTE_READONLY));
+#else
+    minifi::utils::file::set_permissions(dir, 0555);
+#endif
+  }
+
+#ifdef WIN32
+  REQUIRE(SetFileAttributes(filename.c_str(), 
GetFileAttributes(filename.c_str()) & ~FILE_ATTRIBUTE_READONLY));
+#else
+  minifi::utils::file::set_permissions(dir, 0777);
+#endif
+  REQUIRE(std::filesystem::remove(filename));
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).empty());
+  {
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).size() == 1);
+  }
+
+  REQUIRE(minifi::utils::file::list_dir_all(dir, 
testController.getLogger()).empty());
+  REQUIRE(content_repo->getPurgeList().empty());
+}
+
+}  // namespace org::apache::nifi::minifi::test

Reply via email to