Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 44704b363 -> a8de19653
MINIFI-246: Adding tests that were used to find bug. Solved by MINIFI-193 MINIFI-246: Resolve second issue with provenance repository getting corrupt This closes #72. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a8de1965 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a8de1965 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a8de1965 Branch: refs/heads/master Commit: a8de19653424454f8e6a889d89ec23e8f5dfa228 Parents: 44704b3 Author: Marc Parisi <[email protected]> Authored: Thu Mar 30 12:16:03 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Fri Mar 31 15:58:19 2017 -0400 ---------------------------------------------------------------------- libminifi/include/FlowFileRecord.h | 13 +- libminifi/include/core/Repository.h | 7 +- libminifi/include/core/core.h | 9 +- .../core/repository/FlowFileRepository.h | 18 +- .../include/provenance/ProvenanceRepository.h | 12 + libminifi/src/FlowController.cpp | 5 +- libminifi/src/FlowFileRecord.cpp | 3 - libminifi/src/core/Record.cpp | 2 + libminifi/src/core/RepositoryFactory.cpp | 4 +- .../src/core/repository/FlowFileRepository.cpp | 2 +- libminifi/src/io/Serializable.cpp | 255 +++++++++---------- libminifi/src/provenance/Provenance.cpp | 1 + .../src/provenance/ProvenanceRepository.cpp | 3 + libminifi/test/TestBase.h | 38 ++- libminifi/test/unit/ProcessorTests.cpp | 2 - libminifi/test/unit/ProvenanceTestHelper.h | 1 - libminifi/test/unit/ProvenanceTests.cpp | 4 - libminifi/test/unit/RepoTests.cpp | 151 +++++++++++ 18 files changed, 375 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index ca0856c..5c6f049 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -146,12 +146,17 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { const std::string getConnectionUuid() { return uuid_connection_; } - - const std::string getContentFullPath() - { + + /** + * Set the UUID connection. + */ + void setUuidConnection(const std::string &uuid_connection) { + uuid_connection_ = uuid_connection; + } + + const std::string getContentFullPath() { return content_full_fath_; } - FlowFileRecord &operator=(const FlowFileRecord &); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index a668df5..e096023 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -137,12 +137,13 @@ class Repository : public CoreComponent { uint64_t repoSize(); // size of the directory std::atomic<uint64_t> repo_size_; - - private: // Run function for the thread - void threadExecutor(){ + void threadExecutor(){ run(); } + + + }; } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/core.h b/libminifi/include/core/core.h index 9f86100..a70dbd4 100644 --- a/libminifi/include/core/core.h +++ b/libminifi/include/core/core.h @@ -66,13 +66,13 @@ struct class_operations { template<typename T> -typename std::enable_if<!class_operations<T>::value, T*>::type instantiate() { +typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { throw std::runtime_error("Cannot instantiate class"); } template<typename T> -typename std::enable_if<class_operations<T>::value, T*>::type instantiate() { - return new T(); +typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type instantiate() { + return std::make_shared<T>(); } /** @@ -140,6 +140,9 @@ class CoreComponent { const std::string & getUUIDStr() { return uuidStr_; } + + void loadComponent(){ + } protected: // A global unique identifier http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/repository/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h index 31e655a..0115588 100644 --- a/libminifi/include/core/repository/FlowFileRepository.h +++ b/libminifi/include/core/repository/FlowFileRepository.h @@ -153,9 +153,25 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr return false; } - void loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap); + void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) + { + this->connectionMap=connectionMap; + } + void loadComponent(); + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&FlowFileRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); +} + private: + std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap; leveldb::DB* db_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index 0f8ee5d..8dc152f 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -58,6 +58,17 @@ class ProvenanceRepository : public core::Repository, if (db_) delete db_; } + + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); + thread_.detach(); + running_ = true; + logger_->log_info("%s Repository Monitor Thread Start", name_.c_str()); +} // initialize virtual bool initialize() { @@ -164,3 +175,4 @@ class ProvenanceRepository : public core::Repository, } /* namespace apache */ } /* namespace org */ #endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index e472a9a..433387a 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -152,6 +152,8 @@ FlowController::~FlowController() { unload(); if (NULL != protocol_) delete protocol_; + flow_file_repo_ = nullptr; + provenance_repo_ = nullptr; } @@ -264,7 +266,8 @@ void FlowController::loadFlowRepo() { } auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>( flow_file_repo_); - rep->loadFlowFileToConnections(connectionMap); + rep->setConnectionMap(connectionMap); + flow_file_repo_->loadComponent(); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 7383574..562a685 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -269,9 +269,6 @@ bool FlowFileRecord::Serialize() { return false; } - // Persistent to the DB - - if (flow_repository_->Put(uuid_str_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/Record.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp index dbf0102..6f33300 100644 --- a/libminifi/src/core/Record.cpp +++ b/libminifi/src/core/Record.cpp @@ -161,6 +161,8 @@ bool FlowFile::updateAttribute(const std::string key, const std::string value) { } bool FlowFile::addAttribute(const std::string &key, const std::string &value) { + + auto it = attributes_.find(key); if (it != attributes_.end()) { // attribute already there in the map http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index ef0b9ef..9bdc7c3 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -32,11 +32,11 @@ namespace core { std::shared_ptr<core::Repository> return_obj = nullptr; if (class_name_lc == "flowfilerepository") { - return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<core::repository::FlowFileRepository>()); + return_obj = instantiate<core::repository::FlowFileRepository>(); } else if (class_name_lc == "provenancerepository") { - return_obj = std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>()); + return_obj = instantiate<provenance::ProvenanceRepository>();//std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>()); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp index c495a67..8f13f39 100644 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -47,7 +47,7 @@ void FlowFileRepository::run() { return; } -void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) +void FlowFileRepository::loadComponent() { std::vector<std::string> purgeList; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index f8c623a..7b7f2bd 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -30,198 +30,195 @@ namespace io { #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) - - #define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1) template<typename T> -int Serializable::writeData(const T &t,DataStream *stream) { - uint8_t bytes[sizeof t]; - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - bytes); - return stream->writeData(bytes, sizeof t); +int Serializable::writeData(const T &t, DataStream *stream) { + uint8_t bytes[sizeof t]; + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + bytes); + return stream->writeData(bytes, sizeof t); } template<typename T> int Serializable::writeData(const T &t, uint8_t *to_vec) { - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - to_vec); - return sizeof t; + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + to_vec); + return sizeof t; } template<typename T> int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) { - uint8_t bytes[sizeof t]; - std::copy(static_cast<const char*>(static_cast<const void*>(&t)), - static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, - bytes); - to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]); - return sizeof t; + uint8_t bytes[sizeof t]; + std::copy(static_cast<const char*>(static_cast<const void*>(&t)), + static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t, + bytes); + to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]); + return sizeof t; } - - - - -int Serializable::write(uint8_t value,DataStream *stream) { - return stream->writeData(&value, 1); +int Serializable::write(uint8_t value, DataStream *stream) { + return stream->writeData(&value, 1); } -int Serializable::write(char value,DataStream *stream) { - return stream->writeData((uint8_t *) &value, 1); +int Serializable::write(char value, DataStream *stream) { + return stream->writeData((uint8_t *) &value, 1); } -int Serializable::write(uint8_t *value, int len,DataStream *stream) { - return stream->writeData(value, len); +int Serializable::write(uint8_t *value, int len, DataStream *stream) { + return stream->writeData(value, len); } int Serializable::write(bool value) { - uint8_t temp = value; - return write(temp); + uint8_t temp = value; + return write(temp); } -int Serializable::read(uint8_t &value,DataStream *stream) { - uint8_t buf; - - int ret = stream->readData(&buf, 1); - if (ret == 1) - value = buf; - return ret; +int Serializable::read(uint8_t &value, DataStream *stream) { + uint8_t buf; + + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = buf; + return ret; } -int Serializable::read(char &value,DataStream *stream) { - uint8_t buf; +int Serializable::read(char &value, DataStream *stream) { + uint8_t buf; - int ret = stream->readData(&buf, 1); - if (ret == 1) - value = (char) buf; - return ret; + int ret = stream->readData(&buf, 1); + if (ret == 1) + value = (char) buf; + return ret; } -int Serializable::read(uint8_t *value, int len,DataStream *stream) { - return stream->readData(value, len); +int Serializable::read(uint8_t *value, int len, DataStream *stream) { + return stream->readData(value, len); } -int Serializable::read(uint16_t &value,DataStream *stream, bool is_little_endian) { - return stream->read(value, is_little_endian); +int Serializable::read(uint16_t &value, DataStream *stream, + bool is_little_endian) { + return stream->read(value, is_little_endian); } -int Serializable::read(uint32_t &value,DataStream *stream, bool is_little_endian) { - return stream->read(value, is_little_endian); +int Serializable::read(uint32_t &value, DataStream *stream, + bool is_little_endian) { + return stream->read(value, is_little_endian); } -int Serializable::read(uint64_t &value,DataStream *stream, bool is_little_endian) { - return stream->read(value, is_little_endian); +int Serializable::read(uint64_t &value, DataStream *stream, + bool is_little_endian) { + return stream->read(value, is_little_endian); } -int Serializable::write(uint32_t base_value,DataStream *stream, bool is_little_endian) { +int Serializable::write(uint32_t base_value, DataStream *stream, + bool is_little_endian) { - const uint32_t value = is_little_endian ? htonl(base_value) : base_value; + const uint32_t value = is_little_endian ? htonl(base_value) : base_value; - return writeData(value,stream); + return writeData(value, stream); } -int Serializable::write(uint64_t base_value,DataStream *stream, bool is_little_endian) { +int Serializable::write(uint64_t base_value, DataStream *stream, + bool is_little_endian) { - const uint64_t value = - is_little_endian == 1 ? htonll_r(base_value) : base_value; - return writeData(value,stream); + const uint64_t value = + is_little_endian == 1 ? htonll_r(base_value) : base_value; + return writeData(value, stream); } -int Serializable::write(uint16_t base_value,DataStream *stream, bool is_little_endian) { +int Serializable::write(uint16_t base_value, DataStream *stream, + bool is_little_endian) { - const uint16_t value = - is_little_endian == 1 ? htons(base_value) : base_value; + const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value; - return writeData(value,stream); + return writeData(value, stream); } -int Serializable::readUTF(std::string &str,DataStream *stream, bool widen) { - uint32_t utflen=0; - int ret = 1; - if (!widen) { - uint16_t shortLength = 0; - ret = read(shortLength,stream); - utflen = shortLength; - - if (ret <= 0) - return ret; - } else { - uint32_t len; - ret = read(len,stream); - - if (ret <= 0) - return ret; - utflen = len; - } +int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) { + uint32_t utflen = 0; + int ret = 1; + if (!widen) { + uint16_t shortLength = 0; + ret = read(shortLength, stream); + utflen = shortLength; + if (ret <= 0) + return ret; + } else { + uint32_t len; + ret = read(len, stream); + if (ret <= 0) + return ret; + utflen = len; + } - if (utflen == 0) - return 1; + if (utflen == 0) { + str = ""; + return 1; + } - std::vector<uint8_t> buf; - ret = stream->readData(buf, utflen); + std::vector<uint8_t> buf; + ret = stream->readData(buf, utflen); - // The number of chars produced may be less than utflen - str = std::string((const char*)&buf[0],utflen); + // The number of chars produced may be less than utflen + str = std::string((const char*) &buf[0], utflen); - return utflen; + return utflen; } -int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) { - int inLength = str.length(); - uint32_t utflen = 0; - int currentPtr = 0; +int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) { + int inLength = str.length(); + uint32_t utflen = 0; + int currentPtr = 0; - utflen = str.length(); + utflen = str.length(); - if (utflen > 65535) - return -1; + if (utflen > 65535) + return -1; - if (utflen == 0) { - - if (!widen) { - uint16_t shortLen = utflen; - write(shortLen,stream); - } else { - write(utflen,stream); - } - return 1; - } - - std::vector<uint8_t> utf_to_write; - if (!widen) { - utf_to_write.resize(utflen); - } else { - utf_to_write.resize(utflen); - } - - int i = 0; - - - uint8_t *underlyingPtr = &utf_to_write[0]; - for (auto c : str) { - writeData(c, underlyingPtr++); - } - int ret; + if (utflen == 0) { if (!widen) { - - uint16_t short_length = utflen; - write(short_length,stream); - ret = stream->writeData(utf_to_write.data(), utflen); + uint16_t shortLen = utflen; + write(shortLen, stream); } else { - //utflen += 4; - write(utflen,stream); - ret = stream->writeData(utf_to_write.data(), utflen); + write(utflen, stream); } - return ret; + return 1; + } + + std::vector<uint8_t> utf_to_write; + if (!widen) { + utf_to_write.resize(utflen); + } else { + utf_to_write.resize(utflen); + } + + int i = 0; + + uint8_t *underlyingPtr = &utf_to_write[0]; + for (auto c : str) { + writeData(c, underlyingPtr++); + } + int ret; + + if (!widen) { + + uint16_t short_length = utflen; + write(short_length, stream); + ret = stream->writeData(utf_to_write.data(), utflen); + } else { + //utflen += 4; + write(utflen, stream); + ret = stream->writeData(utf_to_write.data(), utflen); + } + return ret; } - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index a90e182..289f026 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -238,6 +238,7 @@ bool ProvenanceEventRecord::Serialize( _eventIdStr.c_str(), outStream.getSize()); } + // cleanup return true; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp index 88455be..6fe332b 100644 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -31,6 +31,7 @@ void ProvenanceRepository::run() { // threshold for purge uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; while (running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); @@ -53,6 +54,7 @@ void ProvenanceRepository::run() { } delete it; std::vector<std::string>::iterator itPurge; + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { std::string eventId = *itPurge; logger_->log_info("ProvenanceRepository Repo Purge %s", @@ -64,6 +66,7 @@ void ProvenanceRepository::run() { repo_full_ = true; else repo_full_ = false; + } return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index f73174b..4f926d3 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -18,15 +18,16 @@ #ifndef LIBMINIFI_TEST_TESTBASE_H_ #define LIBMINIFI_TEST_TESTBASE_H_ +#include <dirent.h> #include <cstdio> #include <cstdlib> #include "ResourceClaim.h" #include "catch.hpp" #include <vector> +#include "core/logging/LogAppenders.h" #include "core/logging/Logger.h" #include "core/core.h" - class LogTestController { public: LogTestController(const std::string level = "debug") { @@ -52,16 +53,51 @@ class TestController { ~TestController() { for (auto dir : directories) { + DIR *created_dir; + struct dirent *dir_entry; + created_dir = opendir(dir); + if (created_dir != NULL) { + while ((dir_entry = readdir(created_dir)) != NULL) { + if (dir_entry->d_name[0] != '.') { + + std::string file(dir); + file += "/"; + file += dir_entry->d_name; + unlink(file.c_str()); + } + } + } + closedir(created_dir); rmdir(dir); } } + void setDebugToConsole() { + std::ostringstream oss; + std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< + logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender( + std::cout, minifi::Configure::getConfigure())); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + } + + void setNullAppender() { + + std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< + logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + } + void enableDebug() { log.enableDebug(); } char *createTempDirectory(char *format) { char *dir = mkdtemp(format); + directories.push_back(dir); return dir; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 4f08d5d..c040e4d 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -105,7 +105,6 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { processor->setScheduledState(core::ScheduledState::RUNNING); processor->onTrigger(&context, &session); unlink(ss.str().c_str()); - rmdir(dir); reporter = session.getProvenanceReporter(); REQUIRE( processor->getName() == "getfileCreate2"); @@ -243,7 +242,6 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { processor->setScheduledState(core::ScheduledState::RUNNING); processor->onTrigger(&context, &session); unlink(ss.str().c_str()); - rmdir(dir); reporter = session.getProvenanceReporter(); records = reporter->getEvents(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index cb8f520..1e16aa6 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -130,7 +130,6 @@ class TestFlowController : public minifi::FlowController { } protected: void initializePaths(const std::string &adjustedFilename) { - std::cout << "what" << std::endl; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index 624601c..c73cef2 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -16,8 +16,6 @@ * limitations under the License. */ -#ifndef PROVENANCE_TESTS -#define PROVENANCE_TESTS #include "../TestBase.h" #include "ProvenanceTestHelper.h" @@ -92,5 +90,3 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { REQUIRE(record2.getChildrenUuids().size() == 0); } - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp new file mode 100644 index 0000000..9237e7e --- /dev/null +++ b/libminifi/test/unit/RepoTests.cpp @@ -0,0 +1,151 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../TestBase.h" + +#include "ProvenanceTestHelper.h" +#include "provenance/Provenance.h" +#include "FlowFileRecord.h" +#include "core/core.h" +#include "core/repository/FlowFileRepository.h" + +TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { + + TestController testController; + + //testController.setDebugToConsole(); + + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = + std::make_shared<core::repository::FlowFileRepository>( + dir, 0, + 0, + 1); + + repository->initialize(); + + minifi::FlowFileRecord record(repository); + + record.addAttribute("keyA", ""); + + REQUIRE( true == record.Serialize() ); + + repository->stop(); + + + testController.setNullAppender(); + + +} + + + +TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { + + TestController testController; + + //testController.setDebugToConsole(); + + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = + std::make_shared<core::repository::FlowFileRepository>( + dir, 0, + 0, + 1); + + repository->initialize(); + + minifi::FlowFileRecord record(repository); + + + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd"); + + + REQUIRE( true == record.Serialize() ); + + repository->stop(); + + + testController.setNullAppender(); + + +} + + +TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { + + TestController testController; + + //testController.setDebugToConsole(); + + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + std::shared_ptr<core::repository::FlowFileRepository> repository = + std::make_shared<core::repository::FlowFileRepository>( + dir, 0, + 0, + 1); + + repository->initialize(); + + minifi::FlowFileRecord record(repository); + + minifi::FlowFileRecord record2(repository); + + std::string uuid = record.getUUIDStr(); + + + record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); + + record.addAttribute("keyB", ""); + + record.addAttribute("", ""); + + record.updateAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd2"); + + record.addAttribute("", "sdgsdg"); + + + + + REQUIRE( true == record.Serialize() ); + + repository->stop(); + + record2.DeSerialize(uuid); + + std::string value; + REQUIRE(true == record2.getAttribute("",value)); + + REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd2" == value); + + REQUIRE(false == record2.getAttribute("key",value)); + REQUIRE(true == record2.getAttribute("keyA",value)); + REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd" == value); + + REQUIRE(true == record2.getAttribute("keyB",value)); + REQUIRE( "" == value); + + + +}
