MINIFI-249: Update the provenance repository to better abstract deserialization. - Deserialization and serialization are now better abstracted into SerializableComponent, allowing all repositories to use the same [de]serialization interfaces. - Update test resources to use a local HTTP server when possible. - Allow for different volatile configurations. - Update removals and link the FlowFileRepo to the content repository.
MINIFI-330: convert const char* to std::string This closes #110. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/fe634853 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/fe634853 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/fe634853 Branch: refs/heads/master Commit: fe63485342fc0e84250887871826b9f17b1f9a94 Parents: 20622f6 Author: Marc Parisi <[email protected]> Authored: Wed May 24 14:39:08 2017 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu Jul 27 12:43:47 2017 -0400 ---------------------------------------------------------------------- cmake/BuildTests.cmake | 4 +- libminifi/include/Connection.h | 6 +- libminifi/include/EventDrivenSchedulingAgent.h | 5 +- libminifi/include/FlowController.h | 23 +- libminifi/include/FlowFileRecord.h | 31 +- libminifi/include/ResourceClaim.h | 35 +- libminifi/include/SchedulingAgent.h | 12 +- libminifi/include/Site2SiteClientProtocol.h | 35 +- libminifi/include/ThreadedSchedulingAgent.h | 7 +- libminifi/include/TimerDrivenSchedulingAgent.h | 7 +- libminifi/include/core/ConfigurationFactory.h | 11 +- libminifi/include/core/ContentRepository.h | 60 ++ libminifi/include/core/Core.h | 3 + libminifi/include/core/FlowConfiguration.h | 16 +- libminifi/include/core/ProcessContext.h | 27 +- libminifi/include/core/ProcessGroup.h | 3 +- libminifi/include/core/ProcessSession.h | 2 +- libminifi/include/core/Repository.h | 92 ++- libminifi/include/core/RepositoryFactory.h | 16 +- libminifi/include/core/SerializableComponent.h | 88 +++ libminifi/include/core/StreamManager.h | 81 +++ .../include/core/logging/LoggerConfiguration.h | 3 +- .../SiteToSiteProvenanceReportingTask.h | 4 +- .../include/core/repository/AtomicRepoEntries.h | 501 ++++++++++++++++ .../core/repository/FileSystemRepository.h | 72 +++ .../core/repository/FlowFileRepository.h | 25 +- 
.../core/repository/VolatileContentRepository.h | 138 +++++ .../repository/VolatileFlowFileRepository.h | 82 +++ .../repository/VolatileProvenanceRepository.h | 60 ++ .../core/repository/VolatileRepository.h | 517 +++++++++-------- libminifi/include/core/yaml/YamlConfiguration.h | 39 +- libminifi/include/io/AtomicEntryStream.h | 205 +++++++ libminifi/include/io/BaseStream.h | 13 + libminifi/include/io/ClientSocket.h | 6 +- libminifi/include/io/DataStream.h | 8 +- libminifi/include/io/FileStream.h | 136 +++++ libminifi/include/processors/ExecuteProcess.h | 8 +- libminifi/include/processors/GenerateFlowFile.h | 6 +- libminifi/include/processors/InvokeHTTP.h | 6 + libminifi/include/processors/ListenHTTP.h | 2 +- libminifi/include/processors/ListenSyslog.h | 10 +- libminifi/include/processors/LogAttribute.h | 28 +- libminifi/include/processors/PutFile.h | 3 +- libminifi/include/properties/Configure.h | 1 + libminifi/include/provenance/Provenance.h | 62 +- .../include/provenance/ProvenanceRepository.h | 82 ++- libminifi/include/utils/ByteInputCallBack.h | 20 +- libminifi/src/ConfigurationListener.cpp | 12 +- libminifi/src/Configure.cpp | 52 +- libminifi/src/Connection.cpp | 9 +- libminifi/src/FlowControlProtocol.cpp | 5 +- libminifi/src/FlowController.cpp | 47 +- libminifi/src/FlowFileRecord.cpp | 16 +- libminifi/src/HttpConfigurationListener.cpp | 16 +- libminifi/src/Properties.cpp | 4 +- libminifi/src/RemoteProcessorGroupPort.cpp | 65 ++- libminifi/src/ResourceClaim.cpp | 17 +- libminifi/src/SchedulingAgent.cpp | 12 +- libminifi/src/Site2SiteClientProtocol.cpp | 19 +- libminifi/src/ThreadedSchedulingAgent.cpp | 8 +- libminifi/src/controllers/SSLContextService.cpp | 14 +- libminifi/src/core/ClassLoader.cpp | 6 +- libminifi/src/core/ConfigurableComponent.cpp | 13 +- libminifi/src/core/ConfigurationFactory.cpp | 12 +- libminifi/src/core/Connectable.cpp | 12 +- libminifi/src/core/Core.cpp | 5 + libminifi/src/core/FlowConfiguration.cpp | 12 +- 
libminifi/src/core/FlowFile.cpp | 2 +- libminifi/src/core/ProcessGroup.cpp | 29 +- libminifi/src/core/ProcessSession.cpp | 571 ++++++++++--------- libminifi/src/core/ProcessSessionFactory.cpp | 2 +- libminifi/src/core/Processor.cpp | 20 +- libminifi/src/core/Repository.cpp | 6 +- libminifi/src/core/RepositoryFactory.cpp | 47 +- .../StandardControllerServiceNode.cpp | 6 +- .../src/core/logging/LoggerConfiguration.cpp | 16 +- .../SiteToSiteProvenanceReportingTask.cpp | 31 +- .../core/repository/FileSystemRepository.cpp | 54 ++ .../src/core/repository/FlowFileRepository.cpp | 54 +- .../repository/VolatileContentRepository.cpp | 183 ++++++ .../src/core/repository/VolatileRepository.cpp | 29 +- libminifi/src/core/yaml/YamlConfiguration.cpp | 66 ++- libminifi/src/io/AtomicEntryStream.cpp | 34 ++ libminifi/src/io/ClientSocket.cpp | 14 +- libminifi/src/io/FileStream.cpp | 160 ++++++ libminifi/src/io/StreamFactory.cpp | 6 +- libminifi/src/processors/ExecuteProcess.cpp | 6 +- libminifi/src/processors/GenerateFlowFile.cpp | 4 +- libminifi/src/processors/GetFile.cpp | 8 +- libminifi/src/processors/InvokeHTTP.cpp | 33 +- libminifi/src/processors/ListenHTTP.cpp | 10 +- libminifi/src/processors/ListenSyslog.cpp | 2 +- libminifi/src/processors/LogAttribute.cpp | 9 +- libminifi/src/processors/PutFile.cpp | 45 +- libminifi/src/processors/TailFile.cpp | 26 +- libminifi/src/provenance/Provenance.cpp | 50 +- .../src/provenance/ProvenanceRepository.cpp | 18 +- libminifi/test/CPPLINT.cfg | 1 + libminifi/test/TestBase.cpp | 211 +++++++ libminifi/test/TestBase.h | 102 +++- libminifi/test/TestServer.h | 137 +++++ .../ControllerServiceIntegrationTests.cpp | 11 +- .../HttpConfigurationListenerTest.cpp | 67 +-- .../test/integration/HttpGetIntegrationTest.cpp | 67 ++- .../integration/HttpPostIntegrationTest.cpp | 34 +- .../integration/ProvenanceReportingTest.cpp | 11 +- .../test/integration/Site2SiteRestTest.cpp | 61 +- .../test/integration/TestExecuteProcess.cpp | 20 +- 
libminifi/test/resources/TestHTTPGet.yml | 2 +- libminifi/test/resources/TestHTTPGetSecure.yml | 2 +- libminifi/test/resources/TestHTTPPost.yml | 2 +- libminifi/test/resources/cn.ckey.pem | 1 - libminifi/test/resources/cn.crt.pem | 1 - libminifi/test/resources/nifi-cert.pem | 27 + libminifi/test/unit/FileStreamTests.cpp | 210 +++++++ libminifi/test/unit/InvokeHTTPTests.cpp | 173 ++---- libminifi/test/unit/ProcessorTests.cpp | 289 ++++------ libminifi/test/unit/ProvenanceTestHelper.h | 70 ++- libminifi/test/unit/ProvenanceTests.cpp | 33 +- libminifi/test/unit/RepoTests.cpp | 14 +- libminifi/test/unit/TailFileTests.cpp | 187 +++--- libminifi/test/unit/YamlConfigurationTests.cpp | 6 +- main/MiNiFiMain.cpp | 59 +- 123 files changed, 4638 insertions(+), 1658 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 59f1d59..9500792 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -67,7 +67,7 @@ GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/") SET(UNIT_TEST_COUNT 0) FOREACH(testfile ${UNIT_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${TEST_DIR}/unit/${testfile}" ${SPD_SOURCES}) + add_executable("${testfilename}" "${TEST_DIR}/unit/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp") createTests("${testfilename}") MATH(EXPR UNIT_TEST_COUNT "${UNIT_TEST_COUNT}+1") add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) @@ -77,7 +77,7 @@ message("-- Finished building ${UNIT_TEST_COUNT} unit test file(s)...") SET(INT_TEST_COUNT 0) FOREACH(testfile ${INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" 
${SPD_SOURCES}) + add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp") createTests("${testfilename}") #message("Adding ${testfilename} from ${testfile}") MATH(EXPR INT_TEST_COUNT "${INT_TEST_COUNT}+1") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index be51fce..ff32baf 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -47,7 +47,9 @@ class Connection : public core::Connectable, public std::enable_shared_from_this /* * Create a new processor */ - explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL); + explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, + uuid_t srcUUID = NULL, + uuid_t destUUID = NULL); // Destructor virtual ~Connection() { } @@ -168,6 +170,8 @@ class Connection : public core::Connectable, public std::enable_shared_from_this std::atomic<uint64_t> expired_duration_; // flow file repository std::shared_ptr<core::Repository> flow_repository_; + // content repository reference. 
+ std::shared_ptr<core::ContentRepository> content_repo_; private: // Mutex for protection http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/EventDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index 6a63dc5..c838b11 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -38,8 +38,9 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! * Create a new event driven scheduling agent. */ - EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) - : ThreadedSchedulingAgent(controller_service_provider, repo, configuration) { + EventDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration) { } // Destructor virtual ~EventDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/FlowController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index c2fef2a..d9a0452 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -70,10 +70,23 @@ class FlowController : public core::controller::ControllerServiceProvider, publi /** * Flow controller constructor */ - FlowController(std::shared_ptr<core::Repository> provenance_repo, 
std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, - std::unique_ptr<core::FlowConfiguration> flow_configuration, - const std::string name = DEFAULT_ROOT_GROUP_NAME, - bool headless_mode = false); + explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::unique_ptr<core::FlowConfiguration> flow_configuration, + std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode); + + explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::unique_ptr<core::FlowConfiguration> flow_configuration, + std::shared_ptr<core::ContentRepository> content_repo) + : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), content_repo, DEFAULT_ROOT_GROUP_NAME, false) + { + } + + explicit FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::unique_ptr<core::FlowConfiguration> flow_configuration) + : FlowController(provenance_repo, flow_file_repo, configure, std::move(flow_configuration), std::make_shared<core::repository::FileSystemRepository>(), DEFAULT_ROOT_GROUP_NAME, false) + { + content_repo_->initialize(configure); + } // Destructor virtual ~FlowController(); @@ -301,6 +314,8 @@ class FlowController : public core::controller::ControllerServiceProvider, publi // FlowFile Repo std::shared_ptr<core::Repository> flow_file_repo_; + std::shared_ptr<core::ContentRepository> content_repo_; + // Flow Engines // Flow Timer Scheduler std::shared_ptr<TimerDrivenSchedulingAgent> timer_scheduler_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/FlowFileRecord.h 
---------------------------------------------------------------------- diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h index 3d6057d..d6e5f2e 100644 --- a/libminifi/include/FlowFileRecord.h +++ b/libminifi/include/FlowFileRecord.h @@ -30,7 +30,8 @@ #include <sstream> #include <fstream> #include <set> - +#include "core/ContentRepository.h" +#include "io/BaseStream.h" #include "io/Serializable.h" #include "core/FlowFile.h" #include "utils/TimeUtil.h" @@ -81,11 +82,21 @@ inline const char *FlowAttributeKey(FlowAttribute attribute) { // throw exception for error class InputStreamCallback { public: - virtual void process(std::ifstream *stream) = 0; + virtual ~InputStreamCallback() { + + } + //virtual void process(std::ifstream *stream) = 0; + + virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0; }; class OutputStreamCallback { public: - virtual void process(std::ofstream *stream) = 0; + virtual ~OutputStreamCallback() { + + } + //virtual void process(std::ofstream *stream) = 0; + virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0; + }; class FlowFileRecord : public core::FlowFile, public io::Serializable { @@ -94,14 +105,17 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { /* * Create a new flow record */ - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> claim = nullptr); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::map<std::string, std::string> attributes, + std::shared_ptr<ResourceClaim> claim = nullptr); - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event); - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection); + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event, + const std::string &uuidConnection); - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository) + explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo) : FlowFile(), + content_repo_(content_repo), flow_repository_(flow_repository), snapshot_("") { @@ -168,6 +182,9 @@ class FlowFileRecord : public core::FlowFile, public io::Serializable { // repository reference. std::shared_ptr<core::Repository> flow_repository_; + // content repo reference. + std::shared_ptr<core::ContentRepository> content_repo_; + // Snapshot flow record for session rollback bool snapshot_; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h index 49faed1..19a67fa 100644 --- a/libminifi/include/ResourceClaim.h +++ b/libminifi/include/ResourceClaim.h @@ -25,9 +25,11 @@ #include <vector> #include <queue> #include <map> +#include <memory> #include <mutex> #include <atomic> #include "core/Core.h" +#include "core/StreamManager.h" #include "properties/Configure.h" #include "utils/Id.h" @@ -40,7 +42,7 @@ namespace minifi { #define DEFAULT_CONTENT_DIRECTORY "./content_repository" // ResourceClaim Class -class ResourceClaim { +class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> { public: @@ -49,7 +51,9 @@ class ResourceClaim { /*! 
* Create a new resource claim */ - ResourceClaim(const std::string contentDirectory = default_directory_path); + ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, const std::string contentDirectory = default_directory_path); + + ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false); // Destructor virtual ~ResourceClaim() { } @@ -59,7 +63,11 @@ class ResourceClaim { } // decreaseFlowFileRecordOwenedCount void decreaseFlowFileRecordOwnedCount() { - --_flowFileRecordOwnedCount; + + if (_flowFileRecordOwnedCount > 0) { + _flowFileRecordOwnedCount--; + } + } // getFlowFileRecordOwenedCount uint64_t getFlowFileRecordOwnedCount() { @@ -74,14 +82,35 @@ class ResourceClaim { _contentFullPath = path; } + void deleteClaim() { + if (!deleted_) + { + deleted_ = true; + } + + } + + friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) { + stream << claim._contentFullPath; + return stream; + } + + friend std::ostream& operator<<(std::ostream& stream, const std::shared_ptr<ResourceClaim>& claim) { + stream << claim->_contentFullPath; + return stream; + } protected: + std::atomic<bool> deleted_; // Full path to the content std::string _contentFullPath; // How many FlowFileRecord Own this cliam std::atomic<uint64_t> _flowFileRecordOwnedCount; + std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_; + private: + // Logger std::shared_ptr<logging::Logger> logger_; // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 22f79db..1ff3fac 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -52,14 +52,19 @@ class SchedulingAgent { /*! 
* Create a new scheduling agent. */ - SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) + SchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<Configure> configuration) : configure_(configuration), admin_yield_duration_(0), bored_yield_duration_(0), + content_repo_(content_repo), controller_service_provider_(controller_service_provider), logger_(logging::LoggerFactory<SchedulingAgent>::getLogger()) { running_ = false; repo_ = repo; + flow_repo_ = flow_repo; utils::ThreadPool<bool> pool = utils::ThreadPool<bool>(configure_->getInt(Configure::nifi_flow_engine_threads, 8), true); component_lifecycle_thread_pool_ = std::move(pool); component_lifecycle_thread_pool_.start(); @@ -77,7 +82,6 @@ class SchedulingAgent { // start void start() { running_ = true; - } // stop void stop() { @@ -108,6 +112,10 @@ class SchedulingAgent { std::shared_ptr<Configure> configure_; std::shared_ptr<core::Repository> repo_; + + std::shared_ptr<core::Repository> flow_repo_; + + std::shared_ptr<core::ContentRepository> content_repo_; // thread pool for components. 
utils::ThreadPool<bool> component_lifecycle_thread_pool_; // controller service provider reference http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index 8d89004..dcb551a 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -549,7 +549,8 @@ class Site2SiteClientProtocol { : _packet(packet) { } DataPacket *_packet; - void process(std::ofstream *stream) { + //void process(std::ofstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { uint8_t buffer[8192]; int len = _packet->_size; while (len > 0) { @@ -557,11 +558,12 @@ class Site2SiteClientProtocol { int ret = _packet->_transaction->getStream().readData(buffer, size); if (ret != size) { _packet->_protocol->logger_->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret); - break; + return -1; } - stream->write((const char *) buffer, size); + stream->write(buffer, size); len -= size; } + return len; } }; // Nest Callback Class for read stream @@ -571,22 +573,29 @@ class Site2SiteClientProtocol { : _packet(packet) { } DataPacket *_packet; - void process(std::ifstream *stream) { + int64_t process(std::shared_ptr<io::BaseStream> stream) { _packet->_size = 0; - uint8_t buffer[8192]; + uint8_t buffer[8192] = { 0 }; int readSize; - while (stream->good()) { - if (!stream->read((char *) buffer, 8192)) - readSize = stream->gcount(); - else - readSize = 8192; + size_t size = 0; + do { + readSize = stream->read(buffer, 8192); + + if (readSize == 0) { + break; + } + if (readSize < 0) { + return -1; + } int ret = _packet->_transaction->getStream().writeData(buffer, readSize); if (ret != readSize) { _packet->_protocol->logger_->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret); - break; + return 
-1; } - _packet->_size += readSize; - } + size += readSize; + } while (size < stream->getSize()); + _packet->_size = size; + return size; } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/ThreadedSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/ThreadedSchedulingAgent.h b/libminifi/include/ThreadedSchedulingAgent.h index 50ab6c9..b4db4bf 100644 --- a/libminifi/include/ThreadedSchedulingAgent.h +++ b/libminifi/include/ThreadedSchedulingAgent.h @@ -42,8 +42,11 @@ class ThreadedSchedulingAgent : public SchedulingAgent { /*! * Create a new threaded scheduling agent. */ - ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configuration) - : SchedulingAgent(controller_service_provider, repo, configuration), + ThreadedSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<Configure> configuration) + : SchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration), logger_(logging::LoggerFactory<ThreadedSchedulingAgent>::getLogger()) { } // Destructor http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/TimerDrivenSchedulingAgent.h b/libminifi/include/TimerDrivenSchedulingAgent.h index 597dc76..816bcec 100644 --- a/libminifi/include/TimerDrivenSchedulingAgent.h +++ b/libminifi/include/TimerDrivenSchedulingAgent.h @@ -37,8 +37,11 @@ class TimerDrivenSchedulingAgent : public ThreadedSchedulingAgent { /*! 
* Create a new processor */ - TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<Configure> configure) - : ThreadedSchedulingAgent(controller_service_provider, repo, configure) { + TimerDrivenSchedulingAgent(std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider, std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<Configure> configure) + : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configure) { } // Destructor virtual ~TimerDrivenSchedulingAgent() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ConfigurationFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurationFactory.h b/libminifi/include/core/ConfigurationFactory.h index b58c170..61af8cd 100644 --- a/libminifi/include/core/ConfigurationFactory.h +++ b/libminifi/include/core/ConfigurationFactory.h @@ -30,6 +30,7 @@ namespace core { template<typename T> typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo, + const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<Configure> configuration, const std::string path) { throw std::runtime_error("Cannot instantiate class"); @@ -37,16 +38,20 @@ typename std::enable_if<!class_operations<T>::value, T*>::type instantiate(const template<typename T> typename std::enable_if<class_operations<T>::value, T*>::type instantiate(const std::shared_ptr<core::Repository> &repo, const std::shared_ptr<core::Repository> &flow_file_repo, + const std::shared_ptr<core::ContentRepository> &content_repo, const 
std::shared_ptr<io::StreamFactory> &stream_factory, - std::shared_ptr<Configure> configuration, const std::string path) { - return new T(repo, flow_file_repo, stream_factory, configuration, path); + std::shared_ptr<Configure> configuration, + const std::string path) { + return new T(repo, flow_file_repo, content_repo, stream_factory, configuration, path); } /** * Configuration factory is used to create a new FlowConfiguration * object. */ -std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, + std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<Configure> configure, std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path = "", bool fail_safe = false); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h new file mode 100644 index 0000000..b544ca0 --- /dev/null +++ b/libminifi/include/core/ContentRepository.h @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_ +#define LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_ + +#include "properties/Configure.h" +#include "ResourceClaim.h" +#include "io/DataStream.h" +#include "io/BaseStream.h" +#include "StreamManager.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Content repository definition that extends StreamManager. + */ +class ContentRepository : public StreamManager<minifi::ResourceClaim> { + public: + virtual ~ContentRepository() { + + } + + /** + * initialize this content repository using the provided configuration. + */ + virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0; + + /** + * Stops this repository. 
+ */ + virtual void stop() = 0; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_CONTENTREPOSITORY_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index c32eb59..1dc79e7 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -19,6 +19,7 @@ #define LIBMINIFI_INCLUDE_CORE_CORE_H_ #include <cstdlib> +#include <iostream> #include <memory> #include <string> #include <uuid/uuid.h> @@ -132,6 +133,8 @@ class CoreComponent { */ void setUUID(uuid_t uuid); + void setUUIDStr(const std::string uuidStr); + /** * Returns the UUID through the provided object. * @param uuid uuid struct to which we will copy the memory http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 3429166..43d2bc0 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -58,11 +58,12 @@ class FlowConfiguration : public CoreComponent { * Constructor that will be used for configuring * the flow controller. 
*/ - explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<io::StreamFactory> stream_factory, - std::shared_ptr<Configure> configuration, - const std::string path) + explicit FlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo, + std::shared_ptr<io::StreamFactory> stream_factory, + std::shared_ptr<Configure> configuration, const std::string path) : CoreComponent(core::getClassName<FlowConfiguration>()), flow_file_repo_(flow_file_repo), + content_repo_(content_repo), config_path_(path), stream_factory_(stream_factory), configuration_(configuration), @@ -76,8 +77,9 @@ class FlowConfiguration : public CoreComponent { // Create Processor (Node/Input/Output Port) based on the name std::shared_ptr<core::Processor> createProcessor(std::string name, uuid_t uuid); // Create Root Processor Group - std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, - uuid_t uuid, int version); + + std::unique_ptr<core::ProcessGroup> createRootProcessGroup(std::string name, uuid_t uuid, int version); + std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid); // Create Remote Processor Group @@ -100,7 +102,7 @@ class FlowConfiguration : public CoreComponent { } virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload( - std::string &yamlConfigPayload) { + std::string &yamlConfigPayload) { return nullptr; } @@ -127,6 +129,8 @@ class FlowConfiguration : public CoreComponent { std::string config_path_; // flow file repo std::shared_ptr<core::Repository> flow_file_repo_; + // content repository. 
+ std::shared_ptr<core::ContentRepository> content_repo_; // stream factory std::shared_ptr<io::StreamFactory> stream_factory_; std::shared_ptr<Configure> configuration_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index 48e0108..f6aaf5e 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -25,8 +25,10 @@ #include <mutex> #include <atomic> #include <algorithm> - +#include <memory> #include "Property.h" +#include "core/ContentRepository.h" +#include "core/repository/FileSystemRepository.h" #include "core/controller/ControllerServiceProvider.h" #include "core/controller/ControllerServiceLookup.h" #include "core/logging/LoggerConfiguration.h" @@ -46,10 +48,13 @@ class ProcessContext : public controller::ControllerServiceLookup { /*! 
* Create a new process context associated with the processor/controller service/state manager */ - ProcessContext(ProcessorNode &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, std::shared_ptr<core::Repository> repo) + ProcessContext(ProcessorNode &processor, std::shared_ptr<controller::ControllerServiceProvider> &controller_service_provider, std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_repo, + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>()) : processor_node_(processor), controller_service_provider_(controller_service_provider), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { + logger_(logging::LoggerFactory<ProcessContext>::getLogger()), + content_repo_(content_repo), flow_repo_(flow_repo) { repo_ = repo; } // Destructor @@ -92,6 +97,18 @@ class ProcessContext : public controller::ControllerServiceLookup { return repo_; } + /** + * Returns a reference to the content repository for the running instance. + * @return content repository shared pointer. + */ + std::shared_ptr<core::ContentRepository> getContentRepository() { + return content_repo_; + } + + std::shared_ptr<core::Repository> getFlowFileRepository() { + return flow_repo_; + } + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer ProcessContext(const ProcessContext &parent) = delete; @@ -145,6 +162,10 @@ class ProcessContext : public controller::ControllerServiceLookup { std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_; // repository shared pointer. std::shared_ptr<core::Repository> repo_; + std::shared_ptr<core::Repository> flow_repo_; + + // repository shared pointer. 
+ std::shared_ptr<core::ContentRepository> content_repo_; // Processor ProcessorNode processor_node_; // Logger http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 410480a..a0e51e3 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -56,8 +56,7 @@ class ProcessGroup { /*! * Create a new process group */ - ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, int version = 0, - ProcessGroup *parent = NULL); + ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, int version = 0, ProcessGroup *parent = NULL); // Destructor virtual ~ProcessGroup(); // Set Processor Name http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index ad79d12..d853e9b 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -50,7 +50,7 @@ class ProcessSession { ProcessSession(ProcessContext *processContext = NULL) : process_context_(processContext), logger_(logging::LoggerFactory<ProcessSession>::getLogger()) { - logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName().c_str()); + logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode().getName()); auto repo = processContext->getProvenanceRepository(); provenance_report_ = new provenance::ProvenanceReporter(repo, process_context_->getProcessorNode().getUUIDStr(), process_context_->getProcessorNode().getName()); } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index 5f7e6c2..f1b47ae 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -31,7 +31,8 @@ #include <string> #include <thread> #include <vector> - +#include "core/ContentRepository.h" +#include "core/SerializableComponent.h" #include "properties/Configure.h" #include "core/logging/LoggerConfiguration.h" #include "core/Property.h" @@ -52,15 +53,15 @@ namespace core { #define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute #define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec -class Repository : public CoreComponent { +class Repository : public core::SerializableComponent { public: /* * Constructor for the repository */ Repository(std::string repo_name = "Repository", std::string directory = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = - MAX_REPOSITORY_STORAGE_SIZE, + MAX_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD) - : CoreComponent(repo_name), + : core::SerializableComponent(repo_name), thread_(), logger_(logging::LoggerFactory<Repository>::getLogger()) { directory_ = directory; @@ -81,7 +82,7 @@ class Repository : public CoreComponent { return true; } // Put - virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { return true; } // Delete @@ -89,7 +90,14 @@ class Repository : public CoreComponent { return true; } - virtual bool Get(std::string key, std::string &value) { + virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) { + bool found = true; + for (auto storedValue : storedValues) { + found &= Delete(storedValue->getName()); + } + return found; + } 
+ virtual bool Get(const std::string &key, std::string &value) { return false; } @@ -109,10 +117,82 @@ class Repository : public CoreComponent { virtual bool isRunning() { return running_; } + + /** + * Specialization that allows us to serialize max_size objects into store. + * the lambdaConstructor will create objects to put into store + * @param store vector in which we can store serialized object + * @param max_size reference that stores the max number of objects to retrieve and serialize. + * upon return max_size will represent the number of serialized objects. + * @return status of this operation + * + * Base implementation returns true; + */ + virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) { + return true; + } + + /** + * Specialization that allows us to deserialize max_size objects into store. + * @param store vector in which we can store deserialized object + * @param max_size reference that stores the max number of objects to retrieve and deserialize. + * upon return max_size will represent the number of deserialized objects. + * @return status of this operation + * + * Base implementation returns true; + */ + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { + return true; + } + + /** + * Specialization that allows us to deserialize max_size objects into store. + * the lambdaConstructor will create objects to put into store + * @param store vector in which we can store deserialized object + * @param max_size reference that stores the max number of objects to retrieve and deserialize. + * upon return max_size will represent the number of deserialized objects. 
+ * @param lambdaConstructor reference that will create the objects for store + * @return status of this operation + * + * Base implementation returns true; + */ + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambdaConstructor) { + return true; + } + + /** + * Base implementation returns true; + */ + virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) { + return true; + } + + /** + * Base implementation returns true; + */ + virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) { + return true; + } + + /** + * Base implementation returns true; + */ + virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) { + return true; + } + + virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { + return Put(key, buffer, bufferSize); + } + uint64_t incrementSize(const char *fpath, const struct stat *sb, int typeflag) { return (repo_size_ += sb->st_size); } + virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { + + } + // Prevent default copy constructor and assignment operation // Only support pass by reference or pointer Repository(const Repository &parent) = delete; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/RepositoryFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/RepositoryFactory.h b/libminifi/include/core/RepositoryFactory.h index 9fafb57..b123a6d 100644 --- a/libminifi/include/core/RepositoryFactory.h +++ b/libminifi/include/core/RepositoryFactory.h @@ -19,8 +19,8 @@ #ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ #define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ +#include "core/ContentRepository.h" #include "core/Repository.h" -#include 
"core/repository/VolatileRepository.h" #include "Core.h" namespace org { @@ -30,8 +30,22 @@ namespace minifi { namespace core { +/** + * Create a repository represented by the configuration class name + * @param configuration_class_name configuration class name + * @param fail_safe determines whether or not to make the default class if configuration_class_name is invalid + * @param repo_name name of the repository + */ std::shared_ptr<core::Repository> createRepository(const std::string configuration_class_name, bool fail_safe = false, const std::string repo_name = ""); +/** + * Create a context repository + * @param configuration_class_name configuration class name + * @param fail_safe determines whether or not to make the default class if configuration_class_name is invalid + * @param repo_name name of the repository + */ +std::shared_ptr<core::ContentRepository> createContentRepository(const std::string configuration_class_name, bool fail_safe = false, const std::string repo_name = ""); + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/SerializableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h new file mode 100644 index 0000000..f7f9feb --- /dev/null +++ b/libminifi/include/core/SerializableComponent.h @@ -0,0 +1,88 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ +#define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ + +#include "io/Serializable.h" +#include "core/Core.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Represents a component that is serializable and an extension point of core Component + */ +class SerializableComponent : public core::CoreComponent, public minifi::io::Serializable { + + public: + + SerializableComponent(const std::string name, uuid_t uuid = nullptr) + : core::CoreComponent(name, uuid) { + + } + + virtual ~SerializableComponent() { + + } + + /** + * Serialize this object into the the store + * @param store object in which we are serializing data into + * @return status of this serialization. + */ + virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) = 0; + + /** + * Deserialization from the parameter store into the current object + * @param store from which we are deserializing the current object + * @return status of this deserialization. + */ + virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) = 0; + + /** + * Deserializes the current object using buffer + * @param buffer buffer from which we can deserialize the currenet object + * @param bufferSize length of buffer from which we can deserialize the current object. + * @return status of the deserialization. 
+ */ + virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) = 0; + + /** + * Serialization of this object into buffer + * @param key string that represents this objects identifier + * @param buffer buffer that contains the serialized object + * @param bufferSize length of buffer + * @return status of serialization + */ + virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { + return false; + } + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/StreamManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h new file mode 100644 index 0000000..468526d --- /dev/null +++ b/libminifi/include/core/StreamManager.h @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_ +#define LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_ + +#include "properties/Configure.h" +#include "ResourceClaim.h" +#include "io/DataStream.h" +#include "io/BaseStream.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +/** + * Purpose: Provides a base for all stream based managers. The goal here is to provide + * a small set of interfaces that provide a small set of operations to provide state + * management for streams. + */ +template<typename T> +class StreamManager { + public: + virtual ~StreamManager() { + + } + + /** + * Create a write stream using the streamId as a reference. + * @param streamId stream identifier + * @return stream pointer. + */ + virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> &streamId) = 0; + + /** + * Create a read stream using the streamId as a reference. + * @param streamId stream identifier + * @return stream pointer. + */ + virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<T> &streamId) = 0; + + /** + * Closes the stream + * @param streamId stream identifier + * @return result of operation. + */ + virtual bool close(const std::shared_ptr<T> &streamId) = 0; + + /** + * Removes the stream from this stream manager. The end result + * is dependent on the stream manager implementation. + * @param streamId stream identifier + * @return result of operation. 
+ */ + virtual bool remove(const std::shared_ptr<T> &streamId) = 0; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_STREAMMANAGER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/logging/LoggerConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index aa4a1d0..787fec5 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -105,8 +105,7 @@ class LoggerConfiguration { protected: static std::shared_ptr<internal::LoggerNamespace> initialize_namespaces(const std::shared_ptr<LoggerProperties> &logger_properties); static std::shared_ptr<spdlog::logger> get_logger(std::shared_ptr<Logger> logger, const std::shared_ptr<internal::LoggerNamespace> &root_namespace, const std::string &name, - std::shared_ptr<spdlog::formatter> formatter, - bool remove_if_present = false); + std::shared_ptr<spdlog::formatter> formatter, bool remove_if_present = false); private: static std::shared_ptr<internal::LoggerNamespace> create_default_root(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h index e1d80e8..2bd4099 100644 --- a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -54,7 +54,6 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor } //! 
Destructor virtual ~SiteToSiteProvenanceReportingTask() { - } //! Report Task Name static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask"; @@ -62,7 +61,8 @@ class SiteToSiteProvenanceReportingTask : public minifi::RemoteProcessorGroupPor public: //! Get provenance json report - void getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, std::string &report); + void getJsonReport(core::ProcessContext *context, core::ProcessSession *session, std::vector<std::shared_ptr<core::SerializableComponent>> &records, std::string &report); + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); //! OnTrigger method, implemented by NiFi SiteToSiteProvenanceReportingTask virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/AtomicRepoEntries.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/AtomicRepoEntries.h b/libminifi/include/core/repository/AtomicRepoEntries.h new file mode 100644 index 0000000..c681060 --- /dev/null +++ b/libminifi/include/core/repository/AtomicRepoEntries.h @@ -0,0 +1,501 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ref_count_hip. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ + +#include <cstddef> +#include <cstring> +#include <iostream> +#include <chrono> +#include <functional> +#include <atomic> +#include <vector> +#include <map> +#include <iterator> + +static uint16_t accounting_size = sizeof(std::vector<uint8_t>) + sizeof(std::string) + sizeof(size_t); + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +/** + * Purpose: Repo value represents an item that will support a move operation within an AtomicEntry + * + * Justification: Since AtomicEntry is a static entry that does not move or change, the underlying + * RepoValue can be changed to support atomic operations. + */ +template<typename T> +class RepoValue { + public: + + explicit RepoValue() { + } + + /** + * Constructor that populates the item allowing for a custom key comparator. + * @param key key for this repo value. + * @param ptr buffer + * @param size size buffer + * @param comparator custom comparator. + */ + explicit RepoValue(T key, const uint8_t *ptr, size_t size, std::function<bool(T, T)> comparator = nullptr) + : key_(key), + comparator_(comparator) { + if (nullptr == ptr) { + size = 0; + } + buffer_.resize(size); + if (size > 0) { + std::memcpy(buffer_.data(), ptr, size); + } + } + + /** + * RepoValue that moves the other object into this. 
+ */ + explicit RepoValue(RepoValue<T> &&other) +noexcept : key_(std::move(other.key_)), + buffer_(std::move(other.buffer_)), + comparator_(std::move(other.comparator_)) { + } + + ~RepoValue() + { + } + + T &getKey() { + return key_; + } + + /** + * Sets the key, relacing the custom comparator if needed. + */ + void setKey(const T key, std::function<bool(T,T)> comparator = nullptr) { + key_ = key; + comparator_ = comparator; + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isEqual(RepoValue<T> *other) + { + return comparator_ == nullptr ? key_ == other->key_ : comparator_(key_,other->key_); + } + + /** + * Determines if the key is the same using the custom comparator + * @param other object to compare against + * @return result of the comparison + */ + inline bool isKey(T other) + { + return comparator_ == nullptr ? key_ == other : comparator_(key_,other); + } + + /** + * Clears the buffer. + */ + void clearBuffer() { + buffer_.resize(0); + buffer_.clear(); + } + + /** + * Return the size of the memory within the key + * buffer, the size of timestamp, and the general + * system word size + */ + uint64_t size() { + return buffer_.size(); + } + + size_t getBufferSize() { + return buffer_.size(); + } + + const uint8_t *getBuffer() + { + return buffer_.data(); + } + + /** + * Places the contents of buffer into str + * @param strnig into which we are placing the memory contained in buffer. + */ + void emplace(std::string &str) { + str.insert(0, reinterpret_cast<const char*>(buffer_.data()), buffer_.size()); + } + + /** + * Appends ptr to the end of buffer. 
+ * @param ptr pointer containing data to add to buffer_ + */ + void append(uint8_t *ptr, size_t size) + { + buffer_.insert(buffer_.end(), ptr, ptr + size); + } + + RepoValue<T> &operator=(RepoValue<T> &&other) noexcept { + key_ = std::move(other.key_); + buffer_ = std::move(other.buffer_); + return *this; + } + + private: + T key_; + std::function<bool(T,T)> comparator_; + std::vector<uint8_t> buffer_; + }; + + /** + * Purpose: Atomic Entry allows us to create a statically + * sized ring buffer, with the ability to create + * + **/ +template<typename T> +class AtomicEntry { + + public: + /** + * Constructor that accepts a max size and an atomic counter for the total + * size allowd by this and other atomic entries. + */ + explicit AtomicEntry(std::atomic<size_t> *total_size, size_t *max_size) + : write_pending_(false), + has_value_(false), + accumulated_repo_size_(total_size), + max_repo_size_(max_size), + ref_count_(0), + free_required(false) { + + } + + /** + * Sets the repo value, moving the old value into old_value. + * @param new_value new value to move into value_. + * @param old_value the previous value of value_ will be moved into old_value + * @param prev_size size reclaimed. + * @return result of this set. If true old_value will be populated. 
+ */ + bool setRepoValue(RepoValue<T> &new_value, RepoValue<T> &old_value, size_t &prev_size) { + // delete the underlying pointer + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true)) + { + return false; + } + if (has_value_) { + prev_size = value_.size(); + } + old_value = std::move(value_); + value_ = std::move(new_value); + has_value_ = true; + try_unlock(); + return true; + } + + + AtomicEntry<T> *takeOwnership() + { + bool lock = false; + if (!write_pending_.compare_exchange_weak(lock, true) ) + return nullptr; + + ref_count_++; + + try_unlock(); + + return this; + } + /** + * A test and set operation, which is used to allow a function to test + * if an item can be released and a function used for reclaiming memory associated + * with said object. + * A custom comparator can be provided to augment the key being added into value_ + */ + bool testAndSetKey(const T str, std::function<bool(T)> releaseTest = nullptr, std::function<void(T)> reclaimer = nullptr, std::function<bool(T, T)> comparator = nullptr) { + bool lock = false; + + if (!write_pending_.compare_exchange_weak(lock, true) ) + return false; + + if (has_value_) { + // we either don't have a release test or we cannot release this + // entity + if (releaseTest != nullptr && reclaimer != nullptr && releaseTest(value_.getKey())) + { + reclaimer(value_.getKey()); + } + else if (free_required && ref_count_ == 0) + { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (accumulated_repo_size_ != nullptr) { + *accumulated_repo_size_ -= bufferSize; + } + free_required = false; + } + else { + try_unlock(); + return false; + } + + } + ref_count_=1; + value_.setKey(str, comparator); + has_value_ = true; + try_unlock(); + return true; + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. 
+ */ + bool getValue(RepoValue<T> &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. + */ + bool getValue(const T &key, RepoValue<T> &value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + value = std::move(value_); + has_value_ = false; + try_unlock(); + return true; + } + + void decrementOwnership(){ + try_lock(); + if (!has_value_) { + try_unlock(); + return; + } + if (ref_count_ > 0){ + ref_count_--; + } + if (ref_count_ == 0 && free_required) + { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (accumulated_repo_size_ != nullptr) { + *accumulated_repo_size_ -= bufferSize; + } + free_required = false; + } + else{ + } + try_unlock(); + } + + /** + * Moved the value into the argument + * @param value the previous value will be moved into this parameter + * @return success of get operation based on whether or not this atomic entry has a value. + */ + bool getValue(const T &key, RepoValue<T> **value) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + ref_count_++; + *value = &value_; + try_unlock(); + return true; + } + + /** + * Operation that will be used to test and free if a release is required without + * setting a new object. + * @param releaseTest function that will be used to free the RepoValue key from + * this atomic entry. + * @param freedValue informs the caller if an item was freed. 
+ */ + T testAndFree(std::function<bool(T)> releaseTest, bool &freedValue) { + try_lock(); + T ref; + if (!has_value_) { + try_unlock(); + return ref; + } + + if (releaseTest(value_.getKey())) { + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + ref = value_.getKey(); + has_value_ = false; + if (accumulated_repo_size_ != nullptr) { + *accumulated_repo_size_ -= bufferSize; + } + + } + try_unlock(); + return ref; + } + + size_t getLength() + { + size_t size = 0; + try_lock(); + size = value_.getBufferSize(); + try_unlock(); + return size; + + } + + /** + * sets has_value to false; however, does not call + * any external entity to further free RepoValue + */ + bool freeValue(const T &key) { + try_lock(); + if (!has_value_) { + try_unlock(); + return false; + } + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + if (ref_count_ > 0) + { + free_required = true; + try_unlock(); + return true; + } + size_t bufferSize = value_.getBufferSize(); + value_.clearBuffer(); + has_value_ = false; + if (accumulated_repo_size_ != nullptr) { + *accumulated_repo_size_ -= bufferSize; + } + free_required = false; + try_unlock(); + return true; + } + + /** + * Appends buffer onto this atomic entry if key matches + * the current RepoValue's key. + */ + bool insert(const T key, uint8_t *buffer, size_t size) { + try_lock(); + + if (!has_value_) { + try_unlock(); + return false; + } + + if (!value_.isKey(key)) { + try_unlock(); + return false; + } + + if ((accumulated_repo_size_ != nullptr && max_repo_size_ != nullptr) && (*accumulated_repo_size_ + size > *max_repo_size_)) { + // can't support this write + try_unlock(); + return false; + } + + value_.append(buffer, size); + (*accumulated_repo_size_) += size; + try_unlock(); + return true; + } + + private: + + /** + * Spin lock to unlock the current atomic entry. 
+ */ + inline void try_lock() { + bool lock = false; + while (!write_pending_.compare_exchange_weak(lock, true,std::memory_order_acquire)) { + lock = false; + // attempt again + } + } + + /** + * Spin lock to unlock the current atomic entry. + */ + inline void try_unlock() { + bool lock = true; + while (!write_pending_.compare_exchange_weak(lock, false,std::memory_order_acquire)) { + lock = true; + // attempt again + } + } + + // atomic size pointer. + std::atomic<size_t> *accumulated_repo_size_; + // max size + size_t *max_repo_size_; + // determines if a write is pending. + std::atomic<bool> write_pending_; + // used to determine if a value is present in this atomic entry. + std::atomic<bool> has_value_; + std::atomic<uint16_t> ref_count_; + std::atomic<bool> free_required; + // repo value. + RepoValue<T> value_; +}; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_ATOMICREPOENTRIES_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/FileSystemRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h new file mode 100644 index 0000000..84bf01e --- /dev/null +++ b/libminifi/include/core/repository/FileSystemRepository.h @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_
+
+#include "core/Core.h"
+#include "../ContentRepository.h"
+#include "properties/Configure.h"
+#include "core/logging/LoggerConfiguration.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * FileSystemRepository is a content repository that stores data onto the local file system. Only close() is defined inline; it delegates to remove().
+ */
+class FileSystemRepository : public core::ContentRepository, public core::CoreComponent {
+ public:
+  explicit FileSystemRepository(std::string name = getClassName<FileSystemRepository>())
+      : core::CoreComponent(name),
+        logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
+
+  }
+  virtual ~FileSystemRepository() {
+
+  }
+  // Configures the repository from the agent configuration (defined out of line).
+  virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration);
+  // Stops the repository (defined out of line).
+  virtual void stop();
+  // Returns the stream a consumer writes the content of `claim` to (defined out of line).
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  // Returns the stream a consumer reads the content of `claim` from (defined out of line).
+  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  // Closing a claim removes its content: simply delegates to remove().
+  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+    return remove(claim);
+  }
+  // Removes the content stored for `claim` and reports the outcome (defined out of line).
+  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+ private:
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /*
LIBMINIFI_INCLUDE_CORE_REPOSITORY_FileSystemRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h b/libminifi/include/core/repository/FlowFileRepository.h
index 2e19286..28b9c05 100644
--- a/libminifi/include/core/repository/FlowFileRepository.h
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -48,12 +48,10 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
   // Constructor
   FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
-                     int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
-                     uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
+                     int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
       : Repository(repo_name.length() > 0 ? repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod),
-        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger())
-
-  {
+        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()),
+        content_repo_(nullptr) {  // NOTE(review): members initialize in declaration order (connectionMap, content_repo_, db_, logger_), so content_repo_ is initialized before logger_ despite this list order
     db_ = NULL;
   }
@@ -95,11 +93,12 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
   virtual void run();
-  virtual bool Put(std::string key, uint8_t *buf, int bufLen) {
+  virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
     // persistent to the DB
     leveldb::Slice value((const char *) buf, bufLen);
     leveldb::Status status;
+    repo_size_+=bufLen;  // NOTE(review): counted before db_->Put() reports success, so a failed write still grows repo_size_
     status = db_->Put(leveldb::WriteOptions(), key, value);
     if (status.ok())
       return true;
@@ -115,7 +114,9 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
     leveldb::Status status;
     status = db_->Delete(leveldb::WriteOptions(), key);
     if (status.ok())
+    {  // NOTE(review): no repo_size_ decrement is visible in this hunk, although Put() increments it -- confirm
       return true;
+    }
     else
       return false;
   }
@@ -123,7 +124,7 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
    * Sets the value from the provided key
    * @return status of the get operation.
    */
-  virtual bool Get(std::string key, std::string &value) {
+  virtual bool Get(const std::string &key, std::string &value) {
     leveldb::Status status;
     status = db_->Get(leveldb::ReadOptions(), key, &value);
     if (status.ok())
@@ -135,21 +136,25 @@ class FlowFileRepository : public core::Repository, public std::enable_shared_fr
   void setConnectionMap(std::map<std::string, std::shared_ptr<minifi::Connection>> &connectionMap) {
     this->connectionMap = connectionMap;
   }
-  void loadComponent();
+  virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo);  // now receives the content repository; definition not visible here
   void start() {
     if (this->purge_period_ <= 0)
+    {
       return;
+    }
     if (running_)
+    {
       return;
-    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
-    thread_.detach();
+    }
     running_ = true;
+    thread_ = std::thread(&FlowFileRepository::run, shared_from_this());  // NOTE(review): no longer detached -- a joinable std::thread must be joined before it is destroyed or reassigned (else std::terminate); confirm stop()/destructor join it
     logger_->log_info("%s Repository Monitor Thread Start", name_.c_str());
   }
  private:
   std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+  std::shared_ptr<core::ContentRepository> content_repo_;  // content repository handed to loadComponent() (presumably stored there -- confirm)
   leveldb::DB* db_;
   std::shared_ptr<logging::Logger> logger_;
 };

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileContentRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
new file mode 100644
index 0000000..306a812
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_ + +#include "core/Core.h" +#include "AtomicRepoEntries.h" +#include "io/AtomicEntryStream.h" +#include "../ContentRepository.h" +#include "core/repository/VolatileRepository.h" +#include "properties/Configure.h" +#include "core/logging/LoggerConfiguration.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +/** + * Purpose: Stages content into a volatile area of memory. Note that when the maximum number + * of entries is consumed we will rollback a session to wait for others to be freed. 
+ */
+class VolatileContentRepository : public core::ContentRepository, public core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+ public:
+
+  static const char *minimal_locking;  // presumably the configuration key controlling minimize_locking_; defined out of line -- confirm
+
+  explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>())
+      : core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name),
+        minimize_locking_(true),  // listed in declaration order (minimize_locking_ is declared before logger_) to avoid -Wreorder
+        logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
+    max_count_ = 15000;  // max_count_ is not declared in this class; presumably inherited from VolatileRepository
+  }
+  virtual ~VolatileContentRepository() {
+    if (!minimize_locking_) {  // master_list_ entries are deleted here only in this mode; otherwise ownership presumably lies elsewhere -- confirm
+      std::lock_guard<std::mutex> lock(map_mutex_);
+      for (const auto &item : master_list_)
+      {
+        delete item.second;
+      }
+      master_list_.clear();
+    }
+
+  }
+
+  /**
+   * Initialize the volatile content repo
+   * @param configure configuration
+   */
+  virtual bool initialize(const std::shared_ptr<Configure> &configure);
+
+  /**
+   * Stop any thread associated with the volatile content repository.
+   */
+  virtual void stop();
+
+  /**
+   * Creates writable stream.
+   * @param claim resource claim
+   * @return BaseStream shared pointer that represents the stream the consumer will write to.
+   */
+  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * Creates readable stream.
+   * @param claim resource claim
+   * @return BaseStream shared pointer that represents the stream from which the consumer will read.
+   */
+  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+  /**
+   * Closes the claim by delegating to remove().
+   * @return whether or not the claim is associated with content stored in volatile memory.
+   */
+  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+    return remove(claim);
+  }
+
+  /**
+   * Removes the claim's content from the volatile repository.
+   * @return whether or not the claim is associated with content stored in volatile memory.
+   */
+  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+
+ protected:
+  virtual void start();
+
+  virtual void run();
+
+  template<typename T2>
+  std::shared_ptr<T2> shared_from_parent()
+  {
+    return std::static_pointer_cast<T2>(shared_from_this());
+  }
+
+ private:
+
+  bool minimize_locking_;
+
+  // std::function callbacks associated with the claims (not used within this header).
+  std::function<bool(std::shared_ptr<minifi::ResourceClaim>, std::shared_ptr<minifi::ResourceClaim>)> resource_claim_comparator_;
+  std::function<bool(std::shared_ptr<minifi::ResourceClaim>)> resource_claim_check_;
+  std::function<void(std::shared_ptr<minifi::ResourceClaim>)> claim_reclaimer_;
+
+  // logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list.
+  // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can.
+  std::mutex map_mutex_;
+
+  std::map<std::string, AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>*> master_list_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VolatileContentRepository_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/include/core/repository/VolatileFlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
new file mode 100644
index 0000000..059c1de
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+
+#include "VolatileRepository.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+/**
+ * Volatile flow file repository. keeps a running counter of the current location, freeing
+ * those which we no longer hold. run() periodically removes the claims queued in purge_list_ from the linked content repository.
+ */
+class VolatileFlowFileRepository : public VolatileRepository<std::string>
+{
+ public:
+  explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
+                                          MAX_REPOSITORY_STORAGE_SIZE,
+                                      uint64_t purgePeriod = REPOSITORY_PURGE_PERIOD)
+      : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileFlowFileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod)
+      // dir is not forwarded: the base always receives "" as its directory argument.
+  {
+    purge_required_ = true;
+    content_repo_ = nullptr;
+  }
+  // Purge loop: every purge_period_ ms, removes each queued purge_list_ entry from content_repo_ once one has been linked.
+  virtual void run() {
+    repo_full_ = false;
+    while (running_) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+      if (purge_required_ && nullptr != content_repo_)  // NOTE(review): content_repo_ is read without synchronization against loadComponent() -- confirm it is linked before this thread starts
+      {
+        std::lock_guard<std::mutex> lock(purge_mutex_);
+        for (const auto &purgeItem : purge_list_)
+        {
+          std::shared_ptr<minifi::ResourceClaim> newClaim = std::make_shared<minifi::ResourceClaim>(purgeItem, content_repo_, true);
+          content_repo_->remove(newClaim);
+        }
+        // every queued entry was handed to the content repository; drop them all
+        purge_list_.clear();
+      }
+    }
+  }
+  // Links the content repository whose claims run() removes.
+  void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
+    content_repo_ = content_repo;
+
+  }
+
+ protected:
+  // content repository used by run(); nullptr until loadComponent() is called
+  std::shared_ptr<core::ContentRepository> content_repo_;
+
+};
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_ */
