Copilot commented on code in PR #12370:
URL: https://github.com/apache/gluten/pull/12370#discussion_r3500214415


##########
cpp/velox/utils/CachedBatchQueue.h:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+
+namespace gluten {
+
+template <typename T>
+class CachedBatchQueue {
+ public:
+  explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {}
+
+  void put(std::shared_ptr<T> batch) {
+    std::unique_lock<std::mutex> lock(mtx_);
+    const auto batchSize = batch->numBytes();
+
+    VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity");
+
+    notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; });
+
+    queue_.push(std::move(batch));
+    totalSize_ += batchSize;
+
+    notEmpty_.notify_one();
+  }

Review Comment:
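   CachedBatchQueue::put() waits only on `totalSize_ + batchSize <= capacity_` 
and ignores `noMoreBatches_`. If the consumer stops early (e.g., native shuffle 
reader calls stop) while the queue is full, producer threads can block forever 
in this wait even after `noMoreBatches()` notifies: `notFull_.notify_all()` 
wakes them, but the predicate is re-evaluated, is still false, and they go back 
to sleep. This leads to deadlock/leaked threads. Consider adding 
`noMoreBatches_` to the wait predicate and returning without enqueuing once it 
is set.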



##########
cpp/velox/utils/CachedBatchQueue.h:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+
+namespace gluten {
+
+template <typename T>
+class CachedBatchQueue {
+ public:
+  explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {}
+
+  void put(std::shared_ptr<T> batch) {
+    std::unique_lock<std::mutex> lock(mtx_);
+    const auto batchSize = batch->numBytes();
+
+    VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity");
+
+    notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; });
+
+    queue_.push(std::move(batch));
+    totalSize_ += batchSize;
+
+    notEmpty_.notify_one();
+  }
+
+  std::shared_ptr<T> get() {
+    std::unique_lock<std::mutex> lock(mtx_);
+    notEmpty_.wait(lock, [&]() { return noMoreBatches_ || !queue_.empty(); });
+
+    if (queue_.empty()) {
+      return nullptr;
+    }
+    auto batch = std::move(queue_.front());
+    LOG(INFO) << "Trying to get from cached buffer queue. Queue length: " << 
queue_.size()
+              << ", total size in queue: " << totalSize_ << ", current batch 
size: " << batch->numBytes() << std::endl;
+
+    queue_.pop();
+    totalSize_ -= batch->numBytes();
+
+    notFull_.notify_one();
+    return batch;
+  }
+
+  void noMoreBatches() {
+    std::lock_guard<std::mutex> lock(mtx_);
+    noMoreBatches_ = true;
+    notFull_.notify_all();
+    notEmpty_.notify_all();
+  }
+
+  int64_t size() const {
+    return totalSize_;
+  }
+
+  bool empty() const {
+    return queue_.empty();
+  }
+
+ private:
+  int64_t capacity_;
+  int64_t totalSize_{0};
+  bool noMoreBatches_{false};
+
+  std::queue<std::shared_ptr<T>> queue_;
+
+  std::mutex mtx_;
+  std::condition_variable notEmpty_;
+  std::condition_variable notFull_;

Review Comment:
   CachedBatchQueue::size() / empty() read `totalSize_` and `queue_` without 
holding `mtx_`. These members are mutated under the mutex in put/get, so 
reading them without synchronization is a data race. Both accessors should lock 
`mtx_`; since they are `const`, `mtx_` needs to be declared `mutable`.



##########
cpp/velox/utils/CachedBatchQueue.h:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+
+namespace gluten {
+
+template <typename T>
+class CachedBatchQueue {
+ public:
+  explicit CachedBatchQueue(const int64_t capacity) : capacity_(capacity) {}
+
+  void put(std::shared_ptr<T> batch) {
+    std::unique_lock<std::mutex> lock(mtx_);
+    const auto batchSize = batch->numBytes();
+
+    VELOX_CHECK_LE(batchSize, capacity_, "Batch size exceeds queue capacity");
+
+    notFull_.wait(lock, [&]() { return totalSize_ + batchSize <= capacity_; });
+
+    queue_.push(std::move(batch));
+    totalSize_ += batchSize;
+
+    notEmpty_.notify_one();
+  }
+
+  std::shared_ptr<T> get() {
+    std::unique_lock<std::mutex> lock(mtx_);
+    notEmpty_.wait(lock, [&]() { return noMoreBatches_ || !queue_.empty(); });
+
+    if (queue_.empty()) {
+      return nullptr;
+    }
+    auto batch = std::move(queue_.front());
+    LOG(INFO) << "Trying to get from cached buffer queue. Queue length: " << 
queue_.size()
+              << ", total size in queue: " << totalSize_ << ", current batch 
size: " << batch->numBytes() << std::endl;
+

Review Comment:
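   CachedBatchQueue::get() logs at INFO level (and uses `std::endl`) on every 
dequeue, and does so while still holding `mtx_`. This is in the hot path and 
can heavily spam executor logs / add avoidable overhead, and it lengthens the 
critical section that producers contend on.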



##########
cpp/velox/compute/VeloxBackend.cc:
##########
@@ -208,6 +208,8 @@ void VeloxBackend::init(
     
velox::exec::Operator::registerOperator(std::make_unique<CudfVectorStreamOperatorTranslator>());
     velox::cudf_velox::registerSparkFunctions("");
     velox::cudf_velox::registerSparkAggregateFunctions("");
+    readerThreadPool_ = std::make_unique<ReaderThreadPool>(
+        backendConf_->get<int32_t>(kShuffleReaderThreads, 
std::thread::hardware_concurrency()));
   }

Review Comment:
   VeloxBackend::init() constructs `readerThreadPool_` twice (once inside the 
GPU-enabled block and again unconditionally later). This causes an extra pool 
creation + thread spawn + log spam, and the first pool is immediately destroyed.



##########
cpp/velox/compute/VeloxBackend.cc:
##########
@@ -294,12 +296,19 @@ void VeloxBackend::init(
   registerShuffleDictionaryWriterFactory([](MemoryManager* memoryManager, 
arrow::util::Codec* codec) {
     return std::make_unique<ArrowShuffleDictionaryWriter>(memoryManager, 
codec);
   });
+
+  readerThreadPool_ = std::make_unique<ReaderThreadPool>(
+      backendConf_->get<int32_t>(kShuffleReaderThreads, 
std::thread::hardware_concurrency()));

Review Comment:
   `std::thread::hardware_concurrency()` can return 0, and the config may also 
be set to 0. Creating a ReaderThreadPool with 0 workers will cause async 
shuffle reads to stall forever (no producers). It's safer to clamp the thread 
count to >= 1 here.



##########
cpp/velox/shuffle/ReaderThreadPool.cc:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "shuffle/ReaderThreadPool.h"
+#include <glog/logging.h>
+
+namespace gluten {
+
+ReaderThreadPool::ReaderThreadPool(size_t numThreads) : 
numThreads_(numThreads) {
+  workers_.reserve(numThreads);
+  for (size_t i = 0; i < numThreads; ++i) {
+    workers_.emplace_back([this]() { workerThread(); });
+  }
+  LOG(WARNING) << "Created ReaderThreadPool with " << numThreads << " 
threads.";
+}
+
+ReaderThreadPool::~ReaderThreadPool() {
+  shutdown();
+}
+
+void ReaderThreadPool::submitBatch(std::vector<Task> tasks, int32_t priority) {
+  std::lock_guard<std::mutex> lock(taskQueueMtx_);
+  if (stop_.load(std::memory_order_acquire)) {
+    return;
+  }
+  for (auto& task : tasks) {
+    tasks_.push({std::move(task), priority});
+  }
+}

Review Comment:
   ReaderThreadPool::submitBatch() enqueues tasks but never wakes worker 
threads. After the first `start()` call, any later submit will leave workers 
blocked in `wakeUpCV_.wait(...)` indefinitely (until shutdown).



##########
cpp/velox/shuffle/VeloxGpuShuffleReader.cc:
##########
@@ -62,62 +78,126 @@ 
VeloxGpuHashShuffleReaderDeserializer::VeloxGpuHashShuffleReaderDeserializer(
       rowType_(rowType),
       readerBufferSize_(readerBufferSize),
       memoryManager_(memoryManager),
+      threadPool_(threadPool),
       deserializeTime_(deserializeTime),
       decompressTime_(decompressTime) {}
 
-bool VeloxGpuHashShuffleReaderDeserializer::resolveNextBlockType() {
-  GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
-  switch (blockType) {
-    case BlockType::kEndOfStream:
-      return false;
-    case BlockType::kPlainPayload:
-      return true;
-    default:
-      throw GlutenException(fmt::format("Unsupported block type: {}", 
static_cast<int32_t>(blockType)));
+VeloxGpuHashShuffleReaderDeserializer::~VeloxGpuHashShuffleReaderDeserializer()
 {
+  // Wait for all reader threads to complete before destroying
+  if (!isStopped()) {
+    stop();
   }
+
+  decompressTime_ += decompressTimeCounter_.load(std::memory_order_relaxed);
+  deserializeTime_ += deserializeTimeCounter_.load(std::memory_order_relaxed);
 }
 
-void VeloxGpuHashShuffleReaderDeserializer::loadNextStream() {
-  if (reachedEos_) {
-    return;
+std::unique_ptr<ColumnarBatchIterator> 
VeloxGpuHashShuffleReaderDeserializer::deserializeStreams(int32_t priority) {
+  batchQueue_ = std::make_unique<CachedBatchQueue<GpuBufferColumnarBatch>>(1L 
<< 30);
+
+  if (!threadPool_) {
+    throw GlutenException("Thread pool must be provided to 
VeloxGpuHashShuffleReaderDeserializer");
+  }
+
+  const size_t numThreads = threadPool_->getNumThreads();
+  activeReaders_.store(numThreads);
+
+  // Submit reader tasks to the thread pool.
+  std::vector<ReaderThreadPool::Task> tasks;
+  tasks.reserve(numThreads);
+  for (size_t i = 0; i < numThreads; ++i) {
+    tasks.emplace_back([this]() { read(); });
   }
+  threadPool_->submitBatch(std::move(tasks), priority);
 
-  auto in = 
streamReader_->readNextStream(memoryManager_->defaultArrowMemoryPool());
-  if (in == nullptr) {
-    reachedEos_ = true;
-    return;
+  if (priority == 0) {
+    threadPool_->start();
   }
 
-  GLUTEN_ASSIGN_OR_THROW(
-      in_,
-      arrow::io::BufferedInputStream::Create(
-          readerBufferSize_, memoryManager_->defaultArrowMemoryPool(), 
std::move(in)));
+  return 
std::make_unique<AsyncShuffleReaderIterator<GpuBufferColumnarBatch>>(batchQueue_.get());
 }
 
-std::shared_ptr<ColumnarBatch> VeloxGpuHashShuffleReaderDeserializer::next() {
-  if (in_ == nullptr) {
-    loadNextStream();
+void VeloxGpuHashShuffleReaderDeserializer::stop() {
+  // Signal threads to stop if not already stopped.
+  stop_.store(true, std::memory_order_release);
+  // Wait for all reader threads to complete.
+  std::unique_lock<std::mutex> lock(completionMtx_);
+  completionCV_.wait(lock, [this] { return 
activeReaders_.load(std::memory_order_acquire) == 0; });
+}

Review Comment:
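   VeloxGpuHashShuffleReaderDeserializer::stop() can deadlock if any producer 
thread is blocked inside `batchQueue_->put()` (e.g., queue is full and consumer 
stopped early). stop() sets `stop_` but doesn't signal the queue to unblock 
producers/consumer before waiting for `activeReaders_ == 0`. Note that calling 
`batchQueue_->noMoreBatches()` here is not sufficient by itself: the wait 
predicate in `put()` does not check `noMoreBatches_`, so that must be fixed as 
well.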



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to