Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master b57af84d6 -> ed6dc27f6


MINIFICPP-554: Add checkpointing to the startup process to delete
flow files to avoid startup problems.

This closes #373.

Approved by achristianson on GH.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ed6dc27f
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ed6dc27f
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ed6dc27f

Branch: refs/heads/master
Commit: ed6dc27f6373b41da85a9c1265fce7a9bce617b5
Parents: b57af84
Author: Marc Parisi <[email protected]>
Authored: Mon Jul 16 08:13:13 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Tue Jul 24 08:58:59 2018 -0400

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 extensions/rocksdb-repos/FlowFileRepository.cpp | 76 ++++++++++++++----
 extensions/rocksdb-repos/FlowFileRepository.h   | 30 ++++++-
 libminifi/include/utils/file/DiffUtils.h        | 83 +++++++++++++++++++
 libminifi/include/utils/file/FileUtils.h        | 84 +++++++++++++++-----
 libminifi/src/c2/C2Agent.cpp                    |  3 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp      | 58 ++++++++++++++
 7 files changed, 292 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 1bbca52..e0a931f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,6 +41,7 @@ CTestTestfile.cmake
 cmake-build-debug
 
 # Generated files
+flowfile_checkpoint
 build
 bt_state
 bin

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/extensions/rocksdb-repos/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp 
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 0d055ea..634819c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -73,6 +73,9 @@ void FlowFileRepository::flush() {
 void FlowFileRepository::run() {
   // threshold for purge
 
+  if (running_) {
+    prune_stored_flowfiles();
+  }
   while (running_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
 
@@ -80,19 +83,31 @@ void FlowFileRepository::run() {
 
     uint64_t size = getRepoSize();
 
-    if (size > (uint64_t)max_partition_bytes_)
+    if (size > (uint64_t) max_partition_bytes_)
       repo_full_ = true;
     else
       repo_full_ = false;
   }
 }
 
-void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
-  content_repo_ = content_repo;
-  std::vector<std::pair<std::string, uint64_t>> purgeList;
-  rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions());
+void FlowFileRepository::prune_stored_flowfiles() {
+  rocksdb::DB* stored_database_;
+  bool corrupt_checkpoint = false;
+  if (nullptr != checkpoint_) {
+    rocksdb::Options options;
+    options.create_if_missing = true;
+    options.use_direct_io_for_flush_and_compaction = true;
+    options.use_direct_reads = true;
+    rocksdb::Status status = rocksdb::DB::OpenForReadOnly(options, 
FLOWFILE_CHECKPOINT_DIRECTORY, &stored_database_);
+    if (!status.ok()) {
+      stored_database_ = db_;
+    }
+  } else {
+    logger_->log_trace("Could not open checkpoint as object doesn't exist. 
Likely not needed or file system error.");
+    return;
+  }
 
-  repo_size_ = 0;
+  rocksdb::Iterator* it = 
stored_database_->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     std::shared_ptr<FlowFileRecord> eventRead = 
std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
     std::string key = it->key().ToString();
@@ -100,34 +115,63 @@ void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentReposi
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t 
*>(it->value().data()), it->value().size())) {
       logger_->log_debug("Found connection for %s, path %s ", 
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
       auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (search != connectionMap.end()) {
+      if (!corrupt_checkpoint && search != connectionMap.end()) {
         // we find the connection for the persistent flowfile, create the 
flowfile and enqueue that
         std::shared_ptr<core::FlowFile> flow_file_ref = 
std::static_pointer_cast<core::FlowFile>(eventRead);
         eventRead->setStoredToRepository(true);
         search->second->put(eventRead);
       } else {
-        logger_->log_warn("Could not find connectinon for %s, path %s ", 
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
+        logger_->log_warn("Could not find connection for %s, path %s ", 
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
         if (eventRead->getContentFullPath().length() > 0) {
           if (nullptr != eventRead->getResourceClaim()) {
             content_repo_->remove(eventRead->getResourceClaim());
           }
         }
-        purgeList.push_back(std::make_pair(key, it->value().size()));
+        keys_to_delete.enqueue(key);
       }
     } else {
-      purgeList.push_back(std::make_pair(key, it->value().size()));
+      keys_to_delete.enqueue(key);
     }
   }
 
   delete it;
-  for (auto eventId : purgeList) {
-    logger_->log_debug("Repository Repo %s Purge %s", name_, eventId.first);
-    if (Delete(eventId.first)) {
-      repo_size_ -= eventId.second;
-    }
+
+}
+
+/**
+ * Returns True if there is data to interrogate.
+ * @return true if our db has data stored.
+ */
+bool FlowFileRepository::need_checkpoint(){
+  std::unique_ptr<rocksdb::Iterator> it = 
std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    return true;
   }
+  return false;
+}
+void FlowFileRepository::initialize_repository() {
+  // first we need to establish a checkpoint iff it is needed.
+  if (!need_checkpoint()){
+    logger_->log_trace("Do not need checkpoint");
+    return;
+  }
+  rocksdb::Checkpoint *checkpoint;
+  // delete any previous copy
+  if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 
&& rocksdb::Checkpoint::Create(db_, &checkpoint).ok()) {
+    if (checkpoint->CreateCheckpoint(FLOWFILE_CHECKPOINT_DIRECTORY).ok()) {
+      checkpoint_ = std::unique_ptr<rocksdb::Checkpoint>(checkpoint);
+      logger_->log_trace("Created checkpoint directory");
+    } else {
+      logger_->log_trace("Could not create checkpoint directory. Not properly 
deleted?");
+    }
+  } else
+    logger_->log_trace("Could not create checkpoint directory. Not properly 
deleted?");
+}
 
-  return;
+void FlowFileRepository::loadComponent(const 
std::shared_ptr<core::ContentRepository> &content_repo) {
+  content_repo_ = content_repo;
+  repo_size_ = 0;
+  initialize_repository();
 }
 
 } /* namespace repository */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/extensions/rocksdb-repos/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h 
b/extensions/rocksdb-repos/FlowFileRepository.h
index 1bdd440..e7f8fd0 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -18,9 +18,11 @@
 #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
 #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
 
+#include "utils/file/FileUtils.h"
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
+#include "rocksdb/utilities/checkpoint.h"
 #include "core/Repository.h"
 #include "core/Core.h"
 #include "Connection.h"
@@ -35,6 +37,7 @@ namespace core {
 namespace repository {
 
 #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
+#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
 #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
 #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
 #define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec
@@ -48,15 +51,16 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
   // Constructor
 
   FlowFileRepository(std::string name, uuid_t uuid)
-      : FlowFileRepository(name){
-
+      : FlowFileRepository(name) {
   }
 
   FlowFileRepository(const std::string repo_name = "", std::string directory = 
FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = 
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
                      int64_t maxPartitionBytes = 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = 
FLOWFILE_REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name,0), 
Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
+      : core::SerializableComponent(repo_name, 0),
+        Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod),
         content_repo_(nullptr),
-        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()){
+        checkpoint_(nullptr),
+        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
     db_ = NULL;
   }
 
@@ -152,9 +156,27 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
   }
 
  private:
+
+  /**
+   * Initialize the repository
+   */
+  void initialize_repository();
+
+  /**
+   * Returns true if a checkpoint is needed at startup
+   * @return true if a checkpoint is needed.
+   */
+  bool need_checkpoint();
+
+  /**
+   * Prunes stored flow files.
+   */
+  void prune_stored_flowfiles();
+
   moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::shared_ptr<core::ContentRepository> content_repo_;
   rocksdb::DB* db_;
+  std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
   std::shared_ptr<logging::Logger> logger_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/include/utils/file/DiffUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/DiffUtils.h 
b/libminifi/include/utils/file/DiffUtils.h
new file mode 100644
index 0000000..d44360d
--- /dev/null
+++ b/libminifi/include/utils/file/DiffUtils.h
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_
+#define LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_
+
+#include <sstream>
+#include <fstream>
+#ifdef BOOST_VERSION
+#include <boost/filesystem.hpp>
+#else
+#include <cstdlib>
+#include <sys/stat.h>
+#include <dirent.h>
+#endif
+#include <cstdio>
+#include <unistd.h>
+#include <fcntl.h>
+#ifdef WIN32
+#define stat _stat
+#endif
+
+#ifdef BDIFF
+
+extern "C"
+{
+#include "bsdiff.h"
+#include "bspatch.h"
+}
+#else
+int apply_bsdiff_patch(const char *oldfile, const char *newfile, const char 
*patch) {
+  return -1;
+}
+
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+/**
+ * Simple implementation of some binary diff utilities.
+ *
+ */
+class DiffUtils {
+ public:
+
+  DiffUtils() = delete;
+
+  static int apply_binary_diff(const char *file_original, const char 
*file_new, const char *result_file) {
+    return apply_bsdiff_patch(file_original, file_new, result_file);
+  }
+
+  static int binary_diff(const char *file_original, const char *file_other, 
const char *patchfile) {
+    return -1;
+  }
+
+};
+
+} /* namespace file */
+} /* namespace utils */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/include/utils/file/FileUtils.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/file/FileUtils.h 
b/libminifi/include/utils/file/FileUtils.h
index 767ede7..21b13d8 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -17,6 +17,8 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
 #define LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_
 
+#include <sstream>
+#include <fstream>
 #ifdef BOOST_VERSION
 #include <boost/filesystem.hpp>
 #else
@@ -31,20 +33,6 @@
 #define stat _stat
 #endif
 
-#ifdef BDIFF
-
-extern "C"
-{
-#include "bsdiff.h"
-#include "bspatch.h"
-}
-#else
-int apply_bsdiff_patch(const char *oldfile, const char *newfile, const char 
*patch) {
-  return -1;
-}
-
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
@@ -61,6 +49,66 @@ class FileUtils {
 
   FileUtils() = delete;
 
+  /**
+   * Deletes a directory, deleting recursively if delete_files_recursively is 
true
+   * @param path current path to delete
+   * @param delete_files_recursively deletes recursively
+   */
+  static int64_t delete_dir(const std::string &path, bool 
delete_files_recursively = true) {
+#ifdef BOOST_VERSION
+    try {
+      if (boost::filesystem::exists(path)) {
+        if (delete_files_recursively) {
+          boost::filesystem::remove_all(path);
+        }
+        else {
+          boost::filesystem::remove(path);
+        }
+      }
+    } catch(boost::filesystem::filesystem_error const & e)
+    {
+      return -1;
+      //display error message
+    }
+    return 0;
+#else
+    DIR *current_directory = opendir(path.c_str());
+    int r = -1;
+    if (current_directory) {
+      struct dirent *p;
+      r = 0;
+      while (!r && (p = readdir(current_directory))) {
+        int r2 = -1;
+        std::stringstream newpath;
+        if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) {
+          continue;
+        }
+        struct stat statbuf;
+
+        newpath << path << "/" << p->d_name;
+
+        if (!stat(newpath.str().c_str(), &statbuf)) {
+          if (S_ISDIR(statbuf.st_mode)) {
+            if (delete_files_recursively) {
+              r2 = delete_dir(newpath.str(), delete_files_recursively);
+            }
+          } else {
+            r2 = unlink(newpath.str().c_str());
+          }
+        }
+        r = r2;
+      }
+      closedir(current_directory);
+    }
+
+    if (!r) {
+      return rmdir(path.c_str());
+    }
+
+    return 0;
+#endif
+  }
+
   static uint64_t last_write_time(const std::string &path) {
 #ifdef BOOST_VERSION
     return boost::filesystem::last_write_time(movedFile.str());
@@ -103,14 +151,6 @@ class FileUtils {
     return 0;
   }
 
-  static int apply_binary_diff(const char *file_original, const char 
*file_new, const char *result_file) {
-    return apply_bsdiff_patch(file_original, file_new, result_file);
-  }
-
-  static int binary_diff(const char *file_original, const char *file_other, 
const char *patchfile) {
-    return -1;
-  }
-
 };
 
 } /* namespace file */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/src/c2/C2Agent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index c71283d..cb7e0f4 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -28,6 +28,7 @@
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/file/DiffUtils.h"
 #include "utils/file/FileUtils.h"
 #include "utils/file/FileManager.h"
 namespace org {
@@ -648,7 +649,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) {
 
       if (allow_updates_) {
         if (partial_update && !bin_location_.empty()) {
-          utils::file::FileUtils::apply_binary_diff(bin_location_.c_str(), 
file_path.c_str(), update_location_.c_str());
+          utils::file::DiffUtils::apply_binary_diff(bin_location_.c_str(), 
file_path.c_str(), update_location_.c_str());
         } else {
           utils::file::FileUtils::copy_file(file_path, update_location_);
         }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/test/rocksdb-tests/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp 
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 2da5713..c92133a 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -18,6 +18,8 @@
 #include "../TestBase.h"
 #include <memory>
 #include <string>
+#include <chrono>
+#include <thread>
 #include <map>
 #include "../unit/ProvenanceTestHelper.h"
 #include "provenance/Provenance.h"
@@ -159,3 +161,59 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
 
   LogTestController::getInstance().reset();
 }
+
+
+TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::repository::FlowFileRepository> repository = 
std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1);
+
+  std::map<std::string, std::string> attributes;
+
+  std::fstream file;
+  std::stringstream ss;
+  ss << dir << "/" << "tstFile.ext";
+  file.open(ss.str(), std::ios::out);
+  file << "tempFile";
+  file.close();
+
+  std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::FileSystemRepository>();
+
+  repository->initialize(std::make_shared<minifi::Configure>());
+
+  repository->loadComponent(content_repo);
+
+  std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
+
+  minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  REQUIRE(true == record.Serialize());
+
+  repository->flush();
+
+  repository->stop();
+
+  repository->loadComponent(content_repo);
+
+  repository->start();
+
+  // sleep for 100 ms to let the delete work.
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+  repository->stop();
+
+  std::ifstream fileopen(ss.str());
+  REQUIRE(false == fileopen.good());
+
+  LogTestController::getInstance().reset();
+}

Reply via email to