Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 5aa596fd8 -> a6e503451
MINIFICPP-385: Add countdown latch and notify stop so that we gracefully close sockets. This closes #258. Signed-off-by: Bin Qiu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/a6e50345 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/a6e50345 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/a6e50345 Branch: refs/heads/master Commit: a6e5034517452e523e4cf73a3cb944b4a71e67ac Parents: 5aa596f Author: Marc Parisi <[email protected]> Authored: Fri Jan 26 08:23:46 2018 -0500 Committer: Bin Qiu <[email protected]> Committed: Wed Feb 7 09:31:09 2018 -0800 ---------------------------------------------------------------------- libminifi/include/RemoteProcessorGroupPort.h | 32 ++++++++++++++++++++++- libminifi/src/RemoteProcessorGroupPort.cpp | 13 +++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a6e50345/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 386e4d7..c1dfea1 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -37,6 +37,34 @@ namespace org { namespace apache { namespace nifi { namespace minifi { + +/** + * Count down latch implementation that's used across + * all threads of the RPG. This is okay since the latch increments + * and decrements based on its construction. Using RAII we should + * never have the concern of thread safety. + */ +class RPGLatch{ + public: + RPGLatch(bool increment = true){ + static std::atomic<int> latch_count (0); + count = &latch_count; + if (increment) + count++; + } + + ~RPGLatch(){ + count--; + } + + int getCount(){ + return *count; + } + + private: + std::atomic<int> *count; +}; + // RemoteProcessorGroupPort Class class RemoteProcessorGroupPort : public core::Processor { public: @@ -120,6 +148,8 @@ class RemoteProcessorGroupPort : public core::Processor { // refresh site2site peer list void refreshPeerList(); + virtual void notifyStop(); + protected: std::shared_ptr<io::StreamFactory> stream_factory_; @@ -132,7 +162,7 @@ class RemoteProcessorGroupPort : public core::Processor { // Transaction Direction sitetosite::TransferDirection direction_; // Transmitting - bool transmitting_; + std::atomic<bool> transmitting_; // timeout uint64_t timeout_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/a6e50345/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 31b6c89..436a7a7 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -151,6 +151,17 @@ void RemoteProcessorGroupPort::initialize() { logger_->log_trace("Finished initialization"); } +void RemoteProcessorGroupPort::notifyStop(){ + transmitting_ = false; + RPGLatch count(false); // we're just a monitor + // we use the latch + while(count.getCount() > 0); + std::unique_ptr<sitetosite::SiteToSiteClient> nextProtocol = nullptr; + while(available_protocols_.try_dequeue(nextProtocol)){ + // clear all protocols now + } +} + void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { std::string value; if (context->getProperty(portUUID.getName(), value)) { @@ -173,6 +184,8 @@ void RemoteProcessorGroupPort::onTrigger(const std::shared_ptr<core::ProcessCont return; } + RPGLatch count; + std::string value; logger_->log_trace("On trigger %s", getUUIDStr());
