adam-markovics commented on code in PR #1328:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1328#discussion_r873907644


##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const 
std::string& configuration_class_name, bool fail_safe, const std::string& 
repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), 
class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
 class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == 
"volatileflowfilerepository") {
+      return_obj = 
instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == 
"volatileprovenancefilerepository") {
+      return_obj = 
instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return {};
+    } else {
+      throw std::runtime_error("Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &) {
+    if (fail_safe) {
+      return {};
+    }
+  }
+
+  throw std::runtime_error("Support for the provided configuration class could not be found");
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

Review Comment:
   Done.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const 
std::string& configuration_class_name, bool fail_safe, const std::string& 
repo_name) {

Review Comment:
   Done.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const 
std::string& configuration_class_name, bool fail_safe, const std::string& 
repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), 
class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
 class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == 
"volatileflowfilerepository") {
+      return_obj = 
instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == 
"volatileprovenancefilerepository") {
+      return_obj = 
instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return {};
+    } else {
+      throw std::runtime_error("Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &) {
+    if (fail_safe) {
+      return {};
+    }
+  }
+
+  throw std::runtime_error("Support for the provided configuration class could not be found");

Review Comment:
   Done.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> 
createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const 
std::string& configuration_class_name, bool fail_safe, const std::string& 
repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), 
class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = 
core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
 class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == 
"volatileflowfilerepository") {
+      return_obj = 
instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == 
"volatileprovenancefilerepository") {
+      return_obj = 
instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }

Review Comment:
   Done.



##########
libminifi/include/core/repository/VolatileRepository.h:
##########
@@ -39,31 +39,24 @@ namespace nifi {
 namespace minifi {
 namespace core {
 namespace repository {
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
+
 /**
  * Flow File repository
  * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
  */
-template<typename T>
-class VolatileRepository : public core::Repository {
+template<typename T, typename T_Repository = core::Repository>

Review Comment:
   Done.



##########
libminifi/include/core/Repository.h:
##########
@@ -240,30 +205,20 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  // Mutex for protection
-  std::mutex mutex_;
   // repository directory

Review Comment:
   Done.



##########
libminifi/include/core/Repository.h:
##########
@@ -240,30 +205,20 @@ class Repository : public virtual 
core::SerializableComponent, public core::Trac
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  // Mutex for protection
-  std::mutex mutex_;
   // repository directory
   std::string directory_;
-  // max db entry life time
+  // max db entry lifetime
   std::chrono::milliseconds max_partition_millis_;
   // max db size
   int64_t max_partition_bytes_;
   // purge period
   std::chrono::milliseconds purge_period_;
-  // thread
-  std::thread thread_;
-  // whether the monitoring thread is running for the repo while it was enabled
-  std::atomic<bool> running_;
-  // whether stop accepting provenace event
+  // whether stop accepting provenance event

Review Comment:
   Done.



##########
extensions/rocksdb-repos/ProvenanceRepository.h:
##########
@@ -26,35 +26,31 @@
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "provenance/Provenance.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "provenance/Provenance.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace provenance {
+namespace org::apache::nifi::minifi::provenance {
 
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto PROVENANCE_DIRECTORY = "./provenance_repository";
+constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10 * 1024 * 1024;  // 10M
 constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
 constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class ProvenanceRepository : public core::Repository {
+class ProvenanceRepository : public core::ThreadedRepository {
  public:
-  ProvenanceRepository(const std::string& name, const utils::Identifier& 
/*uuid*/)
-      : ProvenanceRepository(name) {
-  }
-  // Constructor
-  /*!
-   * Create a new provenance repository
-   */
-  explicit ProvenanceRepository(const std::string& repo_name = "", std::string 
directory = PROVENANCE_DIRECTORY, std::chrono::milliseconds maxPartitionMillis 
= MAX_PROVENANCE_ENTRY_LIFE_TIME,
-      int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, 
std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        Repository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, 
maxPartitionBytes, purgePeriod) {
+  ProvenanceRepository(const std::string &name, const utils::Identifier & 
/*uuid*/)
+          : ProvenanceRepository(name) {
+  }
+
+  explicit ProvenanceRepository(const std::string &repo_name = "", std::string 
directory = PROVENANCE_DIRECTORY,
+                                std::chrono::milliseconds maxPartitionMillis = 
MAX_PROVENANCE_ENTRY_LIFE_TIME,
+                                int64_t maxPartitionBytes = 
MAX_PROVENANCE_STORAGE_SIZE,
+                                std::chrono::milliseconds purgePeriod = 
PROVENANCE_PURGE_PERIOD)
+          : core::SerializableComponent(repo_name),
+            ThreadedRepository(repo_name.length() > 0 ? repo_name : 
core::getClassName<ProvenanceRepository>(), directory,
+                       maxPartitionMillis, maxPartitionBytes, purgePeriod) {

Review Comment:
   Done.



##########
extensions/rocksdb-repos/FlowFileRepository.h:
##########
@@ -27,30 +27,25 @@
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/utilities/checkpoint.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "Connection.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "Connection.h"
 #include "concurrentqueue.h"
 #include "database/RocksDatabase.h"
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "utils/crypto/EncryptionProvider.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
 #ifdef WIN32
-#define FLOWFILE_REPOSITORY_DIRECTORY ".\\flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY ".\\flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint";
 #else
-#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint";
 #endif
-#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10*1024*1024;  // 10M

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to