Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 5aa596fd8 -> a6e503451


MINIFICPP-385: Add countdown latch and notify stop so that we gracefully close 
sockets.

This closes #258.

Signed-off-by: Bin Qiu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a6e50345
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a6e50345
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a6e50345

Branch: refs/heads/master
Commit: a6e5034517452e523e4cf73a3cb944b4a71e67ac
Parents: 5aa596f
Author: Marc Parisi <[email protected]>
Authored: Fri Jan 26 08:23:46 2018 -0500
Committer: Bin Qiu <[email protected]>
Committed: Wed Feb 7 09:31:09 2018 -0800

----------------------------------------------------------------------
 libminifi/include/RemoteProcessorGroupPort.h | 32 ++++++++++++++++++++++-
 libminifi/src/RemoteProcessorGroupPort.cpp   | 13 +++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a6e50345/libminifi/include/RemoteProcessorGroupPort.h
----------------------------------------------------------------------
diff --git a/libminifi/include/RemoteProcessorGroupPort.h 
b/libminifi/include/RemoteProcessorGroupPort.h
index 386e4d7..c1dfea1 100644
--- a/libminifi/include/RemoteProcessorGroupPort.h
+++ b/libminifi/include/RemoteProcessorGroupPort.h
@@ -37,6 +37,34 @@ namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
+
+/**
+ * Count down latch implementation that's used across
+ * all threads of the RPG. This is okay since the latch increments
+ * and decrements based on its construction. Using RAII we should
+ * never have the concern of thread safety.
+ */
+class RPGLatch{
+ public:
+  RPGLatch(bool increment = true){
+    static std::atomic<int> latch_count (0);
+    count = &latch_count;
+    if (increment)
+      count++;
+  }
+
+  ~RPGLatch(){
+    count--;
+  }
+
+  int getCount(){
+    return *count;
+  }
+
+ private:
+  std::atomic<int> *count;
+};
+
 // RemoteProcessorGroupPort Class
 class RemoteProcessorGroupPort : public core::Processor {
  public:
@@ -120,6 +148,8 @@ class RemoteProcessorGroupPort : public core::Processor {
   // refresh site2site peer list
   void refreshPeerList();
 
+  virtual void notifyStop();
+
  protected:
 
   std::shared_ptr<io::StreamFactory> stream_factory_;
@@ -132,7 +162,7 @@ class RemoteProcessorGroupPort : public core::Processor {
   // Transaction Direction
   sitetosite::TransferDirection direction_;
   // Transmitting
-  bool transmitting_;
+  std::atomic<bool> transmitting_;
   // timeout
   uint64_t timeout_;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a6e50345/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index 31b6c89..436a7a7 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -151,6 +151,17 @@ void RemoteProcessorGroupPort::initialize() {
   logger_->log_trace("Finished initialization");
 }
 
+void RemoteProcessorGroupPort::notifyStop(){
+  transmitting_ = false;
+  RPGLatch count(false); // we're just a monitor
+  // we use the latch
+  while(count.getCount() > 0);
+  std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr;
+  while(available_protocols_.try_dequeue(nextProtocol)){
+    // clear all protocols now
+  }
+}
+
 void RemoteProcessorGroupPort::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   std::string value;
   if (context->getProperty(portUUID.getName(), value)) {
@@ -173,6 +184,8 @@ void RemoteProcessorGroupPort::onTrigger(const 
std::shared_ptr<core::ProcessCont
     return;
   }
 
+  RPGLatch count;
+
   std::string value;
 
   logger_->log_trace("On trigger %s", getUUIDStr());

Reply via email to