Copilot commented on code in PR #12370: URL: https://github.com/apache/gluten/pull/12370#discussion_r3500214415
########## cpp/velox/utils/CachedBatchQueue.h: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> + +namespace gluten { + +template <typename T> +class CachedBatchQueue { + public: + explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {} + + void put(std::shared_ptr<T> batch) { + std::unique_lock<std::mutex> lock(mtx_); + const auto batchSize = batch->numBytes(); + + VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity"); + + notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; }); + + queue_.push(std::move(batch)); + totalSize_ += batchSize; + + notEmpty_.notify_one(); + } Review Comment: CachedBatchQueue::put() waits only on `totalSize_ + batchSize <= capacity_` and ignores `noMoreBatches_`. If the consumer stops early (e.g., the native shuffle reader calls `stop()`) while the queue is full, producer threads can block forever in this wait, even after `noMoreBatches()` notifies, leading to deadlocks or leaked threads. The wait predicate should also return once `noMoreBatches_` is set, and `put()` should then discard the batch instead of enqueuing it.
########## cpp/velox/utils/CachedBatchQueue.h: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> + +namespace gluten { + +template <typename T> +class CachedBatchQueue { + public: + explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {} + + void put(std::shared_ptr<T> batch) { + std::unique_lock<std::mutex> lock(mtx_); + const auto batchSize = batch->numBytes(); + + VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity"); + + notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; }); + + queue_.push(std::move(batch)); + totalSize_ += batchSize; + + notEmpty_.notify_one(); + } + + std::shared_ptr<T> get() { + std::unique_lock<std::mutex> lock(mtx_); + notEmpty_.wait(lock, [&]() { return noMoreBatches_ || !queue_.empty(); }); + + if (queue_.empty()) { + return nullptr; + } + auto batch = std::move(queue_.front()); + LOG(INFO) << "Trying to get from cached buffer queue. 
Queue length: " << queue_.size() + << ", total size in queue: " << totalSize_ << ", current batch size: " << batch->numBytes() << std::endl; + + queue_.pop(); + totalSize_ -= batch->numBytes(); + + notFull_.notify_one(); + return batch; + } + + void noMoreBatches() { + std::lock_guard<std::mutex> lock(mtx_); + noMoreBatches_ = true; + notFull_.notify_all(); + notEmpty_.notify_all(); + } + + int64_t size() const { + return totalSize_; + } + + bool empty() const { + return queue_.empty(); + } + + private: + int64_t capacity_; + int64_t totalSize_{0}; + bool noMoreBatches_{false}; + + std::queue<std::shared_ptr<T>> queue_; + + std::mutex mtx_; + std::condition_variable notEmpty_; + std::condition_variable notFull_; Review Comment: CachedBatchQueue::size() / empty() read `totalSize_` and `queue_` without holding `mtx_`. These members are mutated under the mutex in put/get, so reading them without synchronization is a data race. ########## cpp/velox/utils/CachedBatchQueue.h: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <condition_variable> +#include <mutex> +#include <queue> + +namespace gluten { + +template <typename T> +class CachedBatchQueue { + public: + explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {} + + void put(std::shared_ptr<T> batch) { + std::unique_lock<std::mutex> lock(mtx_); + const auto batchSize = batch->numBytes(); + + VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity"); + + notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; }); + + queue_.push(std::move(batch)); + totalSize_ += batchSize; + + notEmpty_.notify_one(); + } + + std::shared_ptr<T> get() { + std::unique_lock<std::mutex> lock(mtx_); + notEmpty_.wait(lock, [&]() { return noMoreBatches_ || !queue_.empty(); }); + + if (queue_.empty()) { + return nullptr; + } + auto batch = std::move(queue_.front()); + LOG(INFO) << "Trying to get from cached buffer queue. Queue length: " << queue_.size() + << ", total size in queue: " << totalSize_ << ", current batch size: " << batch->numBytes() << std::endl; + Review Comment: CachedBatchQueue::get() logs at INFO level (and uses `std::endl`) on every dequeue. This is in the hot path and can heavily spam executor logs / add avoidable overhead. ########## cpp/velox/compute/VeloxBackend.cc: ########## @@ -208,6 +208,8 @@ void VeloxBackend::init( velox::exec::Operator::registerOperator(std::make_unique<CudfVectorStreamOperatorTranslator>()); velox::cudf_velox::registerSparkFunctions(""); velox::cudf_velox::registerSparkAggregateFunctions(""); + readerThreadPool_ = std::make_unique<ReaderThreadPool>( + backendConf_->get<int32_t>(kShuffleReaderThreads, std::thread::hardware_concurrency())); } Review Comment: VeloxBackend::init() constructs `readerThreadPool_` twice (once inside the GPU-enabled block and again unconditionally later). This causes an extra pool creation + thread spawn + log spam, and the first pool is immediately destroyed. 
########## cpp/velox/compute/VeloxBackend.cc: ########## @@ -294,12 +296,19 @@ void VeloxBackend::init( registerShuffleDictionaryWriterFactory([](MemoryManager* memoryManager, arrow::util::Codec* codec) { return std::make_unique<ArrowShuffleDictionaryWriter>(memoryManager, codec); }); + + readerThreadPool_ = std::make_unique<ReaderThreadPool>( + backendConf_->get<int32_t>(kShuffleReaderThreads, std::thread::hardware_concurrency())); Review Comment: `std::thread::hardware_concurrency()` can return 0, and the config may also be set to 0. A ReaderThreadPool created with 0 workers has no threads to run the reader (producer) tasks, so async shuffle reads stall forever. It's safer to clamp the thread count to at least 1 here (e.g., with `std::max`). ########## cpp/velox/shuffle/ReaderThreadPool.cc: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "shuffle/ReaderThreadPool.h" +#include <glog/logging.h> + +namespace gluten { + +ReaderThreadPool::ReaderThreadPool(size_t numThreads) : numThreads_(numThreads) { + workers_.reserve(numThreads); + for (size_t i = 0; i < numThreads; ++i) { + workers_.emplace_back([this]() { workerThread(); }); + } + LOG(WARNING) << "Created ReaderThreadPool with " << numThreads << " threads."; +} + +ReaderThreadPool::~ReaderThreadPool() { + shutdown(); +} + +void ReaderThreadPool::submitBatch(std::vector<Task> tasks, int32_t priority) { + std::lock_guard<std::mutex> lock(taskQueueMtx_); + if (stop_.load(std::memory_order_acquire)) { + return; + } + for (auto& task : tasks) { + tasks_.push({std::move(task), priority}); + } +} Review Comment: ReaderThreadPool::submitBatch() enqueues tasks but never wakes worker threads. After the first `start()` call, any later submit will leave workers blocked in `wakeUpCV_.wait(...)` indefinitely (until shutdown). ########## cpp/velox/shuffle/VeloxGpuShuffleReader.cc: ########## @@ -62,62 +78,126 @@ VeloxGpuHashShuffleReaderDeserializer::VeloxGpuHashShuffleReaderDeserializer( rowType_(rowType), readerBufferSize_(readerBufferSize), memoryManager_(memoryManager), + threadPool_(threadPool), deserializeTime_(deserializeTime), decompressTime_(decompressTime) {} -bool VeloxGpuHashShuffleReaderDeserializer::resolveNextBlockType() { - GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get())); - switch (blockType) { - case BlockType::kEndOfStream: - return false; - case BlockType::kPlainPayload: - return true; - default: - throw GlutenException(fmt::format("Unsupported block type: {}", static_cast<int32_t>(blockType))); +VeloxGpuHashShuffleReaderDeserializer::~VeloxGpuHashShuffleReaderDeserializer() { + // Wait for all reader threads to complete before destroying + if (!isStopped()) { + stop(); } + + decompressTime_ += decompressTimeCounter_.load(std::memory_order_relaxed); + deserializeTime_ += 
deserializeTimeCounter_.load(std::memory_order_relaxed); } -void VeloxGpuHashShuffleReaderDeserializer::loadNextStream() { - if (reachedEos_) { - return; +std::unique_ptr<ColumnarBatchIterator> VeloxGpuHashShuffleReaderDeserializer::deserializeStreams(int32_t priority) { + batchQueue_ = std::make_unique<CachedBatchQueue<GpuBufferColumnarBatch>>(1L << 30); + + if (!threadPool_) { + throw GlutenException("Thread pool must be provided to VeloxGpuHashShuffleReaderDeserializer"); + } + + const size_t numThreads = threadPool_->getNumThreads(); + activeReaders_.store(numThreads); + + // Submit reader tasks to the thread pool. + std::vector<ReaderThreadPool::Task> tasks; + tasks.reserve(numThreads); + for (size_t i = 0; i < numThreads; ++i) { + tasks.emplace_back([this]() { read(); }); } + threadPool_->submitBatch(std::move(tasks), priority); - auto in = streamReader_->readNextStream(memoryManager_->defaultArrowMemoryPool()); - if (in == nullptr) { - reachedEos_ = true; - return; + if (priority == 0) { + threadPool_->start(); } - GLUTEN_ASSIGN_OR_THROW( - in_, - arrow::io::BufferedInputStream::Create( - readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), std::move(in))); + return std::make_unique<AsyncShuffleReaderIterator<GpuBufferColumnarBatch>>(batchQueue_.get()); } -std::shared_ptr<ColumnarBatch> VeloxGpuHashShuffleReaderDeserializer::next() { - if (in_ == nullptr) { - loadNextStream(); +void VeloxGpuHashShuffleReaderDeserializer::stop() { + // Signal threads to stop if not already stopped. + stop_.store(true, std::memory_order_release); + // Wait for all reader threads to complete. + std::unique_lock<std::mutex> lock(completionMtx_); + completionCV_.wait(lock, [this] { return activeReaders_.load(std::memory_order_acquire) == 0; }); +} Review Comment: VeloxGpuHashShuffleReaderDeserializer::stop() can deadlock if any producer thread is blocked inside `batchQueue_->put()` (e.g., queue is full and consumer stopped early). 
stop() sets `stop_` but never signals the queue to unblock the producers and the consumer before waiting for `activeReaders_ == 0`. Calling `batchQueue_->noMoreBatches()` first would do this, provided `put()` is also changed to honor that flag. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
