Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 62ce9b0aa -> 39183135c
MINIFICPP-266: Make wait_time pure virtual so that it must be implemented by the thread or executor MINIFICPP-266: Update Unit test to ensure we are abiding by heartbeat period This closes #165. Signed-off-by: Bin Qiu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/39183135 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/39183135 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/39183135 Branch: refs/heads/master Commit: 39183135c6df3bd8037d94a28924d70d2045f4cc Parents: 62ce9b0 Author: Marc Parisi <[email protected]> Authored: Sun Oct 29 12:13:04 2017 -0400 Committer: Bin Qiu <[email protected]> Committed: Tue Oct 31 16:10:55 2017 -0700 ---------------------------------------------------------------------- libminifi/include/c2/C2Agent.h | 5 ++ libminifi/include/core/state/StateManager.h | 5 +- libminifi/include/core/state/UpdateController.h | 13 ++++- libminifi/include/processors/GetTCP.h | 5 ++ libminifi/include/utils/ThreadPool.h | 4 +- libminifi/src/FlowController.cpp | 5 +- libminifi/src/core/state/StateManager.cpp | 9 ++- libminifi/test/curl-tests/C2UpdateTest.cpp | 60 +++++++++----------- libminifi/test/unit/ThreadPoolTests.cpp | 5 ++ 9 files changed, 65 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/c2/C2Agent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index c2021c9..b5d4d31 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -73,6 +73,11 @@ class C2Agent : public state::UpdateController, public state::metrics::MetricsSi */ virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> &metric); + int64_t getHeartBestDelay(){ 
+ std::lock_guard<std::mutex> lock(heartbeat_mutex); + return heart_beat_period_; + } + protected: /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/core/state/StateManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h index 412b02e..2eee1ae 100644 --- a/libminifi/include/core/state/StateManager.h +++ b/libminifi/include/core/state/StateManager.h @@ -96,12 +96,13 @@ class StateManager : public metrics::MetricsReporter, public metrics::MetricsSin * Registers and update controller * @param updateController update controller to add. */ - bool registerUpdateListener(const std::shared_ptr<UpdateController> &updateController); + bool registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay); + /** * Base metrics function will employ the default metrics listener. */ - virtual bool startMetrics(); + virtual bool startMetrics(const int64_t &delay); private: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/core/state/UpdateController.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index 9d4d2f6..fb5f887 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -119,12 +119,14 @@ class Update { */ class UpdateRunner : public utils::AfterExecute<Update> { public: - explicit UpdateRunner(std::atomic<bool> &running) - : running_(&running) { + explicit UpdateRunner(std::atomic<bool> &running, const int64_t &delay) + : running_(&running), + delay_(delay) { } explicit UpdateRunner(UpdateRunner && other) - : running_(std::move(other.running_)) { + : running_(std::move(other.running_)), + delay_(std::move(other.delay_)) { } @@ -143,10 
+145,15 @@ class UpdateRunner : public utils::AfterExecute<Update> { return !*running_; } + virtual int64_t wait_time() { + return delay_; + } protected: std::atomic<bool> *running_; + int64_t delay_; + }; class StateController { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/processors/GetTCP.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/GetTCP.h b/libminifi/include/processors/GetTCP.h index 78a9c01..f649286 100644 --- a/libminifi/include/processors/GetTCP.h +++ b/libminifi/include/processors/GetTCP.h @@ -67,6 +67,11 @@ class SocketAfterExecute : public utils::AfterExecute<int> { return false; } + virtual int64_t wait_time(){ + // wait 500ms + return 500; + } + protected: std::atomic<bool> running_; std::map<std::string, std::future<int>*> *list_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 0c71922..0ae2e5b 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -57,9 +57,7 @@ class AfterExecute { * Time to wait before re-running this task if necessary * @return milliseconds since epoch after which we are eligible to re-run this task. 
*/ - virtual int64_t wait_time() { - return 0; - } + virtual int64_t wait_time() = 0; }; /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 28294c9..0975032 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -363,9 +363,12 @@ void FlowController::initializeC2() { c2_enabled_ = true; } state::StateManager::initialize(); + std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()), configuration_); - registerUpdateListener(agent); + registerUpdateListener(agent, agent->getHeartBestDelay()); + + state::StateManager::startMetrics(agent->getHeartBestDelay()); } if (!c2_enabled_) { return; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/src/core/state/StateManager.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp index 11feabf..e14646f 100644 --- a/libminifi/src/core/state/StateManager.cpp +++ b/libminifi/src/core/state/StateManager.cpp @@ -34,7 +34,6 @@ void StateManager::initialize() { listener_thread_pool_.setMaxConcurrentTasks(3); listener_thread_pool_.start(); controller_running_ = true; - startMetrics(); } /** * State management operations. 
@@ -99,14 +98,14 @@ int16_t StateManager::getMetrics(std::vector<std::shared_ptr<metrics::Metrics>> return -1; } -bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController> &updateController) { +bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay) { auto functions = updateController->getFunctions(); updateControllers.push_back(updateController); // run all functions independently for (auto function : functions) { - std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning())); + std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay)); utils::Worker<Update> functor(function, "listeners", std::move(after_execute)); std::future<Update> future; if (!listener_thread_pool_.execute(std::move(functor), future)) { @@ -120,8 +119,8 @@ bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController /** * Base metrics function will employ the default metrics listener. 
*/ -bool StateManager::startMetrics() { - std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning())); +bool StateManager::startMetrics(const int64_t &delay) { + std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay)); utils::Worker<Update> functor(metrics_listener_->getFunction(), "metrics", std::move(after_execute)); if (!listener_thread_pool_.execute(std::move(functor), metrics_listener_->getFuture())) { // denote failure http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/test/curl-tests/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/C2UpdateTest.cpp b/libminifi/test/curl-tests/C2UpdateTest.cpp index 4f1eb6c..1a9fe5b 100644 --- a/libminifi/test/curl-tests/C2UpdateTest.cpp +++ b/libminifi/test/curl-tests/C2UpdateTest.cpp @@ -54,7 +54,11 @@ static std::vector<std::string> responses; class ConfigHandler : public CivetHandler { public: + ConfigHandler() { + calls_ = 0; + } bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; if (responses.size() > 0) { std::string top_str = responses.back(); responses.pop_back(); @@ -88,6 +92,7 @@ class ConfigHandler : public CivetHandler { return true; } std::string test_file_location_; + std::atomic<size_t> calls_; }; int main(int argc, char **argv) { @@ -133,55 +138,46 @@ int main(int argc, char **argv) { responses.push_back(response); } - std::shared_ptr<minifi::Configure> configuration = std::make_shared< - minifi::Configure>(); + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - configuration->set("c2.rest.url", - "http://localhost:9090/update"); + configuration->set("c2.rest.url", "http://localhost:9090/update"); + configuration->set("c2.agent.heartbeat.period", 
"1000"); mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - std::shared_ptr<core::Repository> test_repo = - std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< - TestFlowRepository>(); + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared - <minifi::io::StreamFactory>(configuration); + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr - <core::YamlConfiguration - >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast - <TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = - std::make_shared<minifi::FlowController - >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup - >(ptr.get()); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); ptr.release(); + auto start = std::chrono::system_clock::now(); controller->load(); controller->start(); waitToVerifyProcessor(); controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); std::string logs = LogTestController::getInstance().log_output.str(); assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); LogTestController::getInstance().reset(); rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); return 0; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/test/unit/ThreadPoolTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index cb9c536..587fb9b 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -57,6 +57,11 @@ class WorkerNumberExecutions : public utils::AfterExecute<int> { return runs; } + virtual int64_t wait_time() { + // wait 50ms + return 50; + } + protected: int runs; int tasks;
