Repository: nifi-minifi-cpp Updated Branches: refs/heads/master b57af84d6 -> ed6dc27f6
MINIFICPP-554: Add checkpointing to the startup process to delete flow files to avoid startup problems. This closes #373. Approved by achristianson on GH. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/ed6dc27f Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/ed6dc27f Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/ed6dc27f Branch: refs/heads/master Commit: ed6dc27f6373b41da85a9c1265fce7a9bce617b5 Parents: b57af84 Author: Marc Parisi <[email protected]> Authored: Mon Jul 16 08:13:13 2018 -0400 Committer: Marc Parisi <[email protected]> Committed: Tue Jul 24 08:58:59 2018 -0400 ---------------------------------------------------------------------- .gitignore | 1 + extensions/rocksdb-repos/FlowFileRepository.cpp | 76 ++++++++++++++---- extensions/rocksdb-repos/FlowFileRepository.h | 30 ++++++- libminifi/include/utils/file/DiffUtils.h | 83 +++++++++++++++++++ libminifi/include/utils/file/FileUtils.h | 84 +++++++++++++++----- libminifi/src/c2/C2Agent.cpp | 3 +- libminifi/test/rocksdb-tests/RepoTests.cpp | 58 ++++++++++++++ 7 files changed, 292 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 1bbca52..e0a931f 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ CTestTestfile.cmake cmake-build-debug # Generated files +flowfile_checkpoint build bt_state bin http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/extensions/rocksdb-repos/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp 
index 0d055ea..634819c 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -73,6 +73,9 @@ void FlowFileRepository::flush() { void FlowFileRepository::run() { // threshold for purge + if (running_) { + prune_stored_flowfiles(); + } while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); @@ -80,19 +83,31 @@ void FlowFileRepository::run() { uint64_t size = getRepoSize(); - if (size > (uint64_t)max_partition_bytes_) + if (size > (uint64_t) max_partition_bytes_) repo_full_ = true; else repo_full_ = false; } } -void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { - content_repo_ = content_repo; - std::vector<std::pair<std::string, uint64_t>> purgeList; - rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); +void FlowFileRepository::prune_stored_flowfiles() { + rocksdb::DB* stored_database_; + bool corrupt_checkpoint = false; + if (nullptr != checkpoint_) { + rocksdb::Options options; + options.create_if_missing = true; + options.use_direct_io_for_flush_and_compaction = true; + options.use_direct_reads = true; + rocksdb::Status status = rocksdb::DB::OpenForReadOnly(options, FLOWFILE_CHECKPOINT_DIRECTORY, &stored_database_); + if (!status.ok()) { + stored_database_ = db_; + } + } else { + logger_->log_trace("Could not open checkpoint as object doesn't exist. 
Likely not needed or file system error."); + return; + } - repo_size_ = 0; + rocksdb::Iterator* it = stored_database_->NewIterator(rocksdb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); std::string key = it->key().ToString(); @@ -100,34 +115,63 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); auto search = connectionMap.find(eventRead->getConnectionUuid()); - if (search != connectionMap.end()) { + if (!corrupt_checkpoint && search != connectionMap.end()) { // we find the connection for the persistent flowfile, create the flowfile and enqueue that std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); eventRead->setStoredToRepository(true); search->second->put(eventRead); } else { - logger_->log_warn("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); + logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); if (eventRead->getContentFullPath().length() > 0) { if (nullptr != eventRead->getResourceClaim()) { content_repo_->remove(eventRead->getResourceClaim()); } } - purgeList.push_back(std::make_pair(key, it->value().size())); + keys_to_delete.enqueue(key); } } else { - purgeList.push_back(std::make_pair(key, it->value().size())); + keys_to_delete.enqueue(key); } } delete it; - for (auto eventId : purgeList) { - logger_->log_debug("Repository Repo %s Purge %s", name_, eventId.first); - if (Delete(eventId.first)) { - repo_size_ -= eventId.second; - } + +} + +/** + * Returns True if there is data to 
interrogate. + * @return true if our db has data stored. + */ +bool FlowFileRepository::need_checkpoint(){ + std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions())); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + return true; } + return false; +} +void FlowFileRepository::initialize_repository() { + // first we need to establish a checkpoint iff it is needed. + if (!need_checkpoint()){ + logger_->log_trace("Do not need checkpoint"); + return; + } + rocksdb::Checkpoint *checkpoint; + // delete any previous copy + if (utils::file::FileUtils::delete_dir(FLOWFILE_CHECKPOINT_DIRECTORY) >= 0 && rocksdb::Checkpoint::Create(db_, &checkpoint).ok()) { + if (checkpoint->CreateCheckpoint(FLOWFILE_CHECKPOINT_DIRECTORY).ok()) { + checkpoint_ = std::unique_ptr<rocksdb::Checkpoint>(checkpoint); + logger_->log_trace("Created checkpoint directory"); + } else { + logger_->log_trace("Could not create checkpoint directory. Not properly deleted?"); + } + } else + logger_->log_trace("Could not create checkpoint directory. 
Not properly deleted?"); +} - return; +void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { + content_repo_ = content_repo; + repo_size_ = 0; + initialize_repository(); } } /* namespace repository */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/extensions/rocksdb-repos/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h index 1bdd440..e7f8fd0 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.h +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -18,9 +18,11 @@ #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ #define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ +#include "utils/file/FileUtils.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" +#include "rocksdb/utilities/checkpoint.h" #include "core/Repository.h" #include "core/Core.h" #include "Connection.h" @@ -35,6 +37,7 @@ namespace core { namespace repository { #define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" +#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint" #define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M #define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute #define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec @@ -48,15 +51,16 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr // Constructor FlowFileRepository(std::string name, uuid_t uuid) - : FlowFileRepository(name){ - + : FlowFileRepository(name) { } FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) - : 
core::SerializableComponent(repo_name,0), Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), + : core::SerializableComponent(repo_name, 0), + Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), content_repo_(nullptr), - logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()){ + checkpoint_(nullptr), + logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) { db_ = NULL; } @@ -152,9 +156,27 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr } private: + + /** + * Initialize the repository + */ + void initialize_repository(); + + /** + * Returns true if a checkpoint is needed at startup + * @return true if a checkpoint is needed. + */ + bool need_checkpoint(); + + /** + * Prunes stored flow files. + */ + void prune_stored_flowfiles(); + moodycamel::ConcurrentQueue<std::string> keys_to_delete; std::shared_ptr<core::ContentRepository> content_repo_; rocksdb::DB* db_; + std::unique_ptr<rocksdb::Checkpoint> checkpoint_; std::shared_ptr<logging::Logger> logger_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/include/utils/file/DiffUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/DiffUtils.h b/libminifi/include/utils/file/DiffUtils.h new file mode 100644 index 0000000..d44360d --- /dev/null +++ b/libminifi/include/utils/file/DiffUtils.h @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_ +#define LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_ + +#include <sstream> +#include <fstream> +#ifdef BOOST_VERSION +#include <boost/filesystem.hpp> +#else +#include <cstdlib> +#include <sys/stat.h> +#include <dirent.h> +#endif +#include <cstdio> +#include <unistd.h> +#include <fcntl.h> +#ifdef WIN32 +#define stat _stat +#endif + +#ifdef BDIFF + +extern "C" +{ +#include "bsdiff.h" +#include "bspatch.h" +} +#else +int apply_bsdiff_patch(const char *oldfile, const char *newfile, const char *patch) { + return -1; +} + +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { +namespace file { + +/** + * Binary diff/patch helpers; patching delegates to bsdiff when BDIFF is defined and otherwise returns -1.
+ * + */ +class DiffUtils { + public: + + DiffUtils() = delete; + + static int apply_binary_diff(const char *file_original, const char *file_new, const char *result_file) { + return apply_bsdiff_patch(file_original, file_new, result_file); + } + + static int binary_diff(const char *file_original, const char *file_other, const char *patchfile) { + return -1; + } + +}; + +} /* namespace file */ +} /* namespace utils */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_UTILS_DIFFUTILS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/include/utils/file/FileUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index 767ede7..21b13d8 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -17,6 +17,8 @@ #ifndef LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_ #define LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_ +#include <sstream> +#include <fstream> #ifdef BOOST_VERSION #include <boost/filesystem.hpp> #else @@ -31,20 +33,6 @@ #define stat _stat #endif -#ifdef BDIFF - -extern "C" -{ -#include "bsdiff.h" -#include "bspatch.h" -} -#else -int apply_bsdiff_patch(const char *oldfile, const char *newfile, const char *patch) { - return -1; -} - -#endif - namespace org { namespace apache { namespace nifi { @@ -61,6 +49,66 @@ class FileUtils { FileUtils() = delete; + /** + * Deletes a directory, deleting recursively if delete_files_recursively is true + * @param path current path to delete + * @param delete_files_recursively deletes recursively + */ + static int64_t delete_dir(const std::string &path, bool delete_files_recursively = true) { +#ifdef BOOST_VERSION + try { + if (boost::filesystem::exists(path)) { + if (delete_files_recursively) { + boost::filesystem::remove_all(path); + } + else { + 
boost::filesystem::remove(path); + } + } + } catch(boost::filesystem::filesystem_error const & e) + { + return -1; + // removal failed; report the error to the caller + } + return 0; +#else + DIR *current_directory = opendir(path.c_str()); + int r = -1; + if (current_directory) { + struct dirent *p; + r = 0; + while (!r && (p = readdir(current_directory))) { + int r2 = -1; + std::stringstream newpath; + if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, "..")) { + continue; + } + struct stat statbuf; + + newpath << path << "/" << p->d_name; + + if (!stat(newpath.str().c_str(), &statbuf)) { + if (S_ISDIR(statbuf.st_mode)) { + if (delete_files_recursively) { + r2 = delete_dir(newpath.str(), delete_files_recursively); + } + } else { + r2 = unlink(newpath.str().c_str()); + } + } + r = r2; + } + closedir(current_directory); + } + + if (!r) { + return rmdir(path.c_str()); + } + + return 0; +#endif + } + static uint64_t last_write_time(const std::string &path) { #ifdef BOOST_VERSION return boost::filesystem::last_write_time(movedFile.str()); @@ -103,14 +151,6 @@ class FileUtils { return 0; } - static int apply_binary_diff(const char *file_original, const char *file_new, const char *result_file) { - return apply_bsdiff_patch(file_original, file_new, result_file); - } - - static int binary_diff(const char *file_original, const char *file_other, const char *patchfile) { - return -1; - } - }; } /* namespace file */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/src/c2/C2Agent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index c71283d..cb7e0f4 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -28,6 +28,7 @@ #include "core/state/UpdateController.h" #include "core/logging/Logger.h" #include "core/logging/LoggerConfiguration.h" +#include "utils/file/DiffUtils.h" #include "utils/file/FileUtils.h" #include "utils/file/FileManager.h" namespace org
{ @@ -648,7 +649,7 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { if (allow_updates_) { if (partial_update && !bin_location_.empty()) { - utils::file::FileUtils::apply_binary_diff(bin_location_.c_str(), file_path.c_str(), update_location_.c_str()); + utils::file::DiffUtils::apply_binary_diff(bin_location_.c_str(), file_path.c_str(), update_location_.c_str()); } else { utils::file::FileUtils::copy_file(file_path, update_location_); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/ed6dc27f/libminifi/test/rocksdb-tests/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index 2da5713..c92133a 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -18,6 +18,8 @@ #include "../TestBase.h" #include <memory> #include <string> +#include <chrono> +#include <thread> #include <map> #include "../unit/ProvenanceTestHelper.h" #include "provenance/Provenance.h" @@ -159,3 +161,59 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") { LogTestController::getInstance().reset(); } + + +TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + LogTestController::getInstance().setDebug<core::ContentRepository>(); + LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>(); + LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>(); + + char *dir = testController.createTempDirectory(format); + + std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); + + std::map<std::string, std::string> attributes; + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + 
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>(); + + repository->initialize(std::make_shared<minifi::Configure>()); + + repository->loadComponent(content_repo); + + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo); + + minifi::FlowFileRecord record(repository, content_repo, attributes, claim); + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd"); + + REQUIRE(true == record.Serialize()); + + repository->flush(); + + repository->stop(); + + repository->loadComponent(content_repo); + + repository->start(); + + // sleep for 100 ms to let the delete work. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + repository->stop(); + + std::ifstream fileopen(ss.str()); + REQUIRE(false == fileopen.good()); + + LogTestController::getInstance().reset(); +}
