arpadboda commented on a change in pull request #773:
URL: https://github.com/apache/nifi-minifi-cpp/pull/773#discussion_r416040742



##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -99,6 +106,19 @@ class ConcurrentQueue {
     return true;
   }
 
+  template<typename Functor>
+  bool dequeueApplyImpl(std::unique_lock<std::mutex>& lck, Functor&& fun) {
+    checkLock(lck);
+    if (queue_.empty()) {
+      return false;
+    }
+    T elem = std::move(queue_.front());
+    queue_.pop_front();
+    lck.unlock();

Review comment:
       It seems like a timed bomb for me. 
   You pass a lock (which you locked) as a reference and the called function 
may(!!!) unlock it, but only if it's not empty. 
   On the caller side it's a nightmare to handle. 
   
   The only way I can imagine this to work is moving the lock (this function 
actually takes ownership of that), so the caller is aware that it no longer 
holds it under any circumstances. 
   Still not nice, but at least it protects against the undefined ownership 
issue. 

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private 
ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
 
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-  
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, 
std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || 
!this->emptyImpl(lck); });  // Wake up with timeout or in case there is 
something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);

Review comment:
       As first look this wouldn't hurt, but I would prefer checking the 
running flag in scoped of the locked mutex to make sure. 

##########
File path: libminifi/include/utils/MinifiConcurrentQueue.h
##########
@@ -127,33 +147,58 @@ class ConditionConcurrentQueue : private 
ConcurrentQueue<T> {
   using ConcurrentQueue<T>::empty;
   using ConcurrentQueue<T>::clear;
 
-
   template <typename... Args>
   void enqueue(Args&&... args) {
     ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
     if (running_) {
       cv_.notify_one();
     }
   }
-  
+
   bool dequeueWait(T& out) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
-    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped 
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor>
+  bool dequeueApplyWait(Functor&& fun) {
+    if (!running_) {
+      return false;
+    }
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); 
 // Only wake up if there is something to return or stopped
+    return ConcurrentQueue<T>::dequeueApplyImpl(lck, 
std::forward<Functor>(fun));
   }
 
   template< class Rep, class Period >
   bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
+    if (!running_) {
+      return false;
+    }
     std::unique_lock<std::mutex> lck(this->mtx_);
     cv_.wait_for(lck, time, [this, &lck]{ return !running_ || 
!this->emptyImpl(lck); });  // Wake up with timeout or in case there is 
something to do
-    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+    return ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template<typename Functor, class Rep, class Period>
+  bool dequeueApplyWaitFor(Functor&& fun, const std::chrono::duration<Rep, 
Period>& time) {
+    if (!running_) {

Review comment:
       Here as well. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to