Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 62ce9b0aa -> 39183135c


MINIFICPP-266: Make wait_time purely virtual so that they must be implemented 
by the thread or executor

MINIFICPP-266: Update Unit test to ensure we are abiding by heartbeat period

This closes #165.

Signed-off-by: Bin Qiu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/39183135
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/39183135
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/39183135

Branch: refs/heads/master
Commit: 39183135c6df3bd8037d94a28924d70d2045f4cc
Parents: 62ce9b0
Author: Marc Parisi <[email protected]>
Authored: Sun Oct 29 12:13:04 2017 -0400
Committer: Bin Qiu <[email protected]>
Committed: Tue Oct 31 16:10:55 2017 -0700

----------------------------------------------------------------------
 libminifi/include/c2/C2Agent.h                  |  5 ++
 libminifi/include/core/state/StateManager.h     |  5 +-
 libminifi/include/core/state/UpdateController.h | 13 ++++-
 libminifi/include/processors/GetTCP.h           |  5 ++
 libminifi/include/utils/ThreadPool.h            |  4 +-
 libminifi/src/FlowController.cpp                |  5 +-
 libminifi/src/core/state/StateManager.cpp       |  9 ++-
 libminifi/test/curl-tests/C2UpdateTest.cpp      | 60 +++++++++-----------
 libminifi/test/unit/ThreadPoolTests.cpp         |  5 ++
 9 files changed, 65 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index c2021c9..b5d4d31 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -73,6 +73,11 @@ class C2Agent : public state::UpdateController, public 
state::metrics::MetricsSi
    */
   virtual int16_t setMetrics(const std::shared_ptr<state::metrics::Metrics> 
&metric);
 
+  int64_t getHeartBestDelay(){
+    std::lock_guard<std::mutex> lock(heartbeat_mutex);
+    return heart_beat_period_;
+  }
+
  protected:
 
   /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/core/state/StateManager.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/StateManager.h 
b/libminifi/include/core/state/StateManager.h
index 412b02e..2eee1ae 100644
--- a/libminifi/include/core/state/StateManager.h
+++ b/libminifi/include/core/state/StateManager.h
@@ -96,12 +96,13 @@ class StateManager : public metrics::MetricsReporter, 
public metrics::MetricsSin
    * Registers and update controller
    * @param updateController update controller to add.
    */
-  bool registerUpdateListener(const std::shared_ptr<UpdateController> 
&updateController);
+  bool registerUpdateListener(const std::shared_ptr<UpdateController> 
&updateController, const int64_t &delay);
+
 
   /**
    * Base metrics function will employ the default metrics listener.
    */
-  virtual bool startMetrics();
+  virtual bool startMetrics(const int64_t &delay);
 
  private:
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/core/state/UpdateController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/UpdateController.h 
b/libminifi/include/core/state/UpdateController.h
index 9d4d2f6..fb5f887 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -119,12 +119,14 @@ class Update {
  */
 class UpdateRunner : public utils::AfterExecute<Update> {
  public:
-  explicit UpdateRunner(std::atomic<bool> &running)
-      : running_(&running) {
+  explicit UpdateRunner(std::atomic<bool> &running, const int64_t &delay)
+      : running_(&running),
+        delay_(delay) {
   }
 
   explicit UpdateRunner(UpdateRunner && other)
-      : running_(std::move(other.running_)) {
+      : running_(std::move(other.running_)),
+        delay_(std::move(other.delay_)) {
 
   }
 
@@ -143,10 +145,15 @@ class UpdateRunner : public utils::AfterExecute<Update> {
     return !*running_;
   }
 
+  virtual int64_t wait_time() {
+    return delay_;
+  }
  protected:
 
   std::atomic<bool> *running_;
 
+  int64_t delay_;
+
 };
 
 class StateController {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/processors/GetTCP.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/GetTCP.h 
b/libminifi/include/processors/GetTCP.h
index 78a9c01..f649286 100644
--- a/libminifi/include/processors/GetTCP.h
+++ b/libminifi/include/processors/GetTCP.h
@@ -67,6 +67,11 @@ class SocketAfterExecute : public utils::AfterExecute<int> {
       return false;
   }
 
+  virtual int64_t wait_time(){
+    // wait 500ms
+    return 500;
+  }
+
  protected:
   std::atomic<bool> running_;
   std::map<std::string, std::future<int>*> *list_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/include/utils/ThreadPool.h
----------------------------------------------------------------------
diff --git a/libminifi/include/utils/ThreadPool.h 
b/libminifi/include/utils/ThreadPool.h
index 0c71922..0ae2e5b 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -57,9 +57,7 @@ class AfterExecute {
    * Time to wait before re-running this task if necessary
    * @return milliseconds since epoch after which we are eligible to re-run 
this task.
    */
-  virtual int64_t wait_time() {
-    return 0;
-  }
+  virtual int64_t wait_time() = 0;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 28294c9..0975032 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -363,9 +363,12 @@ void FlowController::initializeC2() {
       c2_enabled_ = true;
     }
     state::StateManager::initialize();
+
     std::shared_ptr<c2::C2Agent> agent = 
std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
 std::dynamic_pointer_cast<FlowController>(shared_from_this()),
                                                                        
configuration_);
-    registerUpdateListener(agent);
+    registerUpdateListener(agent, agent->getHeartBestDelay());
+
+    state::StateManager::startMetrics(agent->getHeartBestDelay());
   }
   if (!c2_enabled_) {
     return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/src/core/state/StateManager.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/state/StateManager.cpp 
b/libminifi/src/core/state/StateManager.cpp
index 11feabf..e14646f 100644
--- a/libminifi/src/core/state/StateManager.cpp
+++ b/libminifi/src/core/state/StateManager.cpp
@@ -34,7 +34,6 @@ void StateManager::initialize() {
   listener_thread_pool_.setMaxConcurrentTasks(3);
   listener_thread_pool_.start();
   controller_running_ = true;
-  startMetrics();
 }
 /**
  * State management operations.
@@ -99,14 +98,14 @@ int16_t 
StateManager::getMetrics(std::vector<std::shared_ptr<metrics::Metrics>>
   return -1;
 }
 
-bool StateManager::registerUpdateListener(const 
std::shared_ptr<UpdateController> &updateController) {
+bool StateManager::registerUpdateListener(const 
std::shared_ptr<UpdateController> &updateController, const int64_t &delay) {
   auto functions = updateController->getFunctions();
 
   updateControllers.push_back(updateController);
   // run all functions independently
 
   for (auto function : functions) {
-    std::unique_ptr<utils::AfterExecute<Update>> after_execute = 
std::unique_ptr<utils::AfterExecute<Update>>(new 
UpdateRunner(isStateMonitorRunning()));
+    std::unique_ptr<utils::AfterExecute<Update>> after_execute = 
std::unique_ptr<utils::AfterExecute<Update>>(new 
UpdateRunner(isStateMonitorRunning(), delay));
     utils::Worker<Update> functor(function, "listeners", 
std::move(after_execute));
     std::future<Update> future;
     if (!listener_thread_pool_.execute(std::move(functor), future)) {
@@ -120,8 +119,8 @@ bool StateManager::registerUpdateListener(const 
std::shared_ptr<UpdateController
 /**
  * Base metrics function will employ the default metrics listener.
  */
-bool StateManager::startMetrics() {
-  std::unique_ptr<utils::AfterExecute<Update>> after_execute = 
std::unique_ptr<utils::AfterExecute<Update>>(new 
UpdateRunner(isStateMonitorRunning()));
+bool StateManager::startMetrics(const int64_t &delay) {
+  std::unique_ptr<utils::AfterExecute<Update>> after_execute = 
std::unique_ptr<utils::AfterExecute<Update>>(new 
UpdateRunner(isStateMonitorRunning(), delay));
   utils::Worker<Update> functor(metrics_listener_->getFunction(), "metrics", 
std::move(after_execute));
   if (!listener_thread_pool_.execute(std::move(functor), 
metrics_listener_->getFuture())) {
     // denote failure

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/test/curl-tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/curl-tests/C2UpdateTest.cpp 
b/libminifi/test/curl-tests/C2UpdateTest.cpp
index 4f1eb6c..1a9fe5b 100644
--- a/libminifi/test/curl-tests/C2UpdateTest.cpp
+++ b/libminifi/test/curl-tests/C2UpdateTest.cpp
@@ -54,7 +54,11 @@ static std::vector<std::string> responses;
 
 class ConfigHandler : public CivetHandler {
  public:
+  ConfigHandler() {
+    calls_ = 0;
+  }
   bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    calls_++;
     if (responses.size() > 0) {
       std::string top_str = responses.back();
       responses.pop_back();
@@ -88,6 +92,7 @@ class ConfigHandler : public CivetHandler {
     return true;
   }
   std::string test_file_location_;
+  std::atomic<size_t> calls_;
 };
 
 int main(int argc, char **argv) {
@@ -133,55 +138,46 @@ int main(int argc, char **argv) {
     responses.push_back(response);
   }
 
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
-      minifi::Configure>();
+  std::shared_ptr<minifi::Configure> configuration = 
std::make_shared<minifi::Configure>();
 
-  configuration->set("c2.rest.url",
-                     "http://localhost:9090/update";);
+  configuration->set("c2.rest.url", "http://localhost:9090/update";);
+  configuration->set("c2.agent.heartbeat.period", "1000");
   mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
 
-  std::shared_ptr<core::Repository> test_repo =
-      std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
-      TestFlowRepository>();
+  std::shared_ptr<core::Repository> test_repo = 
std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> test_flow_repo = 
std::make_shared<TestFlowRepository>();
 
-  configuration->set(minifi::Configure::nifi_flow_configuration_file,
-                     test_file_location);
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, 
test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared
-      <minifi::io::StreamFactory>(configuration);
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = 
std::make_shared<minifi::io::StreamFactory>(configuration);
   std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
-      <core::YamlConfiguration
-      >(new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory,
-                                    configuration,
-                                    test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast
-      <TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller =
-      std::make_shared<minifi::FlowController
-      >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), 
content_repo, DEFAULT_ROOT_GROUP_NAME, true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory,
-                                      configuration,
-                                      test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
-                                                                
test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
-      >(ptr.get());
+  std::unique_ptr<core::FlowConfiguration> yaml_ptr = 
std::unique_ptr<core::YamlConfiguration>(
+      new core::YamlConfiguration(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location));
+  std::shared_ptr<TestRepository> repo = 
std::static_pointer_cast<TestRepository>(test_repo);
+
+  std::shared_ptr<minifi::FlowController> controller = 
std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, 
configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
+  true);
+
+  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, 
stream_factory, configuration, test_file_location);
+
+  std::unique_ptr<core::ProcessGroup> ptr = 
yaml_config.getRoot(test_file_location);
+  std::shared_ptr<core::ProcessGroup> pg = 
std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
+  auto start = std::chrono::system_clock::now();
 
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
+  auto then = std::chrono::system_clock::now();
+
+  auto milliseconds = 
std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
   std::string logs = LogTestController::getInstance().log_output.str();
   assert(logs.find("Starting to reload Flow Controller with flow control name 
MiNiFi Flow, version 0") != std::string::npos);
   LogTestController::getInstance().reset();
   rmdir("./content_repository");
+  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/39183135/libminifi/test/unit/ThreadPoolTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ThreadPoolTests.cpp 
b/libminifi/test/unit/ThreadPoolTests.cpp
index cb9c536..587fb9b 100644
--- a/libminifi/test/unit/ThreadPoolTests.cpp
+++ b/libminifi/test/unit/ThreadPoolTests.cpp
@@ -57,6 +57,11 @@ class WorkerNumberExecutions : public 
utils::AfterExecute<int> {
     return runs;
   }
 
+  virtual int64_t wait_time() {
+    // wait 50ms
+    return 50;
+  }
+
  protected:
   int runs;
   int tasks;

Reply via email to