westonpace commented on a change in pull request #9519:
URL: https://github.com/apache/arrow/pull/9519#discussion_r578742222
##########
File path: cpp/src/arrow/vendored/ProducerConsumerQueue.h
##########
@@ -0,0 +1,211 @@
+// Vendored from git tag v2021.02.15.00
+
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// @author Bo Hu (b...@fb.com)
+// @author Jordan DeLong (delon...@fb.com)
+
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstdlib>
+#include <memory>
+#include <stdexcept>
+#include <type_traits>
+#include <utility>
+
+namespace folly {
+
+// Vendored from folly/Portability.h
+namespace {
+#if defined(__arm__)
+#define FOLLY_ARM 1
+#else
+#define FOLLY_ARM 0
+#endif
+
+#if defined(__s390x__)
+#define FOLLY_S390X 1
+#else
+#define FOLLY_S390X 0
+#endif
+
+constexpr bool kIsArchArm = FOLLY_ARM == 1;
+constexpr bool kIsArchS390X = FOLLY_S390X == 1;
+}  // namespace
+
+// Vendored from folly/lang/Align.h
+namespace {
+
+constexpr std::size_t hardware_destructive_interference_size =
+    (kIsArchArm || kIsArchS390X) ? 64 : 128;
+
+}  // namespace
+
+/*
+ * ProducerConsumerQueue is a one producer and one consumer queue
+ * without locks.
+ */
+template <class T>
+struct ProducerConsumerQueue {
+  typedef T value_type;
+
+  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
+  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
+
+  // size must be >= 2.
+  //
+  // Also, note that the number of usable slots in the queue at any
+  // given time is actually (size-1), so if you start with an empty queue,
+  // isFull() will return true after size-1 insertions.
+  explicit ProducerConsumerQueue(uint32_t size)
+      : size_(size),
+        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
+        readIndex_(0),
+        writeIndex_(0) {
+    assert(size >= 2);
+    if (!records_) {
+      throw std::bad_alloc();
+    }
+  }
+
+  ~ProducerConsumerQueue() {
+    // We need to destruct anything that may still exist in our queue.
+    // (No real synchronization needed at destructor time: only one
+    // thread can be doing this.)
+    if (!std::is_trivially_destructible<T>::value) {
+      size_t readIndex = readIndex_;
+      size_t endIndex = writeIndex_;
+      while (readIndex != endIndex) {
+        records_[readIndex].~T();
+        if (++readIndex == size_) {
+          readIndex = 0;
+        }
+      }
+    }
+
+    std::free(records_);
+  }
+
+  template <class... Args>
+  bool write(Args&&... recordArgs) {
+    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
+    auto nextRecord = currentWrite + 1;
+    if (nextRecord == size_) {
+      nextRecord = 0;
+    }
+    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
+      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
+      writeIndex_.store(nextRecord, std::memory_order_release);
+      return true;
+    }
+
+    // queue is full
+    return false;
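A minimal usage sketch of the size-1 slot semantics documented in the constructor comment above. It is not part of this diff: it assumes the rest of the vendored header keeps folly's usual `bool read(T&)` accessor (the hunk above is truncated before it), and the include path is illustrative.

```cpp
#include <iostream>

#include "arrow/vendored/ProducerConsumerQueue.h"  // path assumed from this PR

int main() {
  folly::ProducerConsumerQueue<int> q(4);  // 4 slots, but only 3 usable

  // Producer side: the fourth write fails because one slot is always left
  // empty so that the full and empty states remain distinguishable.
  q.write(1);
  q.write(2);
  q.write(3);
  bool full = !q.write(4);  // false: queue "full" after size-1 = 3 insertions

  // Consumer side: read() pops in FIFO order (assumes folly's read(T&)).
  int v = 0;
  bool got = q.read(v);
  std::cout << full << " " << got << " " << v << "\n";  // prints: 1 1 1
  return 0;
}
```

Keeping one slot empty is the classic ring-buffer trick: it lets `readIndex_ == writeIndex_` unambiguously mean "empty" without a separate count.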
Review comment:
   Right. I had originally thought "one queue to fix it all" but I was overly ambitious. Not only is the spinning a problem, but the SPSC part is a problem too. A work-stealing thread pool could potentially have multiple consumers, and any thread pool, even the one we have today, will have multiple producers. I'll clean up the text on ARROW-11588 so it is less about "one queue" and more about traceability.
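To make the multiple-producer concern concrete: two threads calling the SPSC `write()` above can both load the same relaxed `writeIndex_` and placement-new into the same slot, which is a data race. Below is a minimal sketch of the kind of mutex-serialized multi-producer/multi-consumer queue a thread pool needs instead; `NaiveMpmcQueue` and its members are hypothetical illustrations, not Arrow or folly APIs.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

template <typename T>
class NaiveMpmcQueue {
 public:
  // Safe for any number of producers: the mutex serializes writers.
  void Push(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(item));
    }
    cv_.notify_one();  // wake one waiting consumer
  }

  // Blocks until an item is available; safe for any number of consumers.
  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !items_.empty(); });
    T item = std::move(items_.front());
    items_.pop_front();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> items_;
};
```

A lock-free MPMC design (e.g., a Vyukov-style bounded queue) would avoid the mutex, at the cost of much subtler memory-ordering code than the acquire/release pair in the SPSC `write()` above.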