arpadboda commented on a change in pull request #773:
URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r416040742
##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
 
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();

Review comment:
       This looks like a time bomb to me. You pass a lock (which you locked) by reference, and the called function may(!!!) unlock it, but only if the queue is not empty. On the caller side it's a nightmare to handle.
       The only way I can imagine this working is to move the lock into the function, so that the function actually takes ownership of it and the caller knows it no longer holds the lock under any circumstances. Still not nice, but at least it protects against the undefined ownership issue.

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
       At first glance this wouldn't hurt, but I would prefer checking the running flag within the scope of the locked mutex, to make sure.

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Wake up with timeout or in case there is something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor, class Rep, class Period>
+  bool dequeueApplyWaitFor(Functor&& fun, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {

Review comment:
       Here as well.
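
The two suggestions in the review comments (hand the lock over by move, and read the running flag only while the mutex is held) could look roughly like the sketch below. It is only an illustration under stated assumptions, not the code in the PR: `SketchQueue` and `stop()` are hypothetical names, the flag is a plain `bool` guarded by the mutex, and calling `fun(elem)` after unlocking is a guess at what the truncated hunk goes on to do.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, simplified stand-in for the reviewed queue; not the MiNiFi class.
template <typename T>
class SketchQueue {
 public:
  template <typename... Args>
  void enqueue(Args&&... args) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.emplace_back(std::forward<Args>(args)...);
    }
    cv_.notify_one();
  }

  // Hypothetical shutdown hook: flips the flag under the mutex and wakes all waiters.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  // Blocks until an element is available or the queue has been stopped.
  template <typename Functor>
  bool dequeueApplyWait(Functor&& fun) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_) {  // the flag is read while mtx_ is held
      return false;
    }
    // After std::move the caller no longer owns the lock, on any path.
    return dequeueApplyImpl(std::move(lck), std::forward<Functor>(fun));
  }

 private:
  // Takes the lock by value, so ownership is transferred unconditionally.
  template <typename Functor>
  bool dequeueApplyImpl(std::unique_lock<std::mutex> lck, Functor&& fun) {
    assert(lck.owns_lock());
    if (queue_.empty()) {
      return false;  // lck is released by its destructor
    }
    T elem = std::move(queue_.front());
    queue_.pop_front();
    lck.unlock();  // run the functor without holding the mutex
    std::forward<Functor>(fun)(elem);  // assumption: the functor receives the element
    return true;
  }

  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool running_ = true;  // guarded by mtx_
};
```

With this shape, a caller such as `q.dequeueApplyWait([](int v) { /* use v */ });` never has to reason about whether it still holds the mutex afterwards: the lock is gone once it has been moved into `dequeueApplyImpl`, whether or not the queue was empty.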