Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 44704b363 -> a8de19653


MINIFI-246: Adding tests that were used to find bug. Solved by MINIFI-193

MINIFI-246: Resolve second issue with provenance repository getting corrupt

This closes #72.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a8de1965
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a8de1965
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a8de1965

Branch: refs/heads/master
Commit: a8de19653424454f8e6a889d89ec23e8f5dfa228
Parents: 44704b3
Author: Marc Parisi <[email protected]>
Authored: Thu Mar 30 12:16:03 2017 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Fri Mar 31 15:58:19 2017 -0400

----------------------------------------------------------------------
 libminifi/include/FlowFileRecord.h              |  13 +-
 libminifi/include/core/Repository.h             |   7 +-
 libminifi/include/core/core.h                   |   9 +-
 .../core/repository/FlowFileRepository.h        |  18 +-
 .../include/provenance/ProvenanceRepository.h   |  12 +
 libminifi/src/FlowController.cpp                |   5 +-
 libminifi/src/FlowFileRecord.cpp                |   3 -
 libminifi/src/core/Record.cpp                   |   2 +
 libminifi/src/core/RepositoryFactory.cpp        |   4 +-
 .../src/core/repository/FlowFileRepository.cpp  |   2 +-
 libminifi/src/io/Serializable.cpp               | 255 +++++++++----------
 libminifi/src/provenance/Provenance.cpp         |   1 +
 .../src/provenance/ProvenanceRepository.cpp     |   3 +
 libminifi/test/TestBase.h                       |  38 ++-
 libminifi/test/unit/ProcessorTests.cpp          |   2 -
 libminifi/test/unit/ProvenanceTestHelper.h      |   1 -
 libminifi/test/unit/ProvenanceTests.cpp         |   4 -
 libminifi/test/unit/RepoTests.cpp               | 151 +++++++++++
 18 files changed, 375 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h 
b/libminifi/include/FlowFileRecord.h
index ca0856c..5c6f049 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -146,12 +146,17 @@ class FlowFileRecord : public core::FlowFile, public 
io::Serializable {
   const std::string getConnectionUuid() {
     return uuid_connection_;
   }
-  
-  const std::string getContentFullPath()
-  {
+
+  /**
+   * Set the UUID connection.
+   */
+  void setUuidConnection(const std::string &uuid_connection) {
+    uuid_connection_ = uuid_connection;
+  }
+
+  const std::string getContentFullPath() {
     return content_full_fath_;
   }
-  
 
   FlowFileRecord &operator=(const FlowFileRecord &);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
index a668df5..e096023 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -137,12 +137,13 @@ class Repository : public CoreComponent {
   uint64_t repoSize();
   // size of the directory
   std::atomic<uint64_t> repo_size_;
-
- private:
   // Run function for the thread
-    void threadExecutor(){
+  void threadExecutor(){
       run();
     }
+
+  
+    
 };
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/core.h b/libminifi/include/core/core.h
index 9f86100..a70dbd4 100644
--- a/libminifi/include/core/core.h
+++ b/libminifi/include/core/core.h
@@ -66,13 +66,13 @@ struct class_operations {
 
 
 template<typename T>
-typename std::enable_if<!class_operations<T>::value, T*>::type instantiate() {
+typename std::enable_if<!class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate() {
   throw std::runtime_error("Cannot instantiate class");
 }
 
 template<typename T>
-typename std::enable_if<class_operations<T>::value, T*>::type instantiate() {
-  return new T();
+typename std::enable_if<class_operations<T>::value, std::shared_ptr<T>>::type 
instantiate() {
+  return std::make_shared<T>();
 }
 
 /**
@@ -140,6 +140,9 @@ class CoreComponent {
   const std::string & getUUIDStr()  {
     return uuidStr_;
   }
+  
+  void loadComponent(){
+  }
 
  protected:
   // A global unique identifier

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h 
b/libminifi/include/core/repository/FlowFileRepository.h
index 31e655a..0115588 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -153,9 +153,25 @@ class FlowFileRepository : public core::Repository, public 
std::enable_shared_fr
                  return false;
   }
   
-  void loadFlowFileToConnections(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap);
+  void setConnectionMap(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap)
+  {
+    this->connectionMap=connectionMap;
+  }
+  void loadComponent();
   
+   void start() {
+  if (this->purge_period_ <= 0)
+    return;
+  if (running_)
+    return;
+  thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
+  thread_.detach();
+  running_ = true;
+  logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+}
+
  private:
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
   leveldb::DB* db_;
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/include/provenance/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/provenance/ProvenanceRepository.h 
b/libminifi/include/provenance/ProvenanceRepository.h
index 0f8ee5d..8dc152f 100644
--- a/libminifi/include/provenance/ProvenanceRepository.h
+++ b/libminifi/include/provenance/ProvenanceRepository.h
@@ -58,6 +58,17 @@ class ProvenanceRepository : public core::Repository,
     if (db_)
       delete db_;
   }
+  
+  void start() {
+  if (this->purge_period_ <= 0)
+    return;
+  if (running_)
+    return;
+  thread_ = std::thread(&ProvenanceRepository::run, shared_from_this());
+  thread_.detach();
+  running_ = true;
+  logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
+}
 
   // initialize
   virtual bool initialize() {
@@ -164,3 +175,4 @@ class ProvenanceRepository : public core::Repository,
 } /* namespace apache */
 } /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e472a9a..433387a 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -152,6 +152,8 @@ FlowController::~FlowController() {
   unload();
   if (NULL != protocol_)
     delete protocol_;
+  flow_file_repo_ = nullptr;
+  provenance_repo_ = nullptr;
 
 }
 
@@ -264,7 +266,8 @@ void FlowController::loadFlowRepo() {
     }
     auto rep = std::static_pointer_cast<core::repository::FlowFileRepository>(
         flow_file_repo_);
-    rep->loadFlowFileToConnections(connectionMap);
+    rep->setConnectionMap(connectionMap);
+    flow_file_repo_->loadComponent();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 7383574..562a685 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -269,9 +269,6 @@ bool FlowFileRecord::Serialize() {
     return false;
   }
 
-  // Persistent to the DB
-  
-
   if (flow_repository_->Put(uuid_str_,
                             const_cast<uint8_t*>(outStream.getBuffer()),
                             outStream.getSize())) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/Record.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp
index dbf0102..6f33300 100644
--- a/libminifi/src/core/Record.cpp
+++ b/libminifi/src/core/Record.cpp
@@ -161,6 +161,8 @@ bool FlowFile::updateAttribute(const std::string key, const 
std::string value) {
 }
 
 bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+
+
   auto it = attributes_.find(key);
   if (it != attributes_.end()) {
     // attribute already there in the map

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/RepositoryFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/RepositoryFactory.cpp 
b/libminifi/src/core/RepositoryFactory.cpp
index ef0b9ef..9bdc7c3 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -32,11 +32,11 @@ namespace core {
       std::shared_ptr<core::Repository> return_obj = nullptr;
       if (class_name_lc == "flowfilerepository") {
 
-        return_obj = 
std::shared_ptr<core::Repository>((core::Repository*)instantiate<core::repository::FlowFileRepository>());
+        return_obj = instantiate<core::repository::FlowFileRepository>();
       } else if (class_name_lc == "provenancerepository") {
 
 
-       return_obj = 
std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
+       return_obj = 
instantiate<provenance::ProvenanceRepository>();//std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>());
 
       }
       

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/core/repository/FlowFileRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp 
b/libminifi/src/core/repository/FlowFileRepository.cpp
index c495a67..8f13f39 100644
--- a/libminifi/src/core/repository/FlowFileRepository.cpp
+++ b/libminifi/src/core/repository/FlowFileRepository.cpp
@@ -47,7 +47,7 @@ void FlowFileRepository::run() {
   return;
 }
 
-void FlowFileRepository::loadFlowFileToConnections(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap)
+void FlowFileRepository::loadComponent()
  {
 
   std::vector<std::string> purgeList;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp 
b/libminifi/src/io/Serializable.cpp
index f8c623a..7b7f2bd 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -30,198 +30,195 @@ namespace io {
 
 #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
 
-
-
 #define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1)
 
 template<typename T>
-int Serializable::writeData(const T &t,DataStream *stream) {
-    uint8_t bytes[sizeof t];
-    std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-              static_cast<const char*>(static_cast<const void*>(&t)) + sizeof 
t,
-              bytes);
-    return stream->writeData(bytes, sizeof t);
+int Serializable::writeData(const T &t, DataStream *stream) {
+  uint8_t bytes[sizeof t];
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
+            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
+            bytes);
+  return stream->writeData(bytes, sizeof t);
 }
 
 template<typename T>
 int Serializable::writeData(const T &t, uint8_t *to_vec) {
-    std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-              static_cast<const char*>(static_cast<const void*>(&t)) + sizeof 
t,
-              to_vec);
-    return sizeof t;
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
+            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
+            to_vec);
+  return sizeof t;
 }
 
 template<typename T>
 int Serializable::writeData(const T &t, std::vector<uint8_t> &to_vec) {
-    uint8_t bytes[sizeof t];
-    std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
-              static_cast<const char*>(static_cast<const void*>(&t)) + sizeof 
t,
-              bytes);
-    to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
-    return sizeof t;
+  uint8_t bytes[sizeof t];
+  std::copy(static_cast<const char*>(static_cast<const void*>(&t)),
+            static_cast<const char*>(static_cast<const void*>(&t)) + sizeof t,
+            bytes);
+  to_vec.insert(to_vec.end(), &bytes[0], &bytes[sizeof t]);
+  return sizeof t;
 }
 
-
-
-
-
-int Serializable::write(uint8_t value,DataStream *stream) {
-    return stream->writeData(&value, 1);
+int Serializable::write(uint8_t value, DataStream *stream) {
+  return stream->writeData(&value, 1);
 }
-int Serializable::write(char value,DataStream *stream) {
-    return stream->writeData((uint8_t *) &value, 1);
+int Serializable::write(char value, DataStream *stream) {
+  return stream->writeData((uint8_t *) &value, 1);
 }
 
-int Serializable::write(uint8_t *value, int len,DataStream *stream) {
-    return stream->writeData(value, len);
+int Serializable::write(uint8_t *value, int len, DataStream *stream) {
+  return stream->writeData(value, len);
 }
 
 int Serializable::write(bool value) {
-    uint8_t temp = value;
-    return write(temp);
+  uint8_t temp = value;
+  return write(temp);
 }
 
-int Serializable::read(uint8_t &value,DataStream *stream) {
-    uint8_t buf;
-    
-    int ret = stream->readData(&buf, 1);
-    if (ret == 1)
-        value = buf;
-    return ret;
+int Serializable::read(uint8_t &value, DataStream *stream) {
+  uint8_t buf;
+
+  int ret = stream->readData(&buf, 1);
+  if (ret == 1)
+    value = buf;
+  return ret;
 }
 
-int Serializable::read(char &value,DataStream *stream) {
-    uint8_t buf;
+int Serializable::read(char &value, DataStream *stream) {
+  uint8_t buf;
 
-    int ret = stream->readData(&buf, 1);
-    if (ret == 1)
-        value = (char) buf;
-    return ret;
+  int ret = stream->readData(&buf, 1);
+  if (ret == 1)
+    value = (char) buf;
+  return ret;
 }
 
-int Serializable::read(uint8_t *value, int len,DataStream *stream) {
-    return stream->readData(value, len);
+int Serializable::read(uint8_t *value, int len, DataStream *stream) {
+  return stream->readData(value, len);
 }
 
-int Serializable::read(uint16_t &value,DataStream *stream, bool 
is_little_endian) {
-    return stream->read(value, is_little_endian);
+int Serializable::read(uint16_t &value, DataStream *stream,
+                       bool is_little_endian) {
+  return stream->read(value, is_little_endian);
 }
 
-int Serializable::read(uint32_t &value,DataStream *stream, bool 
is_little_endian) {
-    return stream->read(value, is_little_endian);
+int Serializable::read(uint32_t &value, DataStream *stream,
+                       bool is_little_endian) {
+  return stream->read(value, is_little_endian);
 
 }
-int Serializable::read(uint64_t &value,DataStream *stream, bool 
is_little_endian) {
-    return stream->read(value, is_little_endian);
+int Serializable::read(uint64_t &value, DataStream *stream,
+                       bool is_little_endian) {
+  return stream->read(value, is_little_endian);
 
 }
 
-int Serializable::write(uint32_t base_value,DataStream *stream, bool 
is_little_endian) {
+int Serializable::write(uint32_t base_value, DataStream *stream,
+                        bool is_little_endian) {
 
-    const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
+  const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
 
-    return writeData(value,stream);
+  return writeData(value, stream);
 }
 
-int Serializable::write(uint64_t base_value,DataStream *stream, bool 
is_little_endian) {
+int Serializable::write(uint64_t base_value, DataStream *stream,
+                        bool is_little_endian) {
 
-    const uint64_t value =
-        is_little_endian == 1 ? htonll_r(base_value) : base_value;
-    return writeData(value,stream);
+  const uint64_t value =
+      is_little_endian == 1 ? htonll_r(base_value) : base_value;
+  return writeData(value, stream);
 }
 
-int Serializable::write(uint16_t base_value,DataStream *stream, bool 
is_little_endian) {
+int Serializable::write(uint16_t base_value, DataStream *stream,
+                        bool is_little_endian) {
 
-    const uint16_t value =
-        is_little_endian == 1 ? htons(base_value) : base_value;
+  const uint16_t value = is_little_endian == 1 ? htons(base_value) : 
base_value;
 
-    return writeData(value,stream);
+  return writeData(value, stream);
 }
 
-int Serializable::readUTF(std::string &str,DataStream *stream, bool widen) {
-    uint32_t utflen=0;
-    int ret = 1;
-    if (!widen) {
-        uint16_t shortLength = 0;
-        ret = read(shortLength,stream);
-        utflen = shortLength;
-
-        if (ret <= 0)
-            return ret;
-    } else {
-        uint32_t len;
-        ret = read(len,stream);
-
-        if (ret <= 0)
-            return ret;
-        utflen = len;
-    }
+int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) {
+  uint32_t utflen = 0;
+  int ret = 1;
+  if (!widen) {
+    uint16_t shortLength = 0;
+    ret = read(shortLength, stream);
+    utflen = shortLength;
 
+    if (ret <= 0)
+      return ret;
+  } else {
+    uint32_t len;
+    ret = read(len, stream);
 
+    if (ret <= 0)
+      return ret;
+    utflen = len;
+  }
 
-    if (utflen == 0)
-        return 1;
+  if (utflen == 0) {
+    str = "";
+    return 1;
+  }
 
-    std::vector<uint8_t> buf;
-    ret = stream->readData(buf, utflen);
+  std::vector<uint8_t> buf;
+  ret = stream->readData(buf, utflen);
 
-    // The number of chars produced may be less than utflen
-    str = std::string((const char*)&buf[0],utflen);
+  // The number of chars produced may be less than utflen
+  str = std::string((const char*) &buf[0], utflen);
 
-    return utflen;
+  return utflen;
 }
 
-int Serializable::writeUTF(std::string str,DataStream *stream, bool widen) {
-    int inLength = str.length();
-    uint32_t utflen = 0;
-    int currentPtr = 0;
+int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) {
+  int inLength = str.length();
+  uint32_t utflen = 0;
+  int currentPtr = 0;
 
-   utflen = str.length();
+  utflen = str.length();
 
-    if (utflen > 65535)
-        return -1;
+  if (utflen > 65535)
+    return -1;
 
-    if (utflen == 0) {
-
-        if (!widen) {
-            uint16_t shortLen = utflen;
-            write(shortLen,stream);
-        } else {
-          write(utflen,stream);
-        }
-        return 1;
-    }
-
-    std::vector<uint8_t> utf_to_write;
-    if (!widen) {
-        utf_to_write.resize(utflen);
-    } else {
-        utf_to_write.resize(utflen);
-    }
-
-    int i = 0;
-
-
-    uint8_t *underlyingPtr = &utf_to_write[0];
-    for (auto c : str) {
-      writeData(c, underlyingPtr++);
-    }
-    int ret;
+  if (utflen == 0) {
 
     if (!widen) {
-
-        uint16_t short_length = utflen;
-        write(short_length,stream);
-        ret = stream->writeData(utf_to_write.data(), utflen);
+      uint16_t shortLen = utflen;
+      write(shortLen, stream);
     } else {
-        //utflen += 4;
-        write(utflen,stream);
-        ret = stream->writeData(utf_to_write.data(), utflen);
+      write(utflen, stream);
     }
-    return ret;
+    return 1;
+  }
+
+  std::vector<uint8_t> utf_to_write;
+  if (!widen) {
+    utf_to_write.resize(utflen);
+  } else {
+    utf_to_write.resize(utflen);
+  }
+
+  int i = 0;
+
+  uint8_t *underlyingPtr = &utf_to_write[0];
+  for (auto c : str) {
+    writeData(c, underlyingPtr++);
+  }
+  int ret;
+
+  if (!widen) {
+
+    uint16_t short_length = utflen;
+    write(short_length, stream);
+    ret = stream->writeData(utf_to_write.data(), utflen);
+  } else {
+    //utflen += 4;
+    write(utflen, stream);
+    ret = stream->writeData(utf_to_write.data(), utflen);
+  }
+  return ret;
 }
 
-
 } /* namespace io */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp 
b/libminifi/src/provenance/Provenance.cpp
index a90e182..289f026 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -238,6 +238,7 @@ bool ProvenanceEventRecord::Serialize(
                        _eventIdStr.c_str(), outStream.getSize());
   }
 
+
   // cleanup
 
   return true;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp 
b/libminifi/src/provenance/ProvenanceRepository.cpp
index 88455be..6fe332b 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -31,6 +31,7 @@ void ProvenanceRepository::run() {
   // threshold for purge
   uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
   while (running_) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     uint64_t curTime = getTimeMillis();
     uint64_t size = repoSize();
@@ -53,6 +54,7 @@ void ProvenanceRepository::run() {
       }
       delete it;
       std::vector<std::string>::iterator itPurge;
+      
       for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) 
{
         std::string eventId = *itPurge;
         logger_->log_info("ProvenanceRepository Repo Purge %s",
@@ -64,6 +66,7 @@ void ProvenanceRepository::run() {
       repo_full_ = true;
     else
       repo_full_ = false;
+    
   }
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index f73174b..4f926d3 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -18,15 +18,16 @@
 
 #ifndef LIBMINIFI_TEST_TESTBASE_H_
 #define LIBMINIFI_TEST_TESTBASE_H_
+#include <dirent.h>
 #include <cstdio>
 #include <cstdlib>
 #include "ResourceClaim.h"
 #include "catch.hpp"
 #include <vector>
+#include "core/logging/LogAppenders.h"
 #include "core/logging/Logger.h"
 #include "core/core.h"
 
-
 class LogTestController {
  public:
   LogTestController(const std::string level = "debug") {
@@ -52,16 +53,51 @@ class TestController {
 
   ~TestController() {
     for (auto dir : directories) {
+      DIR *created_dir;
+      struct dirent *dir_entry;
+      created_dir = opendir(dir);
+      if (created_dir != NULL) {
+        while ((dir_entry = readdir(created_dir)) != NULL) {
+          if (dir_entry->d_name[0] != '.') {
+
+            std::string file(dir);
+            file += "/";
+            file += dir_entry->d_name;
+            unlink(file.c_str());
+          }
+        }
+      }
+      closedir(created_dir);
       rmdir(dir);
     }
   }
 
+  void setDebugToConsole() {
+    std::ostringstream oss;
+    std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+        logging::BaseLogger>(
+        new org::apache::nifi::minifi::core::logging::OutputStreamAppender(
+            std::cout, minifi::Configure::getConfigure()));
+    std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+    logger->updateLogger(std::move(outputLogger));
+  }
+
+  void setNullAppender() {
+
+    std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+        logging::BaseLogger>(
+        new org::apache::nifi::minifi::core::logging::NullAppender());
+    std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+    logger->updateLogger(std::move(outputLogger));
+  }
+
   void enableDebug() {
     log.enableDebug();
   }
 
   char *createTempDirectory(char *format) {
     char *dir = mkdtemp(format);
+    directories.push_back(dir);
     return dir;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp 
b/libminifi/test/unit/ProcessorTests.cpp
index 4f08d5d..c040e4d 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -105,7 +105,6 @@ TEST_CASE("Test Find file", "[getfileCreate2]") {
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onTrigger(&context, &session);
   unlink(ss.str().c_str());
-  rmdir(dir);
   reporter = session.getProvenanceReporter();
 
   REQUIRE( processor->getName() == "getfileCreate2");
@@ -243,7 +242,6 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") {
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onTrigger(&context, &session);
   unlink(ss.str().c_str());
-  rmdir(dir);
   reporter = session.getProvenanceReporter();
 
   records = reporter->getEvents();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index cb8f520..1e16aa6 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -130,7 +130,6 @@ class TestFlowController : public minifi::FlowController {
   }
  protected:
   void initializePaths(const std::string &adjustedFilename) {
-    std::cout << "what" << std::endl;
   }
 };
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp 
b/libminifi/test/unit/ProvenanceTests.cpp
index 624601c..c73cef2 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#ifndef PROVENANCE_TESTS
-#define PROVENANCE_TESTS
 #include "../TestBase.h"
 
 #include "ProvenanceTestHelper.h"
@@ -92,5 +90,3 @@ TEST_CASE("Test Flowfile record added to provenance", 
"[TestFlowAndProv1]") {
   REQUIRE(record2.getChildrenUuids().size() == 0);
 
 }
-
-#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a8de1965/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp 
b/libminifi/test/unit/RepoTests.cpp
new file mode 100644
index 0000000..9237e7e
--- /dev/null
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+
+#include "ProvenanceTestHelper.h"
+#include "provenance/Provenance.h"
+#include "FlowFileRecord.h"
+#include "core/core.h"
+#include "core/repository/FlowFileRepository.h"
+
+TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
+
+  TestController testController;
+
+  //testController.setDebugToConsole();
+
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository =
+      std::make_shared<core::repository::FlowFileRepository>(
+          dir, 0,
+          0,
+          1);
+
+  repository->initialize();
+
+  minifi::FlowFileRecord record(repository);
+
+  record.addAttribute("keyA", "");
+
+  REQUIRE( true == record.Serialize() );
+
+  repository->stop();
+
+
+  testController.setNullAppender();
+
+
+}
+
+
+
+TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
+
+  TestController testController;
+
+  //testController.setDebugToConsole();
+
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository =
+      std::make_shared<core::repository::FlowFileRepository>(
+          dir, 0,
+          0,
+          1);
+
+  repository->initialize();
+
+  minifi::FlowFileRecord record(repository);
+
+
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+
+  REQUIRE( true == record.Serialize() );
+
+  repository->stop();
+
+
+  testController.setNullAppender();
+
+
+}
+
+
+TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
+
+  TestController testController;
+
+  //testController.setDebugToConsole();
+
+  char format[] = "/tmp/testRepo.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+  std::shared_ptr<core::repository::FlowFileRepository> repository =
+      std::make_shared<core::repository::FlowFileRepository>(
+          dir, 0,
+          0,
+          1);
+
+  repository->initialize();
+
+  minifi::FlowFileRecord record(repository);
+
+  minifi::FlowFileRecord record2(repository);
+
+  std::string uuid = record.getUUIDStr();
+
+
+  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+
+  record.addAttribute("keyB", "");
+
+  record.addAttribute("", "");
+
+  record.updateAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd2");
+
+  record.addAttribute("", "sdgsdg");
+
+
+
+
+  REQUIRE( true == record.Serialize() );
+
+  repository->stop();
+
+  record2.DeSerialize(uuid);
+
+  std::string value;
+  REQUIRE(true == record2.getAttribute("",value));
+
+  REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd2" == value);
+
+  REQUIRE(false == record2.getAttribute("key",value));
+  REQUIRE(true == record2.getAttribute("keyA",value));
+  REQUIRE( "hasdgasdgjsdgasgdsgsadaskgasd" == value);
+
+  REQUIRE(true == record2.getAttribute("keyB",value));
+  REQUIRE( "" == value);
+
+
+
+}

Reply via email to