gitmodimo commented on code in PR #45421: URL: https://github.com/apache/arrow/pull/45421#discussion_r1979282258
########## cpp/src/arrow/acero/concurrent_queue_internal.h: ########## @@ -130,28 +141,41 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> { explicit BackpressureConcurrentQueue(BackpressureHandler handler) : handler_(std::move(handler)) {} + // Pops the last item from the queue. Must be called on a non-empty queue T Pop() { std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex()); DoHandle do_handle(*this); return ConcurrentQueue<T>::PopUnlocked(); } - void Push(const T& item) { + // Pops the last item from the queue but waits if the queue is empty until new items are + // pushed. + T WaitAndPop() { std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex()); + ConcurrentQueue<T>::WaitUntilNonEmpty(lock); DoHandle do_handle(*this); - ConcurrentQueue<T>::PushUnlocked(item); + return ConcurrentQueue<T>::PopUnlocked(); } - void Clear() { + // Pops the last item from the queue, or returns a nullopt if empty + std::optional<T> TryPop() { std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex()); DoHandle do_handle(*this); - ConcurrentQueue<T>::ClearUnlocked(); + return ConcurrentQueue<T>::TryPopUnlocked(); } - std::optional<T> TryPop() { + // Pushes an item to the queue + void Push(const T& item) { std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex()); DoHandle do_handle(*this); - return ConcurrentQueue<T>::TryPopUnlocked(); + ConcurrentQueue<T>::PushUnlocked(item); + } + + // Clears the queue + void Clear() { + std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex()); + DoHandle do_handle(*this); + ConcurrentQueue<T>::ClearUnlocked(); Review Comment: Changed the order of the member functions to match ConcurrentQueue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org