This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a3835e193 MINIFICPP-1812 Clean up Repository threads
a3835e193 is described below
commit a3835e1938db1287b0063d159b453b8a70f7a48b
Author: Adam Markovics <[email protected]>
AuthorDate: Thu Sep 1 12:21:00 2022 +0200
MINIFICPP-1812 Clean up Repository threads
Closes #1328
Signed-off-by: Marton Szasz <[email protected]>
---
controller/Controller.h | 22 +++-
extensions/coap/tests/CoapIntegrationBase.h | 4 +-
extensions/http-curl/tests/C2PauseResumeTest.cpp | 2 +-
.../tests/ControllerServiceIntegrationTests.cpp | 5 +-
extensions/http-curl/tests/VerifyInvokeHTTP.h | 2 +-
.../tests/PrometheusMetricsPublisherTest.cpp | 4 +-
.../rocksdb-repos/DatabaseContentRepository.h | 2 +-
extensions/rocksdb-repos/FlowFileRepository.cpp | 6 +-
extensions/rocksdb-repos/FlowFileRepository.h | 70 +++++------
extensions/rocksdb-repos/ProvenanceRepository.cpp | 2 +-
extensions/rocksdb-repos/ProvenanceRepository.h | 77 +++++-------
.../tests/integration/TestExecuteProcess.cpp | 8 +-
.../tests/unit/YamlConfigurationTests.cpp | 56 ++++-----
libminifi/include/core/ContentRepository.h | 15 +--
libminifi/include/core/Repository.h | 116 ++++++-----------
libminifi/include/core/RepositoryFactory.h | 21 +---
libminifi/include/core/ThreadedRepository.h | 106 ++++++++++++++++
.../include/core/repository/FileSystemRepository.h | 32 ++---
.../core/repository/VolatileContentRepository.h | 58 ++++-----
.../core/repository/VolatileFlowFileRepository.h | 23 ++--
.../core/repository/VolatileProvenanceRepository.h | 25 ++--
.../include/core/repository/VolatileRepository.h | 140 ++++++++-------------
libminifi/src/FlowController.cpp | 13 +-
libminifi/src/c2/C2Client.cpp | 1 +
libminifi/src/core/Repository.cpp | 64 ----------
libminifi/src/core/RepositoryFactory.cpp | 93 +++++++-------
.../src/core/repository/FileSystemRepository.cpp | 26 ++--
.../core/repository/VolatileContentRepository.cpp | 19 ---
libminifi/test/flow-tests/SessionTests.cpp | 3 +-
libminifi/test/flow-tests/TestControllerWithFlow.h | 2 +-
libminifi/test/integration/IntegrationBase.h | 4 +-
.../test/integration/ProvenanceReportingTest.cpp | 5 +-
.../test/persistence-tests/PersistenceTests.cpp | 6 +-
libminifi/test/rocksdb-tests/ProvenanceTests.cpp | 16 +--
libminifi/test/rocksdb-tests/RepoTests.cpp | 11 +-
libminifi/test/unit/MetricsTests.cpp | 2 +-
libminifi/test/unit/ProvenanceTestHelper.h | 136 +++++++++-----------
libminifi/test/unit/SchedulingAgentTests.cpp | 4 +-
minifi_main/MiNiFiMain.cpp | 10 +-
nanofi/include/cxx/Instance.h | 3 +-
40 files changed, 543 insertions(+), 671 deletions(-)
diff --git a/controller/Controller.h b/controller/Controller.h
index 0254d8ef6..b30f79748 100644
--- a/controller/Controller.h
+++ b/controller/Controller.h
@@ -26,6 +26,7 @@
#include "io/ClientSocket.h"
#include "c2/ControllerSocketProtocol.h"
#include "utils/gsl.h"
+#include "Exception.h"
#include "FlowController.h"
/**
@@ -246,15 +247,22 @@
std::shared_ptr<org::apache::nifi::minifi::core::controller::ControllerService>
configuration->get(org::apache::nifi::minifi::Configure::nifi_provenance_repository_class_name,
prov_repo_class);
// Create repos for flow record and provenance
- const std::shared_ptr prov_repo =
org::apache::nifi::minifi::core::createRepository(prov_repo_class, true,
"provenance");
+ const std::shared_ptr prov_repo =
org::apache::nifi::minifi::core::createRepository(prov_repo_class,
"provenance");
+ if (!prov_repo) {
+ throw
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
"Could not create provenance repository");
+ }
prov_repo->initialize(configuration);
configuration->get(org::apache::nifi::minifi::Configure::nifi_flow_repository_class_name,
flow_repo_class);
- const std::shared_ptr flow_repo =
org::apache::nifi::minifi::core::createRepository(flow_repo_class, true,
"flowfile");
+ const std::shared_ptr flow_repo =
org::apache::nifi::minifi::core::createRepository(flow_repo_class, "flowfile");
+ if (!flow_repo) {
+ throw
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
"Could not create flowfile repository");
+ }
flow_repo->initialize(configuration);
+
configuration->get(org::apache::nifi::minifi::Configure::nifi_content_repository_class_name,
content_repo_class);
const std::shared_ptr content_repo =
org::apache::nifi::minifi::core::createContentRepository(content_repo_class,
true, "content");
@@ -295,12 +303,18 @@ void printManifest(const
std::shared_ptr<org::apache::nifi::minifi::Configure> &
configuration->get(org::apache::nifi::minifi::Configure::nifi_provenance_repository_class_name,
prov_repo_class);
// Create repos for flow record and provenance
- const std::shared_ptr prov_repo =
org::apache::nifi::minifi::core::createRepository(prov_repo_class, true,
"provenance");
+ const std::shared_ptr prov_repo =
org::apache::nifi::minifi::core::createRepository(prov_repo_class,
"provenance");
+ if (!prov_repo) {
+ throw
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
"Could not create provenance repository");
+ }
prov_repo->initialize(configuration);
configuration->get(org::apache::nifi::minifi::Configure::nifi_flow_repository_class_name,
flow_repo_class);
- const std::shared_ptr flow_repo =
org::apache::nifi::minifi::core::createRepository(flow_repo_class, true,
"flowfile");
+ const std::shared_ptr flow_repo =
org::apache::nifi::minifi::core::createRepository(flow_repo_class, "flowfile");
+ if (!flow_repo) {
+ throw
org::apache::nifi::minifi::Exception(org::apache::nifi::minifi::REPOSITORY_EXCEPTION,
"Could not create flowfile repository");
+ }
flow_repo->initialize(configuration);
diff --git a/extensions/coap/tests/CoapIntegrationBase.h
b/extensions/coap/tests/CoapIntegrationBase.h
index 964b0de1a..7f62adf50 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -53,7 +53,7 @@ class CoapIntegrationBase : public IntegrationBase {
void run(const std::optional<std::string>& test_file_location = {}, const
std::optional<std::string>& = {}) override {
testSetup();
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
if (test_file_location) {
@@ -72,8 +72,6 @@ class CoapIntegrationBase : public IntegrationBase {
queryRootProcessGroup(pg);
- std::shared_ptr<TestRepository> repo =
std::static_pointer_cast<TestRepository>(test_repo);
-
std::shared_ptr<minifi::FlowController> controller =
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo,
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
std::make_shared<utils::file::FileSystem>(), []{});
diff --git a/extensions/http-curl/tests/C2PauseResumeTest.cpp
b/extensions/http-curl/tests/C2PauseResumeTest.cpp
index d8294d849..bc152618c 100644
--- a/extensions/http-curl/tests/C2PauseResumeTest.cpp
+++ b/extensions/http-curl/tests/C2PauseResumeTest.cpp
@@ -123,7 +123,7 @@ int main(int argc, char **argv) {
harness.setKeyDir(args.key_dir);
PauseResumeHandler responder{flow_resumed_successfully,
harness.getConfiguration()};
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index 12525b06d..a89a5cf61 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -53,14 +53,12 @@ int main(int argc, char **argv) {
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file,
args.test_file);
- std::string client_cert = "cn.crt.pem";
std::string priv_key_file = "cn.ckey.pem";
std::string passphrase = "cn.pass";
- std::string ca_cert = "nifi-cert.pem";
configuration->set(minifi::Configure::nifi_security_client_certificate,
args.test_file);
configuration->set(minifi::Configure::nifi_security_client_private_key,
priv_key_file);
configuration->set(minifi::Configure::nifi_security_client_pass_phrase,
passphrase);
@@ -71,7 +69,6 @@ int main(int argc, char **argv) {
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr =
std::make_unique<core::YamlConfiguration>(
test_repo, test_repo, content_repo, stream_factory, configuration,
args.test_file);
- std::shared_ptr<TestRepository> repo =
std::static_pointer_cast<TestRepository>(test_repo);
const auto controller = std::make_shared<minifi::FlowController>(test_repo,
test_flow_repo, configuration, std::move(yaml_ptr),
content_repo,
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTP.h
b/extensions/http-curl/tests/VerifyInvokeHTTP.h
index fbca3f9b0..a4ce0196d 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTP.h
+++ b/extensions/http-curl/tests/VerifyInvokeHTTP.h
@@ -81,7 +81,7 @@ class VerifyInvokeHTTP : public HTTPIntegrationBase {
virtual void setupFlow(const std::optional<std::string>& flow_yml_path) {
testSetup();
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
if (flow_yml_path) {
diff --git a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
index 111089909..b10b779d0 100644
--- a/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
+++ b/extensions/prometheus/tests/PrometheusMetricsPublisherTest.cpp
@@ -49,8 +49,8 @@ class PrometheusPublisherTestFixture {
public:
explicit PrometheusPublisherTestFixture(bool user_dummy_exposer)
: configuration_(std::make_shared<Configure>()),
- provenance_repo_(core::createRepository("provenancerepository", true)),
- flow_file_repo_(core::createRepository("flowfilerepository", true)),
+ provenance_repo_(core::createRepository("provenancerepository")),
+ flow_file_repo_(core::createRepository("flowfilerepository")),
response_node_loader_(configuration_, provenance_repo_, flow_file_repo_,
nullptr) {
std::unique_ptr<DummyMetricsExposer> dummy_exposer;
if (user_dummy_exposer) {
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index e93071589..08e6a4896 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -62,7 +62,7 @@ class DatabaseContentRepository : public
core::ContentRepository, public core::C
bool initialize(const std::shared_ptr<minifi::Configure> &configuration)
override;
- void stop() override;
+ void stop();
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim,
bool append = false) override;
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 39355dc80..2412bf97f 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -111,10 +111,10 @@ void FlowFileRepository::printStats() {
void FlowFileRepository::run() {
auto last = std::chrono::steady_clock::now();
- if (running_) {
+ if (isRunning()) {
prune_stored_flowfiles();
}
- while (running_) {
+ while (isRunning()) {
std::this_thread::sleep_for(purge_period_);
flush();
auto now = std::chrono::steady_clock::now();
@@ -193,7 +193,7 @@ void FlowFileRepository::prune_stored_flowfiles() {
}
}
-bool FlowFileRepository::ExecuteWithRetry(std::function<rocksdb::Status()>
operation) {
+bool FlowFileRepository::ExecuteWithRetry(const
std::function<rocksdb::Status()>& operation) {
std::chrono::milliseconds waitTime = 0ms;
for (int i=0; i < 3; ++i) {
auto status = operation();
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h
b/extensions/rocksdb-repos/FlowFileRepository.h
index 487363cd1..86e9d652e 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -26,10 +26,10 @@
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/checkpoint.h"
-#include "core/Repository.h"
#include "core/Core.h"
-#include "Connection.h"
#include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "Connection.h"
#include "concurrentqueue.h"
#include "database/RocksDatabase.h"
#include "encryption/RocksDbEncryptionProvider.h"
@@ -37,17 +37,18 @@
#include "SwapManager.h"
#include "FlowFileLoader.h"
#include "range/v3/algorithm/all_of.hpp"
+#include "utils/Literals.h"
namespace org::apache::nifi::minifi::core::repository {
#ifdef WIN32
-#define FLOWFILE_REPOSITORY_DIRECTORY ".\\flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY ".\\flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint";
#else
-#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint";
#endif
-#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10_MiB;
constexpr auto MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME =
std::chrono::minutes(10);
constexpr auto FLOWFILE_REPOSITORY_PURGE_PERIOD = std::chrono::seconds(2);
constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS =
std::chrono::milliseconds(500);
@@ -56,23 +57,22 @@ constexpr auto
FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill
* Flow File repository
* Design: Extends Repository and implements the run function, using rocksdb
as the primary substrate.
*/
-class FlowFileRepository : public core::Repository, public SwapManager, public
std::enable_shared_from_this<FlowFileRepository> {
+class FlowFileRepository : public ThreadedRepository, public SwapManager {
public:
static constexpr const char* ENCRYPTION_KEY_NAME =
"nifi.flowfile.repository.encryption.key";
- // Constructor
FlowFileRepository(const std::string& name, const utils::Identifier&
/*uuid*/)
: FlowFileRepository(name) {
}
- explicit FlowFileRepository(const std::string repo_name = "",
- const std::string& checkpoint_dir =
FLOWFILE_CHECKPOINT_DIRECTORY,
+ explicit FlowFileRepository(const std::string& repo_name = "",
+ std::string checkpoint_dir =
FLOWFILE_CHECKPOINT_DIRECTORY,
std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
std::chrono::milliseconds maxPartitionMillis =
MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
int64_t maxPartitionBytes =
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod =
FLOWFILE_REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
- Repository(repo_name.length() > 0 ? repo_name :
core::getClassName<FlowFileRepository>(), std::move(directory),
maxPartitionMillis, maxPartitionBytes, purgePeriod),
+ ThreadedRepository(repo_name.length() > 0 ? repo_name :
core::getClassName<FlowFileRepository>(), std::move(directory),
maxPartitionMillis, maxPartitionBytes, purgePeriod),
checkpoint_dir_(std::move(checkpoint_dir)),
content_repo_(nullptr),
checkpoint_(nullptr),
@@ -88,7 +88,7 @@ class FlowFileRepository : public core::Repository, public
SwapManager, public s
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
- bool isNoop() override {
+ bool isNoop() const override {
return false;
}
@@ -96,7 +96,6 @@ class FlowFileRepository : public core::Repository, public
SwapManager, public s
virtual void printStats();
- // initialize
bool initialize(const std::shared_ptr<Configure> &configure) override {
config_ = configure;
std::string value;
@@ -141,9 +140,7 @@ class FlowFileRepository : public core::Repository, public
SwapManager, public s
}
}
- void run() override;
-
- bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+ bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override
{
// persistent to the DB
auto opendb = db_->open();
if (!opendb) {
@@ -172,16 +169,15 @@ class FlowFileRepository : public core::Repository,
public SwapManager, public s
return ExecuteWithRetry(operation);
}
-
/**
- *
* Deletes the key
* @return status of the delete operation
*/
- bool Delete(std::string key) override {
+ bool Delete(const std::string& key) override {
keys_to_delete.enqueue(key);
return true;
}
+
/**
* Sets the value from the provided key
* @return status of the get operation.
@@ -196,29 +192,22 @@ class FlowFileRepository : public core::Repository,
public SwapManager, public s
void loadComponent(const std::shared_ptr<core::ContentRepository>
&content_repo) override;
- void start() override {
- if (this->purge_period_ <= std::chrono::milliseconds(0)) {
- return;
- }
- if (running_) {
- return;
- }
- running_ = true;
- thread_ = std::thread(&FlowFileRepository::run, this);
- logger_->log_debug("%s Repository Monitor Thread Start", getName());
+ bool start() override {
+ const bool ret = ThreadedRepository::start();
if (swap_loader_) {
swap_loader_->start();
}
+ return ret;
}
- void stop() override {
+ bool stop() override {
if (swap_loader_) {
swap_loader_->stop();
}
- core::Repository::stop();
+ return ThreadedRepository::stop();
}
- void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override
{
+ void store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>>
flow_files) override {
gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
// pass, flowfiles are already persisted in the repository
}
@@ -228,11 +217,10 @@ class FlowFileRepository : public core::Repository,
public SwapManager, public s
}
private:
- bool ExecuteWithRetry(std::function<rocksdb::Status()> operation);
+ void run() override;
+
+ bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
- /**
- * Initialize the repository
- */
void initialize_repository();
/**
@@ -241,11 +229,12 @@ class FlowFileRepository : public core::Repository,
public SwapManager, public s
*/
static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
- /**
- * Prunes stored flow files.
- */
void prune_stored_flowfiles();
+ std::thread& getThread() override {
+ return thread_;
+ }
+
std::string checkpoint_dir_;
moodycamel::ConcurrentQueue<std::string> keys_to_delete;
std::shared_ptr<core::ContentRepository> content_repo_;
@@ -254,6 +243,7 @@ class FlowFileRepository : public core::Repository, public
SwapManager, public s
std::unique_ptr<FlowFileLoader> swap_loader_;
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<minifi::Configure> config_;
+ std::thread thread_;
};
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index ee30c9f39..d4619ad6c 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -39,7 +39,7 @@ void ProvenanceRepository::printStats() {
void ProvenanceRepository::run() {
size_t count = 0;
- while (running_) {
+ while (isRunning()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
count++;
// Hack, to be removed in scope of
https://issues.apache.org/jira/browse/MINIFICPP-1145
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h
b/extensions/rocksdb-repos/ProvenanceRepository.h
index 713fce870..0ad58ed21 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -26,28 +26,32 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
-#include "core/Repository.h"
#include "core/Core.h"
-#include "provenance/Provenance.h"
#include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "provenance/Provenance.h"
+#include "utils/Literals.h"
namespace org::apache::nifi::minifi::provenance {
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M
+constexpr auto PROVENANCE_DIRECTORY = "./provenance_repository";
+constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10_MiB;
constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
-class ProvenanceRepository : public core::Repository {
+class ProvenanceRepository : public core::ThreadedRepository {
public:
ProvenanceRepository(const std::string& name, const utils::Identifier&
/*uuid*/)
: ProvenanceRepository(name) {
}
- explicit ProvenanceRepository(const std::string& repo_name = "", std::string
directory = PROVENANCE_DIRECTORY, std::chrono::milliseconds maxPartitionMillis
= MAX_PROVENANCE_ENTRY_LIFE_TIME,
- int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
- : core::SerializableComponent(repo_name),
- Repository(repo_name.length() > 0 ? repo_name :
core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis,
maxPartitionBytes, purgePeriod) {
- db_ = nullptr;
+
+ explicit ProvenanceRepository(const std::string& repo_name = "", std::string
directory = PROVENANCE_DIRECTORY,
+ std::chrono::milliseconds maxPartitionMillis =
MAX_PROVENANCE_ENTRY_LIFE_TIME,
+ int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+ std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
+ : core::SerializableComponent(repo_name),
+ ThreadedRepository(repo_name.length() > 0 ? repo_name :
core::getClassName<ProvenanceRepository>(), directory,
+ maxPartitionMillis, maxPartitionBytes, purgePeriod) {
}
~ProvenanceRepository() override {
@@ -60,19 +64,10 @@ class ProvenanceRepository : public core::Repository {
void printStats();
- bool isNoop() override {
+ bool isNoop() const override {
return false;
}
- void start() override {
- if (running_)
- return;
- running_ = true;
- thread_ = std::thread(&ProvenanceRepository::run, this);
- logger_->log_debug("%s Repository Monitor Thread Start", name_);
- }
-
- // initialize
bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure>
&config) override {
std::string value;
if (config->get(Configure::nifi_provenance_repository_directory_default,
value)) {
@@ -85,9 +80,10 @@ class ProvenanceRepository : public core::Repository {
logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d",
max_partition_bytes_);
if (config->get(Configure::nifi_provenance_repository_max_storage_time,
value)) {
if (auto max_partition =
utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
- max_partition_millis_ = *max_partition;
+ max_partition_millis_ = *max_partition;
}
- logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
int64_t{max_partition_millis_.count()});
+ logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
+ int64_t{max_partition_millis_.count()});
rocksdb::Options options;
options.create_if_missing = true;
options.use_direct_io_for_flush_and_compaction = true;
@@ -121,8 +117,8 @@ class ProvenanceRepository : public core::Repository {
return true;
}
- // Put
- bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+
+ bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override
{
// persist to the DB
rocksdb::Slice value((const char *) buf, bufLen);
return db_->Put(rocksdb::WriteOptions(), key, value).ok();
@@ -140,12 +136,11 @@ class ProvenanceRepository : public core::Repository {
return db_->Write(rocksdb::WriteOptions(), &batch).ok();
}
- // Delete
- bool Delete(std::string /*key*/) override {
+ bool Delete(const std::string& /*key*/) override {
// The repo is cleaned up by itself, there is no need to delete items.
return true;
}
- // Get
+
bool Get(const std::string &key, std::string &value) override {
return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
}
@@ -168,7 +163,8 @@ class ProvenanceRepository : public core::Repository {
return true;
}
- bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&records, size_t &max_size,
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override {
+ bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&records, size_t &max_size,
+
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override {
std::unique_ptr<rocksdb::Iterator>
it(db_->NewIterator(rocksdb::ReadOptions()));
size_t requested_batch = max_size;
max_size = 0;
@@ -185,19 +181,6 @@ class ProvenanceRepository : public core::Repository {
return max_size > 0;
}
- void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>>
&records, int maxSize) {
- std::unique_ptr<rocksdb::Iterator>
it(db_->NewIterator(rocksdb::ReadOptions()));
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- std::shared_ptr<ProvenanceEventRecord> eventRead =
std::make_shared<ProvenanceEventRecord>();
- std::string key = it->key().ToString();
- if (records.size() >= (uint64_t)maxSize)
- break;
- if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const
std::byte>())) {
- records.push_back(eventRead);
- }
- }
- }
-
bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&store, size_t &max_size) override {
std::unique_ptr<rocksdb::Iterator>
it(db_->NewIterator(rocksdb::ReadOptions()));
max_size = 0;
@@ -214,12 +197,9 @@ class ProvenanceRepository : public core::Repository {
return max_size > 0;
}
- // destroy
void destroy() {
db_.reset();
}
- // Run function for the thread
- void run() override;
uint64_t getKeyCount() const {
std::string key_count;
@@ -231,11 +211,20 @@ class ProvenanceRepository : public core::Repository {
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
ProvenanceRepository(const ProvenanceRepository &parent) = delete;
+
ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete;
private:
+ // Run function for the thread
+ void run() override;
+
+ std::thread& getThread() override {
+ return thread_;
+ }
+
std::unique_ptr<rocksdb::DB> db_;
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<ProvenanceRepository>::getLogger();
+ std::thread thread_;
};
} // namespace org::apache::nifi::minifi::provenance
diff --git
a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
index 7b932244e..9d6ae3da1 100644
--- a/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
+++ b/extensions/standard-processors/tests/integration/TestExecuteProcess.cpp
@@ -50,13 +50,9 @@ int main(int /*argc*/, char ** /*argv*/) {
std::shared_ptr<core::Processor> processor =
std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess");
processor->setMaxConcurrentTasks(1);
- std::shared_ptr<core::Repository> test_repo =
- std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- std::shared_ptr<TestRepository> repo =
- std::static_pointer_cast<TestRepository>(test_repo);
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
+ std::shared_ptr<minifi::FlowController> controller =
std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
utils::Identifier processoruuid = processor->getUUID();
assert(processoruuid);
diff --git
a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index dad668e31..30bdd9b38 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -34,8 +34,8 @@ using namespace std::literals::chrono_literals;
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
TestController test_controller;
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -225,8 +225,8 @@ Provenance Reporting:
TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
TestController test_controller;
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -351,8 +351,8 @@ NiFi Properties Overrides: {}
TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
TestController test_controller;
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -505,8 +505,8 @@ TEST_CASE("Test Dynamic Unsupported",
"[YamlConfigurationDynamicUnsupported]") {
logTestController.setDebug<TestPlan>();
logTestController.setTrace<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -541,8 +541,8 @@ TEST_CASE("Test Required Property",
"[YamlConfigurationRequiredProperty]") {
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -586,8 +586,8 @@ TEST_CASE("Test Required Property 2",
"[YamlConfigurationRequiredProperty2]") {
logTestController.setDebug<core::YamlConfiguration>();
logTestController.setDebug<core::Processor>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -635,8 +635,8 @@ TEST_CASE("Test Dependent Property",
"[YamlConfigurationDependentProperty]") {
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -657,8 +657,8 @@ TEST_CASE("Test Dependent Property 2",
"[YamlConfigurationDependentProperty2]")
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -687,8 +687,8 @@ TEST_CASE("Test Exclusive Property",
"[YamlConfigurationExclusiveProperty]") {
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -707,8 +707,8 @@ TEST_CASE("Test Regex Property",
"[YamlConfigurationRegexProperty]") {
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -728,8 +728,8 @@ TEST_CASE("Test Exclusive Property 2",
"[YamlConfigurationExclusiveProperty2]")
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -756,8 +756,8 @@ TEST_CASE("Test Regex Property 2",
"[YamlConfigurationRegexProperty2]") {
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -784,8 +784,8 @@ TEST_CASE("Test Regex Property 2",
"[YamlConfigurationRegexProperty2]") {
TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
TestController test_controller;
- std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> testProvRepo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> testFlowFileRepo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
@@ -875,8 +875,8 @@ Remote Process Groups: []
TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
TestController test_controller;
- std::shared_ptr<core::Repository> test_prov_repo =
core::createRepository("provenancerepository", true);
- std::shared_ptr<core::Repository> test_flow_file_repo =
core::createRepository("flowfilerepository", true);
+ std::shared_ptr<core::Repository> test_prov_repo =
core::createRepository("provenancerepository");
+ std::shared_ptr<core::Repository> test_flow_file_repo =
core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> stream_factory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
diff --git a/libminifi/include/core/ContentRepository.h
b/libminifi/include/core/ContentRepository.h
index 68818c17a..e109eb97a 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -42,29 +42,24 @@ namespace core {
*/
class ContentRepository : public StreamManager<minifi::ResourceClaim>, public
utils::EnableSharedFromThis<ContentRepository> {
public:
- virtual ~ContentRepository() = default;
+ ~ContentRepository() override = default;
/**
* initialize this content repository using the provided configuration.
*/
virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
- virtual std::string getStoragePath() const;
+ std::string getStoragePath() const override;
virtual std::shared_ptr<ContentSession> createSession();
- /**
- * Stops this repository.
- */
- virtual void stop() = 0;
-
void reset();
- virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId);
+ uint32_t getStreamCount(const minifi::ResourceClaim &streamId) override;
- virtual void incrementStreamCount(const minifi::ResourceClaim &streamId);
+ void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
- virtual StreamState decrementStreamCount(const minifi::ResourceClaim
&streamId);
+ StreamState decrementStreamCount(const minifi::ResourceClaim &streamId)
override;
protected:
std::string directory_;
diff --git a/libminifi/include/core/Repository.h
b/libminifi/include/core/Repository.h
index 5e265d091..5e2169187 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -20,30 +20,29 @@
#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
-#include <memory>
-#include <utility>
#include <atomic>
#include <cstdint>
#include <cstring>
-#include <iostream>
#include <map>
+#include <memory>
#include <set>
#include <string>
-#include <thread>
+#include <utility>
#include <vector>
+
+#include "Core.h"
+#include "ResourceClaim.h"
+#include "core/Connectable.h"
#include "core/ContentRepository.h"
+#include "core/Property.h"
#include "core/SerializableComponent.h"
-#include "properties/Configure.h"
#include "core/logging/LoggerFactory.h"
-#include "core/Property.h"
-#include "ResourceClaim.h"
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
-#include "Core.h"
-#include "core/Connectable.h"
-#include "core/TraceableResource.h"
+#include "properties/Configure.h"
#include "utils/BackTrace.h"
#include "SwapManager.h"
+#include "utils/Literals.h"
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
#ifndef WIN32
#include <sys/stat.h>
@@ -51,50 +50,42 @@
namespace org::apache::nifi::minifi::core {
-#define REPOSITORY_DIRECTORY "./repo"
-#define MAX_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+constexpr auto REPOSITORY_DIRECTORY = "./repo";
+constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
-class Repository : public virtual core::SerializableComponent, public
core::TraceableResource {
+class Repository : public virtual core::SerializableComponent {
public:
- /*
- * Constructor for the repository
- */
- explicit Repository(std::string repo_name = "Repository",
+ explicit Repository(const std::string& repo_name = "Repository",
std::string directory = REPOSITORY_DIRECTORY,
std::chrono::milliseconds maxPartitionMillis =
MAX_REPOSITORY_ENTRY_LIFE_TIME,
int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
- thread_(),
+ directory_(std::move(directory)),
+ max_partition_millis_(maxPartitionMillis),
+ max_partition_bytes_(maxPartitionBytes),
+ purge_period_(purgePeriod),
+ repo_full_(false),
repo_size_(0),
logger_(logging::LoggerFactory<Repository>::getLogger()) {
- directory_ = directory;
- max_partition_millis_ = maxPartitionMillis;
- max_partition_bytes_ = maxPartitionBytes;
- purge_period_ = purgePeriod;
- running_ = false;
- repo_full_ = false;
}
- // Destructor
- ~Repository() override {
- stop();
- }
+ virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
- virtual bool isNoop() {
- return true;
- }
+ virtual bool start() = 0;
- virtual void flush();
+ virtual bool stop() = 0;
- // initialize
- virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) {
+ virtual bool isNoop() const {
return true;
}
- // Put
- virtual bool Put(std::string /*key*/, const uint8_t* /*buf*/, size_t
/*bufLen*/) {
+
+ virtual void flush() {
+ }
+
+ virtual bool Put(const std::string& /*key*/, const uint8_t* /*buf*/, size_t
/*bufLen*/) {
return true;
}
@@ -102,8 +93,7 @@ class Repository : public virtual
core::SerializableComponent, public core::Trac
return true;
}
- // Delete
- virtual bool Delete(std::string /*key*/) {
+ virtual bool Delete(const std::string& /*key*/) {
return true;
}
@@ -127,30 +117,10 @@ class Repository : public virtual
core::SerializableComponent, public core::Trac
return false;
}
- // Run function for the thread
- virtual void run() {
- // no op
- }
-
- /**
- * Since SerializableComponents represent a runnable object, we should
return traces
- */
- BackTrace getTraces() override {
- return TraceResolver::getResolver().getBackTrace(getName(),
thread_.native_handle());
- }
-
- // Start the repository monitor thread
- virtual void start();
- // Stop the repository monitor thread
- virtual void stop();
// whether the repo is full
virtual bool isFull() {
return repo_full_;
}
- // whether the repo is enable
- bool isRunning() override {
- return running_;
- }
/**
* Specialization that allows us to serialize max_size objects into store.
@@ -219,14 +189,12 @@ class Repository : public virtual
core::SerializableComponent, public core::Trac
return Put(key, buffer, bufferSize);
}
- uint64_t incrementSize(const char* /*fpath*/, const struct stat *sb, int
/*typeflag*/) {
- return (repo_size_ += sb->st_size);
- }
-
virtual void loadComponent(const std::shared_ptr<core::ContentRepository>&
/*content_repo*/) {
}
- virtual uint64_t getRepoSize();
+ virtual uint64_t getRepoSize() const {
+ return repo_size_;
+ }
std::string getDirectory() const {
return directory_;
@@ -241,30 +209,16 @@ class Repository : public virtual
core::SerializableComponent, public core::Trac
std::map<std::string, core::Connectable*> containers_;
std::map<std::string, core::Connectable*> connection_map_;
- // Mutex for protection
- std::mutex mutex_;
- // repository directory
std::string directory_;
- // max db entry life time
+ // max db entry lifetime
std::chrono::milliseconds max_partition_millis_;
// max db size
int64_t max_partition_bytes_;
- // purge period
std::chrono::milliseconds purge_period_;
- // thread
- std::thread thread_;
- // whether the monitoring thread is running for the repo while it was enabled
- std::atomic<bool> running_;
- // whether stop accepting provenace event
+ // whether to stop accepting provenance event
std::atomic<bool> repo_full_;
- // repoSize
- // size of the directory
std::atomic<uint64_t> repo_size_;
- // Run function for the thread
- void threadExecutor() {
- run();
- }
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/RepositoryFactory.h
b/libminifi/include/core/RepositoryFactory.h
index 2a56728b2..60ba5775a 100644
--- a/libminifi/include/core/RepositoryFactory.h
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -26,33 +26,24 @@
#include "core/Repository.h"
#include "Core.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-namespace core {
+namespace org::apache::nifi::minifi::core {
/**
- * Create a repository represented by the configuration class name
+ * Create a content repository
* @param configuration_class_name configuration class name
* @param fail_safe determines whether or not to make the default class if
configuration_class_name is invalid
* @param repo_name name of the repository
*/
-std::unique_ptr<core::Repository> createRepository(const std::string&
configuration_class_name, bool fail_safe = false, const std::string& repo_name
= "");
+std::unique_ptr<core::ContentRepository> createContentRepository(const
std::string& configuration_class_name, bool fail_safe = false, const
std::string& repo_name = "");
/**
- * Create a context repository
+ * Create a repository represented by the configuration class name
* @param configuration_class_name configuration class name
* @param fail_safe determines whether or not to make the default class if
configuration_class_name is invalid
* @param repo_name name of the repository
*/
-std::unique_ptr<core::ContentRepository> createContentRepository(const
std::string& configuration_class_name, bool fail_safe = false, const
std::string& repo_name = "");
+std::unique_ptr<core::Repository> createRepository(const std::string&
configuration_class_name, const std::string& repo_name = "");
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core
#endif // LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
diff --git a/libminifi/include/core/ThreadedRepository.h
b/libminifi/include/core/ThreadedRepository.h
new file mode 100644
index 000000000..e03aed9e5
--- /dev/null
+++ b/libminifi/include/core/ThreadedRepository.h
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <thread>
+
+#include "Repository.h"
+#include "TraceableResource.h"
+
+namespace org::apache::nifi::minifi::core {
+
+class ThreadedRepository : public core::Repository, public
core::TraceableResource {
+ public:
+ using Repository::Repository;
+
+ ~ThreadedRepository() override {
+ if (running_state_.load() != RunningState::Stopped) {
+ logger_->log_error("Thread of %s should have been stopped in subclass
before ThreadedRepository's destruction", name_);
+ }
+ }
+
+ bool initialize(const std::shared_ptr<Configure>& /*configure*/) override {
+ return true;
+ }
+
+ // Starts repository monitor thread
+ bool start() override {
+ // if Stopped, turn to Starting, otherwise return
+ RunningState expected{RunningState::Stopped};
+ if (!running_state_.compare_exchange_strong(expected,
RunningState::Starting)) {
+ return false;
+ }
+ if (purge_period_ <= std::chrono::milliseconds(0)) {
+ running_state_.store(RunningState::Running);
+ return true;
+ }
+ getThread() = std::thread(&ThreadedRepository::run, this);
+ running_state_.store(RunningState::Running);
+
+ logger_->log_debug("%s ThreadedRepository monitor thread start", name_);
+ return true;
+ }
+
+ // Stops repository monitor thread
+ bool stop() override {
+ // if Running, turn to Stopping, otherwise return
+ RunningState expected{RunningState::Running};
+ if (!running_state_.compare_exchange_strong(expected,
RunningState::Stopping)) {
+ return false;
+ }
+ if (getThread().joinable()) {
+ getThread().join();
+ }
+ running_state_.store(RunningState::Stopped);
+ logger_->log_debug("%s ThreadedRepository monitor thread stop", name_);
+ return true;
+ }
+
+ bool isRunning() override {
+ return running_state_.load() == RunningState::Running;
+ }
+
+ BackTrace getTraces() override {
+ return TraceResolver::getResolver().getBackTrace(getName(),
getThread().native_handle());
+ }
+
+ private:
+ virtual void run() = 0;
+
+ /**
+ * READ BEFORE USING!
+ * @returns repository monitor thread
+ * Thread-owning overriding subclasses MUST also call stop() in their
destructor
+ * to prevent the thread still using their members after they are
destructed (it's too late in the destructor of this base class)
+ */
+ virtual std::thread& getThread() = 0;
+
+ enum class RunningState : uint8_t {
+ Starting,
+ Running,
+ Stopping,
+ Stopped
+ };
+
+ std::atomic<RunningState> running_state_{RunningState::Stopped};
+ std::shared_ptr<logging::Logger> logger_
{logging::LoggerFactory<ThreadedRepository>::getLogger()};
+};
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h
b/libminifi/include/core/repository/FileSystemRepository.h
index 01e231c2a..54ce36b6c 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -25,49 +25,39 @@
#include "../ContentRepository.h"
#include "properties/Configure.h"
#include "core/logging/LoggerFactory.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+
+namespace org::apache::nifi::minifi::core::repository {
/**
* FileSystemRepository is a content repository that stores data onto the
local file system.
*/
class FileSystemRepository : public core::ContentRepository, public
core::CoreComponent {
public:
- FileSystemRepository(std::string name =
getClassName<FileSystemRepository>()) // NOLINT
+ explicit FileSystemRepository(const std::string& name =
getClassName<FileSystemRepository>())
: core::CoreComponent(name),
logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
}
- virtual ~FileSystemRepository() = default;
- virtual bool initialize(const std::shared_ptr<minifi::Configure>
&configuration);
+ virtual ~FileSystemRepository() = default;
- virtual void stop();
+ virtual bool initialize(const std::shared_ptr<minifi::Configure>&
configuration);
- bool exists(const minifi::ResourceClaim &streamId);
+ bool exists(const minifi::ResourceClaim& streamId);
- virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim
&claim, bool append = false);
+ virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim&
claim, bool append = false);
- virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim
&claim);
+ virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim&
claim);
- virtual bool close(const minifi::ResourceClaim &claim) {
+ virtual bool close(const minifi::ResourceClaim& claim) {
return remove(claim);
}
- virtual bool remove(const minifi::ResourceClaim &claim);
+ virtual bool remove(const minifi::ResourceClaim& claim);
private:
std::shared_ptr<logging::Logger> logger_;
};
-} // namespace repository
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core::repository
#endif // LIBMINIFI_INCLUDE_CORE_REPOSITORY_FILESYSTEMREPOSITORY_H_
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h
b/libminifi/include/core/repository/VolatileContentRepository.h
index 52122ebad..2759af81c 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
@@ -32,23 +32,13 @@
#include "core/logging/LoggerFactory.h"
#include "utils/GeneralUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
/**
- * Purpose: Stages content into a volatile area of memory. Note that when
the maximum number
+ * Purpose: Stages content into a volatile area of memory. Note that when the
maximum number
* of entries is consumed we will rollback a session to wait for others to be
freed.
*/
-class VolatileContentRepository :
- public core::ContentRepository,
- public core::repository::VolatileRepository<ResourceClaim::Path>,
- public utils::EnableSharedFromThis<VolatileContentRepository> {
- using utils::EnableSharedFromThis<VolatileContentRepository>::sharedFromThis;
-
+class VolatileContentRepository : public core::ContentRepository, public
core::repository::VolatileRepository<ResourceClaim::Path> {
public:
static const char *minimal_locking;
@@ -59,7 +49,8 @@ class VolatileContentRepository :
logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
max_count_ = 15000;
}
- virtual ~VolatileContentRepository() {
+
+ ~VolatileContentRepository() override {
logger_->log_debug("Clearing repository");
if (!minimize_locking_) {
std::lock_guard<std::mutex> lock(map_mutex_);
@@ -70,38 +61,41 @@ class VolatileContentRepository :
}
}
+ bool start() override {
+ return true;
+ }
+
+ bool stop() override {
+ return true;
+ }
+
/**
* Initialize the volatile content repo
* @param configure configuration
*/
- virtual bool initialize(const std::shared_ptr<Configure> &configure);
-
- /**
- * Stop any thread associated with the volatile content repository.
- */
- virtual void stop();
+ bool initialize(const std::shared_ptr<Configure> &configure) override;
/**
* Creates writable stream.
* @param claim resource claim
* @return BaseStream shared pointer that represents the stream the consumer
will write to.
*/
- virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim
&claim, bool append);
+ std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim,
bool append) override;
/**
* Creates readable stream.
* @param claim resource claim
* @return BaseStream shared pointer that represents the stream from which
the consumer will read..
*/
- virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim
&claim);
+ std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim)
override;
- virtual bool exists(const minifi::ResourceClaim &claim);
+ bool exists(const minifi::ResourceClaim &claim) override;
/**
* Closes the claim.
* @return whether or not the claim is associated with content stored in
volatile memory.
*/
- virtual bool close(const minifi::ResourceClaim &claim) {
+ bool close(const minifi::ResourceClaim &claim) override {
return remove(claim);
}
@@ -109,12 +103,7 @@ class VolatileContentRepository :
* Closes the claim.
* @return whether or not the claim is associated with content stored in
volatile memory.
*/
- virtual bool remove(const minifi::ResourceClaim &claim);
-
- protected:
- virtual void start();
-
- virtual void run();
+ bool remove(const minifi::ResourceClaim &claim) override;
private:
bool minimize_locking_;
@@ -129,11 +118,6 @@ class VolatileContentRepository :
std::shared_ptr<logging::Logger> logger_;
};
-} // namespace repository
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::core::repository
#endif // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h
b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index ddf93810b..79fbf7671 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -23,6 +23,7 @@
#include "VolatileRepository.h"
#include "FlowFileRecord.h"
+#include "core/ThreadedRepository.h"
#include "utils/gsl.h"
namespace org {
@@ -36,12 +37,10 @@ namespace repository {
* Volatile flow file repository. keeps a running counter of the current
location, freeing
* those which we no longer hold.
*/
-class VolatileFlowFileRepository : public VolatileRepository<std::string>,
public utils::EnableSharedFromThis<VolatileFlowFileRepository> {
- using
utils::EnableSharedFromThis<VolatileFlowFileRepository>::sharedFromThis;
-
+class VolatileFlowFileRepository : public VolatileRepository<std::string,
core::ThreadedRepository> {
public:
- explicit VolatileFlowFileRepository(std::string repo_name = "",
- std::string /*dir*/ =
REPOSITORY_DIRECTORY,
+ explicit VolatileFlowFileRepository(const std::string& repo_name = "",
+ const std::string& /*dir*/ =
REPOSITORY_DIRECTORY,
std::chrono::milliseconds
maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
int64_t maxPartitionBytes =
MAX_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod =
REPOSITORY_PURGE_PERIOD)
@@ -51,15 +50,23 @@ class VolatileFlowFileRepository : public
VolatileRepository<std::string>, publi
content_repo_ = nullptr;
}
+ ~VolatileFlowFileRepository() override {
+ stop();
+ }
+
+ private:
void run() override {
- repo_full_ = false;
- while (running_) {
+ while (isRunning()) {
std::this_thread::sleep_for(purge_period_);
flush();
}
flush();
}
+ std::thread& getThread() override {
+ return thread_;
+ }
+
void flush() override {
if (purge_required_ && nullptr != content_repo_) {
std::lock_guard<std::mutex> lock(purge_mutex_);
@@ -80,7 +87,6 @@ class VolatileFlowFileRepository : public
VolatileRepository<std::string>, publi
content_repo_ = content_repo;
}
- protected:
void emplace(RepoValue<std::string> &old_value) override {
std::string buffer;
old_value.emplace(buffer);
@@ -89,6 +95,7 @@ class VolatileFlowFileRepository : public
VolatileRepository<std::string>, publi
}
std::shared_ptr<core::ContentRepository> content_repo_;
+ std::thread thread_;
};
} // namespace repository
} // namespace core
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h
b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index 14ab1c035..a8f03f198 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -21,6 +21,7 @@
#include <string>
#include "VolatileRepository.h"
+#include "core/ThreadedRepository.h"
namespace org {
namespace apache {
@@ -29,10 +30,7 @@ namespace minifi {
namespace core {
namespace repository {
-/**
- * Volatile provenance repository.
- */
-class VolatileProvenanceRepository : public VolatileRepository<std::string> {
+class VolatileProvenanceRepository : public VolatileRepository<std::string,
core::ThreadedRepository> {
public:
explicit VolatileProvenanceRepository(std::string repo_name = "",
std::string /*dir*/ =
REPOSITORY_DIRECTORY,
@@ -43,14 +41,23 @@ class VolatileProvenanceRepository : public
VolatileRepository<std::string> {
purge_required_ = false;
}
- virtual void run() {
- repo_full_ = false;
+ ~VolatileProvenanceRepository() override {
+ stop();
+ }
+
+ private:
+ void run() override {
+ }
+
+ std::thread& getThread() override {
+ return thread_;
}
- protected:
- virtual void emplace(RepoValue<std::string> &old_value) {
+
+ void emplace(RepoValue<std::string> &old_value) override {
purge_list_.push_back(old_value.getKey());
}
- private:
+
+ std::thread thread_;
};
} // namespace repository
diff --git a/libminifi/include/core/repository/VolatileRepository.h
b/libminifi/include/core/repository/VolatileRepository.h
index 67f48bc64..d3771e612 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -39,23 +39,16 @@ namespace nifi {
namespace minifi {
namespace core {
namespace repository {
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
+
/**
* Flow File repository
* Design: Extends Repository and implements the run function, using RocksDB
as the primary substrate.
*/
-template<typename T>
-class VolatileRepository : public core::Repository {
+template<typename KeyType, typename RepositoryType = core::Repository>
+class VolatileRepository : public RepositoryType {
public:
static const char *volatile_repo_max_count;
static const char *volatile_repo_max_bytes;
- // Constructor
explicit VolatileRepository(std::string repo_name = "",
std::string /*dir*/ = REPOSITORY_DIRECTORY,
@@ -63,7 +56,7 @@ class VolatileRepository : public core::Repository {
int64_t maxPartitionBytes =
MAX_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod =
REPOSITORY_PURGE_PERIOD)
: core::SerializableComponent(repo_name),
- Repository(repo_name.length() > 0 ? repo_name :
core::getClassName<VolatileRepository>(), "", maxPartitionMillis,
maxPartitionBytes, purgePeriod),
+ RepositoryType(repo_name.length() > 0 ? repo_name :
core::getClassName<VolatileRepository>(), "", maxPartitionMillis,
maxPartitionBytes, purgePeriod),
current_size_(0),
current_index_(0),
max_count_(10000),
@@ -75,14 +68,11 @@ class VolatileRepository : public core::Repository {
~VolatileRepository() override;
/**
- * Initialize thevolatile repsitory
+ * Initialize the volatile repository
**/
-
bool initialize(const std::shared_ptr<Configure> &configure) override;
- void run() override = 0;
-
- bool isNoop() override {
+ bool isNoop() const override {
return false;
}
@@ -91,26 +81,26 @@ class VolatileRepository : public core::Repository {
* @param key key to add to the repository
* @param buf buffer
**/
- bool Put(T key, const uint8_t *buf, size_t bufLen) override;
+ bool Put(const KeyType& key, const uint8_t *buf, size_t bufLen) override;
/**
* Places new objects into the volatile memory area
* @param data the key-value pairs to add to the repository
**/
- bool MultiPut(const std::vector<std::pair<T,
std::unique_ptr<io::BufferStream>>>& data) override;
+ bool MultiPut(const std::vector<std::pair<KeyType,
std::unique_ptr<io::BufferStream>>>& data) override;
/**
* Deletes the key
* @return status of the delete operation
*/
- bool Delete(T key) override;
+ bool Delete(const KeyType& key) override;
/**
* Sets the value from the provided key. Once the item is retrieved
* it may not be retrieved again.
* @return status of the get operation.
*/
- bool Get(const T &key, std::string &value) override;
+ bool Get(const KeyType& key, std::string &value) override;
/**
* Deserializes objects into store
* @param store vector in which we will store newly created objects.
@@ -131,37 +121,23 @@ class VolatileRepository : public core::Repository {
*/
void loadComponent(const std::shared_ptr<core::ContentRepository>
&content_repo) override;
- void start() override;
-
- uint64_t getRepoSize() override {
+ uint64_t getRepoSize() const override {
return current_size_;
}
protected:
- virtual void emplace(RepoValue<T> &old_value) {
+ virtual void emplace(RepoValue<KeyType> &old_value) {
std::lock_guard<std::mutex> lock(purge_mutex_);
purge_list_.push_back(old_value.getKey());
}
- /**
- * Tests whether or not the current size exceeds the capacity
- * if the new prospectiveSize is inserted.
- * @param prospectiveSize size of item to be added.
- */
- inline bool exceedsCapacity(uint32_t prospectiveSize) {
- if (current_size_ + prospectiveSize > max_size_)
- return true;
- else
- return false;
- }
-
// current size of the volatile repo.
std::atomic<size_t> current_size_;
// current index.
std::atomic<uint16_t> current_index_;
// value vector that exists for non blocking iteration over
// objects that store data for this repo instance.
- std::vector<AtomicEntry<T>*> value_vector_;
+ std::vector<AtomicEntry<KeyType>*> value_vector_;
// max count we are allowed to store.
uint32_t max_count_;
@@ -172,42 +148,40 @@ class VolatileRepository : public core::Repository {
std::mutex purge_mutex_;
// purge list
- std::vector<T> purge_list_;
+ std::vector<KeyType> purge_list_;
private:
std::shared_ptr<logging::Logger> logger_;
};
-template<typename T>
-const char *VolatileRepository<T>::volatile_repo_max_count = "max.count";
-template<typename T>
-const char *VolatileRepository<T>::volatile_repo_max_bytes = "max.bytes";
+template<typename KeyType, typename RepositoryType>
+const char *VolatileRepository<KeyType,
RepositoryType>::volatile_repo_max_count = "max.count";
+template<typename KeyType, typename RepositoryType>
+const char *VolatileRepository<KeyType,
RepositoryType>::volatile_repo_max_bytes = "max.bytes";
-template<typename T>
-void VolatileRepository<T>::loadComponent(const
std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
+template<typename KeyType, typename RepositoryType>
+void VolatileRepository<KeyType, RepositoryType>::loadComponent(const
std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
}
// Destructor
-template<typename T>
-VolatileRepository<T>::~VolatileRepository() {
+template<typename KeyType, typename RepositoryType>
+VolatileRepository<KeyType, RepositoryType>::~VolatileRepository() {
for (auto ent : value_vector_) {
delete ent;
}
-
- stop();
}
/**
- * Initialize the volatile repsitory
+ * Initialize the volatile repository
**/
-template<typename T>
-bool VolatileRepository<T>::initialize(const std::shared_ptr<Configure>
&configure) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::initialize(const
std::shared_ptr<Configure> &configure) {
std::string value = "";
if (configure != nullptr) {
int64_t max_cnt = 0;
std::stringstream strstream;
- strstream << Configure::nifi_volatile_repository_options << getName() <<
"." << volatile_repo_max_count;
+ strstream << Configure::nifi_volatile_repository_options <<
RepositoryType::getName() << "." << volatile_repo_max_count;
if (configure->get(strstream.str(), value)) {
if (core::Property::StringToInt(value, max_cnt)) {
max_count_ = gsl::narrow<uint32_t>(max_cnt);
@@ -217,7 +191,7 @@ bool VolatileRepository<T>::initialize(const
std::shared_ptr<Configure> &configu
strstream.str("");
strstream.clear();
int64_t max_bytes = 0;
- strstream << Configure::nifi_volatile_repository_options << getName() <<
"." << volatile_repo_max_bytes;
+ strstream << Configure::nifi_volatile_repository_options <<
RepositoryType::getName() << "." << volatile_repo_max_bytes;
if (configure->get(strstream.str(), value)) {
if (core::Property::StringToInt(value, max_bytes)) {
if (max_bytes <= 0) {
@@ -229,11 +203,11 @@ bool VolatileRepository<T>::initialize(const
std::shared_ptr<Configure> &configu
}
}
- logging::LOG_INFO(logger_) << "Resizing value_vector_ for " << getName() <<
" count is " << max_count_;
- logging::LOG_INFO(logger_) << "Using a maximum size for " << getName() << "
of " << max_size_;
+ logging::LOG_INFO(logger_) << "Resizing value_vector_ for " <<
RepositoryType::getName() << " count is " << max_count_;
+ logging::LOG_INFO(logger_) << "Using a maximum size for " <<
RepositoryType::getName() << " of " << max_size_;
value_vector_.reserve(max_count_);
for (uint32_t i = 0; i < max_count_; i++) {
- value_vector_.emplace_back(new AtomicEntry<T>(&current_size_, &max_size_));
+ value_vector_.emplace_back(new AtomicEntry<KeyType>(&current_size_,
&max_size_));
}
return true;
}
@@ -243,14 +217,14 @@ bool VolatileRepository<T>::initialize(const
std::shared_ptr<Configure> &configu
* @param key key to add to the repository
* @param buf buffer
**/
-template<typename T>
-bool VolatileRepository<T>::Put(T key, const uint8_t *buf, size_t bufLen) {
- RepoValue<T> new_value(key, buf, bufLen);
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key,
const uint8_t *buf, size_t bufLen) {
+ RepoValue<KeyType> new_value(key, buf, bufLen);
const size_t size = new_value.size();
bool updated = false;
size_t reclaimed_size = 0;
- RepoValue<T> old_value;
+ RepoValue<KeyType> old_value;
do {
uint16_t private_index = current_index_.fetch_add(1);
// round robin through the beginning
@@ -266,7 +240,6 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf,
size_t bufLen) {
updated = value_vector_.at(private_index)->setRepoValue(new_value,
old_value, reclaimed_size);
logger_->log_debug("Set repo value at %u out of %u updated %u current_size
%u, adding %u to %u", private_index, max_count_, updated == true,
reclaimed_size, size, current_size_.load());
if (updated && reclaimed_size > 0) {
- std::lock_guard<std::mutex> lock(mutex_);
emplace(old_value);
}
if (reclaimed_size > 0) {
@@ -287,8 +260,8 @@ bool VolatileRepository<T>::Put(T key, const uint8_t *buf,
size_t bufLen) {
return true;
}
-template<typename T>
-bool VolatileRepository<T>::MultiPut(const std::vector<std::pair<T,
std::unique_ptr<io::BufferStream>>>& data) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::MultiPut(const
std::vector<std::pair<KeyType, std::unique_ptr<io::BufferStream>>>& data) {
for (const auto& item : data) {
if (!Put(item.first, item.second->getBuffer().template as_span<const
uint8_t>().data(), item.second->size())) {
return false;
@@ -301,12 +274,12 @@ bool VolatileRepository<T>::MultiPut(const
std::vector<std::pair<T, std::unique_
* Deletes the key
* @return status of the delete operation
*/
-template<typename T>
-bool VolatileRepository<T>::Delete(T key) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Delete(const KeyType& key) {
logger_->log_debug("Delete from volatile");
for (auto ent : value_vector_) {
// let the destructor do the cleanup
- RepoValue<T> value;
+ RepoValue<KeyType> value;
if (ent->getValue(key, value)) {
current_size_ -= value.size();
logger_->log_debug("Delete and pushed into purge_list from volatile");
@@ -321,11 +294,11 @@ bool VolatileRepository<T>::Delete(T key) {
* it may not be retrieved again.
* @return status of the get operation.
*/
-template<typename T>
-bool VolatileRepository<T>::Get(const T &key, std::string &value) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType, RepositoryType>::Get(const KeyType &key,
std::string &value) {
for (auto ent : value_vector_) {
// let the destructor do the cleanup
- RepoValue<T> repo_value;
+ RepoValue<KeyType> repo_value;
if (ent->getValue(key, repo_value)) {
current_size_ -= value.size();
repo_value.emplace(value);
@@ -335,13 +308,14 @@ bool VolatileRepository<T>::Get(const T &key, std::string
&value) {
return false;
}
-template<typename T>
-bool
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&store, size_t &max_size,
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType,
RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&store,
+ size_t &max_size,
std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
size_t requested_batch = max_size;
max_size = 0;
for (auto ent : value_vector_) {
// let the destructor do the cleanup
- RepoValue<T> repo_value;
+ RepoValue<KeyType> repo_value;
if (ent->getValue(repo_value)) {
std::shared_ptr<core::SerializableComponent> newComponent = lambda();
@@ -364,13 +338,13 @@ bool
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial
}
}
-template<typename T>
-bool
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&store, size_t &max_size) {
+template<typename KeyType, typename RepositoryType>
+bool VolatileRepository<KeyType,
RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>
&store, size_t &max_size) {
logger_->log_debug("VolatileRepository -- DeSerialize %u",
current_size_.load());
max_size = 0;
for (auto ent : value_vector_) {
// let the destructor do the cleanup
- RepoValue<T> repo_value;
+ RepoValue<KeyType> repo_value;
if (ent->getValue(repo_value)) {
// we've taken ownership of this repo value
@@ -388,22 +362,6 @@ bool
VolatileRepository<T>::DeSerialize(std::vector<std::shared_ptr<core::Serial
}
}
-template<typename T>
-void VolatileRepository<T>::start() {
- if (this->purge_period_ <= std::chrono::milliseconds(0))
- return;
- if (running_)
- return;
- running_ = true;
- thread_ = std::thread(&VolatileRepository<T>::run, this);
- logger_->log_debug("%s Repository Monitor Thread Start", name_);
-}
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
-#endif
-
} // namespace repository
} // namespace core
} // namespace minifi
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index e19acb5d0..02ef0914b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -43,6 +43,7 @@
#include "io/NetworkPrioritizer.h"
#include "io/FileStream.h"
#include "core/ClassLoader.h"
+#include "core/ThreadedRepository.h"
namespace org::apache::nifi::minifi {
@@ -513,10 +514,14 @@ uint64_t FlowController::getUptime() {
std::vector<BackTrace> FlowController::getTraces() {
std::vector<BackTrace> traces{thread_pool_.getTraces()};
- auto prov_repo_trace = provenance_repo_->getTraces();
- traces.emplace_back(std::move(prov_repo_trace));
- auto flow_repo_trace = flow_file_repo_->getTraces();
- traces.emplace_back(std::move(flow_repo_trace));
+ if (auto provenance_repo =
std::dynamic_pointer_cast<core::ThreadedRepository>(provenance_repo_)) {
+ auto prov_repo_trace = provenance_repo->getTraces();
+ traces.emplace_back(std::move(prov_repo_trace));
+ }
+ if (auto flow_file_repo =
std::dynamic_pointer_cast<core::ThreadedRepository>(flow_file_repo_)) {
+ auto flow_repo_trace = flow_file_repo->getTraces();
+ traces.emplace_back(std::move(flow_repo_trace));
+ }
auto my_traces = TraceResolver::getResolver().getBackTrace("main");
traces.emplace_back(std::move(my_traces));
return traces;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 139661909..667a429ed 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -30,6 +30,7 @@
#include "core/controller/ControllerServiceProvider.h"
#include "c2/C2Agent.h"
#include "core/state/nodes/FlowInformation.h"
+#include "properties/Configuration.h"
#include "utils/file/FileSystem.h"
#include "utils/file/FileUtils.h"
#include "utils/gsl.h"
diff --git a/libminifi/src/core/Repository.cpp
b/libminifi/src/core/Repository.cpp
deleted file mode 100644
index a6703092c..000000000
--- a/libminifi/src/core/Repository.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "core/Repository.h"
-#include <cstdint>
-
-#include "io/BufferStream.h"
-#include "core/logging/Logger.h"
-#include "provenance/Provenance.h"
-
-using namespace std::literals::chrono_literals;
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-void Repository::start() {
- if (this->purge_period_ <= 0ms)
- return;
- if (running_)
- return;
- running_ = true;
- thread_ = std::thread(&Repository::threadExecutor, this);
- logger_->log_debug("%s Repository Monitor Thread Start", name_);
-}
-
-void Repository::stop() {
- if (!running_)
- return;
- running_ = false;
- if (thread_.joinable())
- thread_.join();
- logger_->log_debug("%s Repository Monitor Thread Stop", name_);
-}
-
-// repoSize
-uint64_t Repository::getRepoSize() {
- return repo_size_;
-}
-
-void Repository::flush() {
-}
-
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/core/RepositoryFactory.cpp
b/libminifi/src/core/RepositoryFactory.cpp
index 47ff0bb0b..6ee1e6af1 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -17,7 +17,6 @@
#include "core/RepositoryFactory.h"
#include <memory>
#include <string>
-#include <utility>
#include <algorithm>
#include "core/ContentRepository.h"
#include "core/repository/VolatileContentRepository.h"
@@ -29,75 +28,81 @@
using namespace std::literals::chrono_literals;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
-std::unique_ptr<core::Repository> createRepository(const std::string&
configuration_class_name, bool fail_safe, const std::string& repo_name) {
+std::unique_ptr<core::ContentRepository>
+createContentRepository(const std::string& configuration_class_name, bool
fail_safe, const std::string& repo_name) {
std::string class_name_lc = configuration_class_name;
std::transform(class_name_lc.begin(), class_name_lc.end(),
class_name_lc.begin(), ::tolower);
try {
- auto return_obj =
core::ClassLoader::getDefaultClassLoader().instantiate<core::Repository>(class_name_lc,
class_name_lc);
+ auto return_obj =
core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc,
+
class_name_lc);
if (return_obj) {
- return_obj->setName(repo_name);
return return_obj;
}
- // if the desired repos don't exist, we can try doing string matches and
reoly on volatile repositories
- if (class_name_lc == "flowfilerepository" || class_name_lc ==
"volatileflowfilerepository") {
- return_obj =
instantiate<repository::VolatileFlowFileRepository>(repo_name);
- } else if (class_name_lc == "provenancerepository" || class_name_lc ==
"volatileprovenancefilerepository") {
- return_obj =
instantiate<repository::VolatileProvenanceRepository>(repo_name);
- } else if (class_name_lc == "nooprepository") {
- return_obj = instantiate<core::Repository>(repo_name);
- }
- if (return_obj) {
- return return_obj;
+ if (class_name_lc == "volatilecontentrepository") {
+ return
std::make_unique<core::repository::VolatileContentRepository>(repo_name);
+ } else if (class_name_lc == "filesystemrepository") {
+ return
std::make_unique<core::repository::FileSystemRepository>(repo_name);
}
if (fail_safe) {
- return std::make_unique<core::Repository>("fail_safe", "fail_safe", 1ms,
1, 1ms);
+ return
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
} else {
throw std::runtime_error("Support for the provided configuration class
could not be found");
}
- } catch (const std::runtime_error &) {
+ } catch (const std::runtime_error&) {
if (fail_safe) {
- return std::make_unique<core::Repository>("fail_safe", "fail_safe", 1ms,
1, 1ms);
+ return
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
}
}
throw std::runtime_error("Support for the provided configuration class could
not be found");
}
-std::unique_ptr<core::ContentRepository> createContentRepository(const
std::string& configuration_class_name, bool fail_safe, const std::string&
repo_name) {
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+ explicit NoOpThreadedRepository(const std::string& repo_name)
+ : core::SerializableComponent(repo_name),
+ ThreadedRepository(repo_name) {
+ }
+
+ ~NoOpThreadedRepository() override {
+ stop();
+ }
+
+ private:
+ void run() override {
+ }
+
+ std::thread& getThread() override {
+ return thread_;
+ }
+
+ std::thread thread_;
+};
+
+std::unique_ptr<core::Repository> createRepository(const std::string&
configuration_class_name, const std::string& repo_name) {
std::string class_name_lc = configuration_class_name;
std::transform(class_name_lc.begin(), class_name_lc.end(),
class_name_lc.begin(), ::tolower);
try {
- auto return_obj =
core::ClassLoader::getDefaultClassLoader().instantiate<core::ContentRepository>(class_name_lc,
class_name_lc);
+ auto return_obj =
core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc,
+
class_name_lc);
if (return_obj) {
+ return_obj->setName(repo_name);
return return_obj;
}
- if (class_name_lc == "volatilecontentrepository") {
- return
std::make_unique<core::repository::VolatileContentRepository>(repo_name);
- } else if (class_name_lc == "filesystemrepository") {
- return
std::make_unique<core::repository::FileSystemRepository>(repo_name);
- }
- if (fail_safe) {
- return
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
- } else {
- throw std::runtime_error("Support for the provided configuration class
could not be found");
- }
- } catch (const std::runtime_error &) {
- if (fail_safe) {
- return
std::make_unique<core::repository::VolatileContentRepository>("fail_safe");
+ // if the desired repos don't exist, we can try doing string matches and
rely on volatile repositories
+ if (class_name_lc == "flowfilerepository" || class_name_lc ==
"volatileflowfilerepository") {
+ return instantiate<repository::VolatileFlowFileRepository>(repo_name);
+ } else if (class_name_lc == "provenancerepository" || class_name_lc ==
"volatileprovenancefilerepository") {
+ return instantiate<repository::VolatileProvenanceRepository>(repo_name);
+ } else if (class_name_lc == "nooprepository") {
+ return std::make_unique<core::NoOpThreadedRepository>(repo_name);
}
+ return {};
+ } catch (const std::runtime_error&) {
+ throw;
}
-
- throw std::runtime_error("Support for the provided configuration class could
not be found");
}
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp
b/libminifi/src/core/repository/FileSystemRepository.cpp
index 0b233cd8a..85f42bfb4 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -22,14 +22,9 @@
#include "io/FileStream.h"
#include "utils/file/FileUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
-bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure>
&configuration) {
+bool FileSystemRepository::initialize(const
std::shared_ptr<minifi::Configure>& configuration) {
std::string value;
if
(configuration->get(Configure::nifi_dbcontent_repository_directory_default,
value)) {
directory_ = value;
@@ -39,31 +34,24 @@ bool FileSystemRepository::initialize(const
std::shared_ptr<minifi::Configure> &
utils::file::create_dir(directory_);
return true;
}
-void FileSystemRepository::stop() {
-}
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const
minifi::ResourceClaim &claim, bool append) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const
minifi::ResourceClaim& claim, bool append) {
return std::make_shared<io::FileStream>(claim.getContentFullPath(), append);
}
-bool FileSystemRepository::exists(const minifi::ResourceClaim &streamId) {
+bool FileSystemRepository::exists(const minifi::ResourceClaim& streamId) {
std::ifstream file(streamId.getContentFullPath());
return file.good();
}
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const
minifi::ResourceClaim &claim) {
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const
minifi::ResourceClaim& claim) {
return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0,
false);
}
-bool FileSystemRepository::remove(const minifi::ResourceClaim &claim) {
+bool FileSystemRepository::remove(const minifi::ResourceClaim& claim) {
logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
std::remove(claim.getContentFullPath().c_str());
return true;
}
-} /* namespace repository */
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp
b/libminifi/src/core/repository/VolatileContentRepository.cpp
index a6fd0bb39..db642e973 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -50,29 +50,10 @@ bool VolatileContentRepository::initialize(const
std::shared_ptr<Configure> &con
}
value_vector_.clear();
}
- start();
return true;
}
-void VolatileContentRepository::stop() {
- running_ = false;
-}
-
-void VolatileContentRepository::run() {
-}
-
-void VolatileContentRepository::start() {
- if (this->purge_period_ <= 0ms)
- return;
- if (running_)
- return;
- thread_ = std::thread(&VolatileContentRepository::run, sharedFromThis());
- thread_.detach();
- running_ = true;
- logger_->log_info("%s Repository Monitor Thread Start", getName());
-}
-
std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const
minifi::ResourceClaim &claim, bool /*append*/) {
logger_->log_info("enter write for %s", claim.getContentFullPath());
{
diff --git a/libminifi/test/flow-tests/SessionTests.cpp
b/libminifi/test/flow-tests/SessionTests.cpp
index ed317d4c2..36a15a08d 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -30,6 +30,7 @@
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "core/Processor.h"
+#include "core/RepositoryFactory.h"
#include "repository/VolatileContentRepository.h"
namespace {
@@ -68,7 +69,7 @@ TEST_CASE("Import null data") {
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
- auto prov_repo = std::make_shared<core::Repository>();
+ std::shared_ptr<core::Repository> prov_repo =
core::createRepository("nooprepository");
std::shared_ptr<core::Repository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
SESSIONTEST_FLOWFILE_CHECKPOINT_DIR);
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h
b/libminifi/test/flow-tests/TestControllerWithFlow.h
index b834b8374..b311b15c3 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -55,7 +55,7 @@ class TestControllerWithFlow: public TestController{
}
void setupFlow() {
- std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> ff_repo =
std::make_shared<TestFlowRepository>();
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
diff --git a/libminifi/test/integration/IntegrationBase.h
b/libminifi/test/integration/IntegrationBase.h
index 3c58d67cc..58c4a790a 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -140,7 +140,7 @@ void IntegrationBase::run(const std::optional<std::string>&
test_file_location,
using namespace std::literals::chrono_literals;
testSetup();
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
if (test_file_location) {
@@ -187,8 +187,6 @@ void IntegrationBase::run(const std::optional<std::string>&
test_file_location,
std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
queryRootProcessGroup(pg);
- std::shared_ptr<TestRepository> repo =
std::static_pointer_cast<TestRepository>(test_repo);
-
const auto request_restart = [&, this] {
++restart_requested_count_;
running = true;
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp
b/libminifi/test/integration/ProvenanceReportingTest.cpp
index 80aad7d0e..902d81671 100644
--- a/libminifi/test/integration/ProvenanceReportingTest.cpp
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -50,15 +50,14 @@ int main(int argc, char **argv) {
LogTestController::getInstance().setDebug<core::ProcessGroup>();
std::shared_ptr<minifi::Configure> configuration =
std::make_shared<minifi::Configure>();
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo =
std::make_shared<TestFlowRepository>();
+ auto test_repo = std::make_shared<TestThreadedRepository>();
+ auto test_flow_repo = std::make_shared<TestFlowRepository>();
configuration->set(minifi::Configure::nifi_flow_configuration_file,
test_file_location);
std::shared_ptr<minifi::io::StreamFactory> stream_factory =
minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
std::unique_ptr<core::FlowConfiguration> yaml_ptr =
std::make_unique<core::YamlConfiguration>(
test_repo, test_repo, content_repo, stream_factory, configuration,
test_file_location);
- std::shared_ptr<TestRepository> repo =
std::static_pointer_cast<TestRepository>(test_repo);
const auto controller = std::make_shared<minifi::FlowController>(
test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
content_repo, DEFAULT_ROOT_GROUP_NAME,
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp
b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 5fe5cf24a..a0369062a 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -179,8 +179,8 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
- std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
- std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestThreadedRepository>();
+ auto ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -288,7 +288,7 @@ TEST_CASE("Persisted flowFiles are updated on
modification", "[TestP1]") {
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
- std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::Repository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 817b85f7b..7a3f06557 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -24,8 +24,8 @@
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/repository/VolatileProvenanceRepository.h"
+#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
-#include "FlowFileRepository.h"
#include "provenance/Provenance.h"
#include "../unit/ProvenanceTestHelper.h"
#include "../TestBase.h"
@@ -36,7 +36,7 @@ using namespace std::literals::chrono_literals;
TEST_CASE("Test Provenance record create",
"[Testprovenance::ProvenanceEventRecord]") {
provenance::ProvenanceEventRecord
record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah",
"blahblah");
- REQUIRE(record1.getAttributes().size() == 0);
+ REQUIRE(record1.getAttributes().empty());
REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
}
@@ -86,7 +86,7 @@ TEST_CASE("Test Flowfile record added to provenance",
"[TestFlowAndProv1]") {
utils::Identifier childId = record2.getChildrenUuids().at(0);
REQUIRE(childId == ffr1->getUUID());
record2.removeChildUuid(childId);
- REQUIRE(record2.getChildrenUuids().size() == 0);
+ REQUIRE(record2.getChildrenUuids().empty());
}
TEST_CASE("Test Provenance record serialization Volatile",
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
@@ -100,7 +100,7 @@ TEST_CASE("Test Provenance record serialization Volatile",
"[Testprovenance::Pro
auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository =
std::make_shared<core::repository::VolatileProvenanceRepository>();
- testRepository->initialize(0);
+ testRepository->initialize(nullptr);
record1.setEventDuration(sample);
record1.Serialize(testRepository);
@@ -126,7 +126,7 @@ TEST_CASE("Test Flowfile record added to provenance using
Volatile Repo", "[Test
auto sample = 65555ms;
std::shared_ptr<core::Repository> testRepository =
std::make_shared<core::repository::VolatileProvenanceRepository>();
- testRepository->initialize(0);
+ testRepository->initialize(nullptr);
record1.setEventDuration(sample);
record1.Serialize(testRepository);
@@ -138,7 +138,7 @@ TEST_CASE("Test Flowfile record added to provenance using
Volatile Repo", "[Test
utils::Identifier childId = record2.getChildrenUuids().at(0);
REQUIRE(childId == ffr1->getUUID());
record2.removeChildUuid(childId);
- REQUIRE(record2.getChildrenUuids().size() == 0);
+ REQUIRE(record2.getChildrenUuids().empty());
}
TEST_CASE("Test Provenance record serialization NoOp",
"[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
@@ -151,8 +151,8 @@ TEST_CASE("Test Provenance record serialization NoOp",
"[Testprovenance::Provena
auto sample = 65555ms;
- std::shared_ptr<core::Repository> testRepository =
std::make_shared<core::Repository>();
- testRepository->initialize(0);
+ std::shared_ptr<core::Repository> testRepository =
core::createRepository("nooprepository");
+ testRepository->initialize(nullptr);
record1.setEventDuration(sample);
REQUIRE(record1.Serialize(testRepository) == true);
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index cfd1b9ac6..f02dff669 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -59,10 +59,10 @@ class TestProcessor : public minifi::core::Processor {
} // namespace
TEST_CASE("Test Repo Names", "[TestFFR1]") {
- auto repoA = minifi::core::createRepository("FlowFileRepository", false,
"flowfile");
+ auto repoA = minifi::core::createRepository("FlowFileRepository",
"flowfile");
REQUIRE("flowfile" == repoA->getName());
- auto repoB = minifi::core::createRepository("ProvenanceRepository", false,
"provenance");
+ auto repoB = minifi::core::createRepository("ProvenanceRepository",
"provenance");
REQUIRE("provenance" == repoB->getName());
}
@@ -287,8 +287,8 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
config->set(minifi::Configure::nifi_dbcontent_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "content_repository"));
config->set(minifi::Configure::nifi_flowfile_repository_directory_default,
utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
- std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestRepository>();
- std::shared_ptr<core::repository::FlowFileRepository> ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
REPOTEST_FLOWFILE_CHECKPOINT_DIR);
+ std::shared_ptr<core::Repository> prov_repo =
std::make_shared<TestThreadedRepository>();
+ auto ff_repository =
std::make_shared<core::repository::FlowFileRepository>("flowFileRepository",
REPOTEST_FLOWFILE_CHECKPOINT_DIR);
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -366,7 +366,8 @@ TEST_CASE("Flush deleted flowfiles before shutdown",
"[TestFFR7]") {
public:
explicit TestFlowFileRepository(const std::string& name)
: core::SerializableComponent(name),
- FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR,
FLOWFILE_REPOSITORY_DIRECTORY, 10min, MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
1ms) {}
+ FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR,
core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+ 10min,
core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
void flush() override {
FlowFileRepository::flush();
if (onFlush_) {
diff --git a/libminifi/test/unit/MetricsTests.cpp
b/libminifi/test/unit/MetricsTests.cpp
index 6c4faafbb..a3c96128b 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -95,7 +95,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
REQUIRE("RepositoryMetrics" == metrics.getName());
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+ auto repo = std::make_shared<TestThreadedRepository>();
metrics.addRepository(repo);
{
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h
b/libminifi/test/unit/ProvenanceTestHelper.h
index dd724061a..194db5c96 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -29,53 +29,36 @@
#include <vector>
#include "core/repository/VolatileContentRepository.h"
#include "core/Processor.h"
+#include "core/ThreadedRepository.h"
#include "Connection.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "provenance/Provenance.h"
#include "SwapManager.h"
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
-
using namespace std::literals::chrono_literals;
-/**
- * Test repository
- */
-class TestRepository : public org::apache::nifi::minifi::core::Repository {
+template <typename T_BaseRepository>
+class TestRepositoryBase : public T_BaseRepository {
public:
- TestRepository()
+ TestRepositoryBase()
: org::apache::nifi::minifi::core::SerializableComponent("repo_name"),
- Repository("repo_name", "./dir", 1s, 100, 0ms) {
- }
-
- ~TestRepository() override {
- stop();
+ T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) {
}
bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure>
&) override {
return true;
}
- void start() override {
- running_ = true;
- }
-
void setFull() {
- repo_full_ = true;
+ T_BaseRepository::repo_full_ = true;
}
- bool isNoop() override {
+ bool isNoop() const override {
return false;
}
- bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+ bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override
{
std::lock_guard<std::mutex> lock{repository_results_mutex_};
repository_results_.emplace(key, std::string{reinterpret_cast<const
char*>(buf), bufLen});
return true;
@@ -95,7 +78,7 @@ class TestRepository : public
org::apache::nifi::minifi::core::Repository {
return Put(key, buffer, bufferSize);
}
- bool Delete(std::string key) override {
+ bool Delete(const std::string& key) override {
std::lock_guard<std::mutex> lock{repository_results_mutex_};
repository_results_.erase(key);
return true;
@@ -124,7 +107,7 @@ class TestRepository : public
org::apache::nifi::minifi::core::Repository {
break;
}
const auto eventRead = store.at(max_size);
- eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const
std::byte>());
+ eventRead->DeSerialize(gsl::make_span(entry.second).template
as_span<const std::byte>());
++max_size;
}
return true;
@@ -150,50 +133,66 @@ class TestRepository : public
org::apache::nifi::minifi::core::Repository {
return repository_results_;
}
- void
getProvenanceRecord(std::vector<std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>>
&records, int maxSize) {
- std::lock_guard<std::mutex> lock{repository_results_mutex_};
- for (const auto &entry : repository_results_) {
- if (records.size() >= static_cast<uint64_t>(maxSize))
- break;
- const auto eventRead =
std::make_shared<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>();
+ protected:
+ mutable std::mutex repository_results_mutex_;
+ std::map<std::string, std::string> repository_results_;
+};
- if (eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const
std::byte>())) {
- records.push_back(eventRead);
- }
- }
+class TestRepository : public
TestRepositoryBase<org::apache::nifi::minifi::core::Repository> {
+ public:
+ TestRepository()
+ : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
+ }
+
+ bool start() override {
+ return true;
+ }
+
+ bool stop() override {
+ return true;
+ }
+};
+
+class TestThreadedRepository : public
TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepository> {
+ public:
+ TestThreadedRepository()
+ : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
}
+ ~TestThreadedRepository() override {
+ stop();
+ }
+
+ private:
void run() override {
// do nothing
}
- protected:
- mutable std::mutex repository_results_mutex_;
- std::map<std::string, std::string> repository_results_;
+ std::thread& getThread() override {
+ return thread_;
+ }
+
+ std::thread thread_;
};
-class TestFlowRepository : public org::apache::nifi::minifi::core::Repository {
+class TestFlowRepository : public
org::apache::nifi::minifi::core::ThreadedRepository {
public:
TestFlowRepository()
: org::apache::nifi::minifi::core::SerializableComponent("ff"),
- org::apache::nifi::minifi::core::Repository("ff", "./dir", 1s, 100,
0ms) {
- }
-
- ~TestFlowRepository() override {
- stop();
+ org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s,
100, 0ms) {
}
bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure>
&) override {
return true;
}
- bool Put(std::string key, const uint8_t *buf, size_t bufLen) override {
+ bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override
{
std::lock_guard<std::mutex> lock{repository_results_mutex_};
repository_results_.emplace(key, std::string{reinterpret_cast<const
char*>(buf), bufLen});
return true;
}
- // Delete
- bool Delete(std::string key) override {
+
+ bool Delete(const std::string& key) override {
std::lock_guard<std::mutex> lock{repository_results_mutex_};
repository_results_.erase(key);
return true;
@@ -210,36 +209,22 @@ class TestFlowRepository : public
org::apache::nifi::minifi::core::Repository {
}
}
- std::map<std::string, std::string> getRepoMap() const {
- std::lock_guard<std::mutex> lock{repository_results_mutex_};
- return repository_results_;
- }
-
- void
getProvenanceRecord(std::vector<std::shared_ptr<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>>
&records, int maxSize) {
- std::lock_guard<std::mutex> lock{repository_results_mutex_};
- for (const auto &entry : repository_results_) {
- if (records.size() >= static_cast<uint64_t>(maxSize))
- break;
- const auto eventRead =
std::make_shared<org::apache::nifi::minifi::provenance::ProvenanceEventRecord>();
-
- if (eventRead->DeSerialize(gsl::make_span(entry.second).as_span<const
std::byte>())) {
- records.push_back(eventRead);
- }
- }
+ void loadComponent(const
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>&
/*content_repo*/) override {
}
- void loadComponent(const
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>&
content_repo) override {
- content_repo_ = content_repo;
+ private:
+ void run() override {
}
- void run() override {
- // do nothing
+ std::thread& getThread() override {
+ return thread_;
}
protected:
mutable std::mutex repository_results_mutex_;
std::map<std::string, std::string> repository_results_;
std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>
content_repo_;
+ std::thread thread_;
};
class TestFlowController : public org::apache::nifi::minifi::FlowController {
@@ -286,23 +271,18 @@ class TestFlowController : public
org::apache::nifi::minifi::FlowController {
}
std::shared_ptr<org::apache::nifi::minifi::core::Processor>
createProcessor(const std::string& /*name*/, const
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
- return 0;
+ return nullptr;
}
org::apache::nifi::minifi::core::ProcessGroup *createRootProcessGroup(const
std::string& /*name*/, const org::apache::nifi::minifi::utils::Identifier&
/*uuid*/) {
- return 0;
+ return nullptr;
}
org::apache::nifi::minifi::core::ProcessGroup
*createRemoteProcessGroup(const std::string& /*name*/, const
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
- return 0;
+ return nullptr;
}
std::shared_ptr<org::apache::nifi::minifi::Connection>
createConnection(const std::string& /*name*/, const
org::apache::nifi::minifi::utils::Identifier& /*uuid*/) {
- return 0;
+ return nullptr;
}
};
-#if defined(__clang__)
-#pragma clang diagnostic pop
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic pop
-#endif
diff --git a/libminifi/test/unit/SchedulingAgentTests.cpp
b/libminifi/test/unit/SchedulingAgentTests.cpp
index 6cb2fc68d..b80c7563c 100644
--- a/libminifi/test/unit/SchedulingAgentTests.cpp
+++ b/libminifi/test/unit/SchedulingAgentTests.cpp
@@ -46,9 +46,9 @@ class CountOnTriggersProcessor : public
minifi::core::Processor {
TEST_CASE("SchedulingAgentTests", "[SchedulingAgent]") {
- std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestRepository>();
+ std::shared_ptr<core::Repository> test_repo =
std::make_shared<TestThreadedRepository>();
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- std::shared_ptr<TestRepository> repo =
std::static_pointer_cast<TestRepository>(test_repo);
+ auto repo = std::static_pointer_cast<TestThreadedRepository>(test_repo);
std::shared_ptr<minifi::FlowController> controller =
std::make_shared<TestFlowController>(test_repo, test_repo, content_repo);
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 1d90cef9c..8dee11d0f 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -286,18 +286,18 @@ int main(int argc, char **argv) {
configure->get(minifi::Configure::nifi_provenance_repository_class_name,
prov_repo_class);
// Create repos for flow record and provenance
- std::shared_ptr<core::Repository> prov_repo =
core::createRepository(prov_repo_class, true, "provenance");
+ std::shared_ptr prov_repo = core::createRepository(prov_repo_class,
"provenance");
- if (!prov_repo->initialize(configure)) {
+ if (!prov_repo || !prov_repo->initialize(configure)) {
logger->log_error("Provenance repository failed to initialize,
exiting..");
exit(1);
}
configure->get(minifi::Configure::nifi_flow_repository_class_name,
flow_repo_class);
- std::shared_ptr<core::Repository> flow_repo =
core::createRepository(flow_repo_class, true, "flowfile");
+ std::shared_ptr flow_repo = core::createRepository(flow_repo_class,
"flowfile");
- if (!flow_repo->initialize(configure)) {
+ if (!flow_repo || !flow_repo->initialize(configure)) {
logger->log_error("Flow file repository failed to initialize,
exiting..");
exit(1);
}
@@ -343,7 +343,7 @@ int main(int argc, char **argv) {
std::vector<std::string> repo_paths;
repo_paths.reserve(3);
// REPOSITORY_DIRECTORY is a dummy path used by noop repositories
- const auto path_valid = [](const std::string& p) { return !p.empty()
&& p != REPOSITORY_DIRECTORY; };
+ const auto path_valid = [](const std::string& p) { return !p.empty()
&& p != org::apache::nifi::minifi::core::REPOSITORY_DIRECTORY; };
auto prov_repo_path = prov_repo->getDirectory();
auto flow_repo_path = flow_repo->getDirectory();
auto content_repo_storage_path = content_repo->getStoragePath();
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index dac38ca76..40d0f039f 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -29,6 +29,7 @@
#include "core/repository/VolatileContentRepository.h"
#include "core/repository/FileSystemRepository.h"
#include "core/Repository.h"
+#include "core/RepositoryFactory.h"
#include "C2CallbackAgent.h"
#include "core/Connectable.h"
@@ -67,7 +68,7 @@ class Instance {
public:
explicit Instance(const std::string &url, const std::string &port, const
std::string &repo_class_name = "")
- : no_op_repo_(std::make_shared<minifi::core::Repository>()),
+ : no_op_repo_(minifi::core::createRepository("nooprepository")),
url_(url),
configure_(std::make_shared<Configure>()) {
if (repo_class_name == "filesystemrepository") {