This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8e8a81e3b0 [GLUTEN-10933][VL] feat: Support cached the batches in cpu
cache (#11758)
8e8a81e3b0 is described below
commit 8e8a81e3b0a92781bf19551bf152983953a0ce25
Author: Chengcheng Jin <[email protected]>
AuthorDate: Sat Mar 14 01:03:20 2026 +0800
[GLUTEN-10933][VL] feat: Support cached the batches in cpu cache (#11758)
Cache the batches in the CPU cache and let the join threads fetch them one by
one: the build threads will start fetching as soon as possible, but the probe
thread needs to wait until the build side has finished.
The buffer size is temporarily controlled by
`spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes`;
later, the size may be adjusted based on the remaining memory on the server.
Test:
Test in local SF100, adjust the config to enable caching batch.
```
--conf spark.gluten.sql.columnar.backend.velox.cudf.batchSize=10000 \
--conf
spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes=1024MB
```
The log prints `Prefetched 171 batches (24057900 bytes) before blocking on
GPU lock`
Next step:
Prefetch the probe side batch when build starts.
Related issue: #10933
---
.../utils/GpuBufferBatchResizerJniWrapper.java | 3 +-
.../org/apache/gluten/config/VeloxConfig.scala | 9 ++++
.../GpuResizeBufferColumnarBatchExec.scala | 6 ++-
...GpuBufferBatchResizeForShuffleInputOutput.scala | 15 +++---
cpp/velox/config/VeloxConfig.h | 2 +
cpp/velox/cudf/GpuLock.cc | 13 ++++++
cpp/velox/cudf/GpuLock.h | 9 +---
cpp/velox/jni/VeloxJniWrapper.cc | 3 +-
cpp/velox/memory/GpuBufferColumnarBatch.cc | 4 +-
cpp/velox/tests/VeloxGpuShuffleWriterTest.cc | 1 +
cpp/velox/utils/GpuBufferBatchResizer.cc | 54 +++++++++++++++++++---
cpp/velox/utils/GpuBufferBatchResizer.h | 14 ++++++
docs/velox-configuration.md | 1 +
13 files changed, 110 insertions(+), 24 deletions(-)
diff --git
a/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java
index 01fb7bf390..9ccc8fecf4 100644
---
a/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java
+++
b/backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java
@@ -36,5 +36,6 @@ public class GpuBufferBatchResizerJniWrapper implements
RuntimeAware {
return runtime.getHandle();
}
- public native long create(int minOutputBatchSize, ColumnarBatchInIterator
itr);
+ public native long create(
+ int minOutputBatchSize, long maxPrefetchBatchBytes,
ColumnarBatchInIterator itr);
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 993c554808..898eacd186 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -86,6 +86,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def cudfBatchSize: Int = getConf(CUDF_BATCH_SIZE)
+ def cudfShuffleMaxPrefetchBytes: Long =
getConf(CUDF_SHUFFLE_MAX_PREFETCH_BYTES)
+
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
@@ -685,6 +687,13 @@ object VeloxConfig extends ConfigRegistry {
.intConf
.createWithDefault(Integer.MAX_VALUE)
+ val CUDF_SHUFFLE_MAX_PREFETCH_BYTES =
+
buildConf("spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes")
+ .doc("Maximum bytes to prefetch in CPU memory during GPU shuffle read
while waiting " +
"for the GPU to become available.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("1028MB")
+
val MEMORY_DUMP_ON_EXIT =
buildConf("spark.gluten.monitor.memoryDumpOnExit")
.internal()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala
index 9de62e5bb8..b95a46c36e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala
@@ -34,7 +34,10 @@ import scala.collection.JavaConverters._
/**
* An operator to resize input BufferBatches generated by shuffle reader, and
convert to cudf table.
*/
-case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan,
minOutputBatchSize: Int)
+case class GpuResizeBufferColumnarBatchExec(
+ override val child: SparkPlan,
+ minOutputBatchSize: Int,
+ maxPrefetchBatchBytes: Long)
extends ColumnarToColumnarExec(child) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
@@ -44,6 +47,7 @@ case class GpuResizeBufferColumnarBatchExec(override val
child: SparkPlan, minOu
.create(runtime)
.create(
minOutputBatchSize,
+ maxPrefetchBatchBytes,
new ColumnarBatchInIterator(BackendsApiManager.getBackendName,
in.asJava))
new ColumnarBatchOutIterator(runtime, outHandle).asScala
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala
index bb30dc6c27..22de2305f0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala
@@ -36,6 +36,7 @@ case class GpuBufferBatchResizeForShuffleInputOutput()
extends Rule[SparkPlan] {
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes
val batchSize = VeloxConfig.get.cudfBatchSize
+ val prefetchBatchBytes = VeloxConfig.get.cudfShuffleMaxPrefetchBytes
plan.transformUp {
case shuffle: ColumnarShuffleExchangeExec
if shuffle.shuffleWriterType == HashShuffleWriterType &&
@@ -46,36 +47,38 @@ case class GpuBufferBatchResizeForShuffleInputOutput()
extends Rule[SparkPlan] {
case a @ AQEShuffleReadExec(
ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _),
_) =>
- GpuResizeBufferColumnarBatchExec(a, batchSize)
+ GpuResizeBufferColumnarBatchExec(a, batchSize, prefetchBatchBytes)
case a @ AQEShuffleReadExec(
ShuffleQueryStageExec(_, ReusedExchangeExec(_, _:
ColumnarShuffleExchangeExecBase), _),
_) =>
- GpuResizeBufferColumnarBatchExec(a, batchSize)
+ GpuResizeBufferColumnarBatchExec(a, batchSize, prefetchBatchBytes)
// Since it's transformed in a bottom to up order, so we may first
encounter
// ShuffeQueryStageExec, which is transformed to
VeloxResizeBatchesExec(ShuffeQueryStageExec),
// then we see AQEShuffleReadExec
case a @ AQEShuffleReadExec(
GpuResizeBufferColumnarBatchExec(
s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase,
_),
+ _,
_),
_) =>
- GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
+ GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize,
prefetchBatchBytes)
case a @ AQEShuffleReadExec(
GpuResizeBufferColumnarBatchExec(
s @ ShuffleQueryStageExec(
_,
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
_),
+ _,
_),
_) =>
- GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
+ GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize,
prefetchBatchBytes)
case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _)
=>
- GpuResizeBufferColumnarBatchExec(s, batchSize)
+ GpuResizeBufferColumnarBatchExec(s, batchSize, prefetchBatchBytes)
case s @ ShuffleQueryStageExec(
_,
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
_) =>
- GpuResizeBufferColumnarBatchExec(s, batchSize)
+ GpuResizeBufferColumnarBatchExec(s, batchSize, prefetchBatchBytes)
}
}
}
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 6195620fc8..33a66f4649 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -208,6 +208,8 @@ const std::string kVeloxPreferredBatchBytes =
"spark.gluten.sql.columnar.backend
const std::string kCudfEnableTableScan =
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan";
const bool kCudfEnableTableScanDefault = false;
const std::string kCudfHiveConnectorId = "cudf-hive";
+const std::string kCudfShuffleMaxPrefetchBytes =
"spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes";
+const int64_t kCudfShuffleMaxPrefetchBytesDefault = 1028L * 1024 * 1024; //
1028MB
const std::string kStaticBackendConfPrefix = "spark.gluten.velox.";
const std::string kDynamicBackendConfPrefix =
"spark.gluten.sql.columnar.backend.velox.";
diff --git a/cpp/velox/cudf/GpuLock.cc b/cpp/velox/cudf/GpuLock.cc
index f28bbbe8a5..85ddbcd578 100644
--- a/cpp/velox/cudf/GpuLock.cc
+++ b/cpp/velox/cudf/GpuLock.cc
@@ -55,6 +55,19 @@ void lockGpu() {
getGpuLockState().gGpuOwner = tid;
}
+bool tryLockGpu() {
+ std::thread::id tid = std::this_thread::get_id();
+ std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
+ if (getGpuLockState().gGpuOwner == tid) {
+ return true;
+ }
+ if (getGpuLockState().gGpuOwner.has_value()) {
+ return false;
+ }
+ getGpuLockState().gGpuOwner = tid;
+ return true;
+}
+
void unlockGpu() {
std::thread::id tid = std::this_thread::get_id();
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
diff --git a/cpp/velox/cudf/GpuLock.h b/cpp/velox/cudf/GpuLock.h
index 1cc8640df7..bbf55491a7 100644
--- a/cpp/velox/cudf/GpuLock.h
+++ b/cpp/velox/cudf/GpuLock.h
@@ -21,14 +21,9 @@
namespace gluten {
-/**
- * @brief Acquire the GPU lock (reentrant within the same thread)
- */
+/// Acquire the GPU lock, blocking until available. Reentrant for the same
thread.
void lockGpu();
-
-/**
- * @brief Release the GPU lock (must be called by the owning thread)
- */
+bool tryLockGpu();
void unlockGpu();
} // namespace gluten
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index ed1cd5e85d..4705a646e2 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -454,6 +454,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra
JNIEnv* env,
jobject wrapper,
jint minOutputBatchSize,
+ jlong maxPrefetchBatchBytes,
jobject jIter) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
@@ -461,7 +462,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra
auto pool =
dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
auto iter = makeJniColumnarBatchIterator(env, jIter, ctx);
auto appender = std::make_shared<ResultIterator>(
- std::make_unique<GpuBufferBatchResizer>(arrowPool, pool.get(),
minOutputBatchSize, std::move(iter)));
+ std::make_unique<GpuBufferBatchResizer>(arrowPool, pool.get(),
minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter)));
return ctx->saveObject(appender);
JNI_METHOD_END(kInvalidObjectHandle)
}
diff --git a/cpp/velox/memory/GpuBufferColumnarBatch.cc
b/cpp/velox/memory/GpuBufferColumnarBatch.cc
index 4b4a5ee58f..3bbd153785 100644
--- a/cpp/velox/memory/GpuBufferColumnarBatch.cc
+++ b/cpp/velox/memory/GpuBufferColumnarBatch.cc
@@ -54,7 +54,9 @@ int64_t GpuBufferColumnarBatch::numBytes() {
if (!numBytes_.has_value()) {
int64_t bytes = 0;
for (const auto& buffer : buffers_) {
- bytes += buffer->size();
+ if (buffer) {
+ bytes += buffer->size();
+ }
}
numBytes_ = bytes;
}
diff --git a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc
b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc
index 99c560029d..28e9e5e15a 100644
--- a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc
@@ -116,6 +116,7 @@ RowVectorPtr
mergeBufferColumnarBatches(std::vector<std::shared_ptr<GpuBufferCol
getDefaultMemoryManager()->defaultArrowMemoryPool(),
getDefaultMemoryManager()->getLeafMemoryPool().get(),
1200, // output one batch
+ 102400000, // prefetch up to 100MB data
std::make_unique<ColumnarBatchArray>(bufferBatches));
auto cb = resizer.next();
auto batch = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb);
diff --git a/cpp/velox/utils/GpuBufferBatchResizer.cc
b/cpp/velox/utils/GpuBufferBatchResizer.cc
index 921f05c63b..e97f4b5876 100644
--- a/cpp/velox/utils/GpuBufferBatchResizer.cc
+++ b/cpp/velox/utils/GpuBufferBatchResizer.cc
@@ -17,7 +17,6 @@
#include "GpuBufferBatchResizer.h"
#include "cudf/GpuLock.h"
-#include "memory/GpuBufferColumnarBatch.h"
#include "utils/Timer.h"
#include "velox/experimental/cudf/exec/GpuResources.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
@@ -168,41 +167,82 @@ GpuBufferBatchResizer::GpuBufferBatchResizer(
arrow::MemoryPool* arrowPool,
facebook::velox::memory::MemoryPool* pool,
int32_t minOutputBatchSize,
+ int64_t maxPrefetchSize,
std::unique_ptr<ColumnarBatchIterator> in)
: arrowPool_(arrowPool),
pool_(pool),
minOutputBatchSize_(minOutputBatchSize),
+ maxPrefetchSize_(maxPrefetchSize),
in_(std::move(in)) {
VELOX_CHECK_GT(minOutputBatchSize_, 0, "minOutputBatchSize should be larger
than 0");
+ VELOX_CHECK_GT(maxPrefetchSize_, 0, "maxPrefetchSize should be larger than
0");
}
-std::shared_ptr<ColumnarBatch> GpuBufferBatchResizer::next() {
+std::shared_ptr<GpuBufferColumnarBatch>
GpuBufferBatchResizer::fetchAndComposeBatch() {
std::vector<std::shared_ptr<GpuBufferColumnarBatch>> cachedBatches;
int32_t cachedRows = 0;
while (cachedRows < minOutputBatchSize_) {
auto nextCb = in_->next();
if (!nextCb) {
- // No more input.
break;
}
auto nextBatch = std::dynamic_pointer_cast<GpuBufferColumnarBatch>(nextCb);
VELOX_CHECK_NOT_NULL(nextBatch);
if (nextBatch->numRows() == 0) {
- continue;
+ continue;
}
cachedRows += nextBatch->numRows();
cachedBatches.push_back(std::move(nextBatch));
}
+
if (cachedRows == 0) {
return nullptr;
}
- // Compose all cached batches into one
- auto batch = GpuBufferColumnarBatch::compose(arrowPool_, cachedBatches,
cachedRows);
+ return GpuBufferColumnarBatch::compose(arrowPool_, cachedBatches,
cachedRows);
+}
+
+std::shared_ptr<ColumnarBatch> GpuBufferBatchResizer::next() {
+ // Ensure at least one batch is in the prefetch queue.
+ if (prefetchQueue_.empty()) {
+ auto batch = fetchAndComposeBatch();
+ if (batch) {
+ prefetchedBytes_ += batch->numBytes();
+ prefetchQueue_.push_back(std::move(batch));
+ }
+ }
+
+ if (prefetchQueue_.empty()) {
+ return nullptr;
+ }
+
+ // Try to acquire the GPU lock non-blockingly. While we can't get it,
+ // keep prefetching more batches into CPU memory within the budget.
+ while (!tryLockGpu()) {
+ if (prefetchedBytes_ < maxPrefetchSize_) {
+ auto batch = fetchAndComposeBatch();
+ if (batch) {
+ prefetchedBytes_ += batch->numBytes();
+ prefetchQueue_.push_back(std::move(batch));
+ } else {
+ // All the batches consumed.
+ LOG(WARNING) << "Prefetched " << prefetchQueue_.size() << " batches ("
<< prefetchedBytes_ << " bytes) before blocking on GPU lock.";
+ lockGpu();
+ break;
+ }
+ } else {
+ LOG(WARNING) << "Prefetched " << prefetchQueue_.size() << " batches ("
<< prefetchedBytes_ << " bytes) before blocking on GPU lock.";
+ lockGpu();
+ break;
+ }
+ }
- lockGpu();
+ // GPU lock acquired. Pop one batch and convert to cuDF on GPU.
+ auto batch = std::move(prefetchQueue_.front());
+ prefetchQueue_.pop_front();
+ prefetchedBytes_ -= batch->numBytes();
return makeCudfTable(batch->getRowType(), batch->numRows(),
batch->buffers(), pool_);
}
diff --git a/cpp/velox/utils/GpuBufferBatchResizer.h
b/cpp/velox/utils/GpuBufferBatchResizer.h
index b3ed086629..e48c033ac4 100644
--- a/cpp/velox/utils/GpuBufferBatchResizer.h
+++ b/cpp/velox/utils/GpuBufferBatchResizer.h
@@ -15,7 +15,12 @@
* limitations under the License.
*/
+#pragma once
+
+#include <deque>
+
#include "memory/ColumnarBatchIterator.h"
+#include "memory/GpuBufferColumnarBatch.h"
#include "memory/VeloxColumnarBatch.h"
#include "utils/Exception.h"
#include "velox/common/memory/MemoryPool.h"
@@ -28,6 +33,7 @@ class GpuBufferBatchResizer : public ColumnarBatchIterator {
arrow::MemoryPool* arrowPool,
facebook::velox::memory::MemoryPool* pool,
int32_t minOutputBatchSize,
+ int64_t maxPrefetchSize,
std::unique_ptr<ColumnarBatchIterator> in);
std::shared_ptr<ColumnarBatch> next() override;
@@ -35,10 +41,18 @@ class GpuBufferBatchResizer : public ColumnarBatchIterator {
int64_t spillFixedSize(int64_t size) override;
private:
+ /// Read and compose one batch from the input iterator (CPU-only work).
+ /// Returns nullptr if input is exhausted.
+ std::shared_ptr<GpuBufferColumnarBatch> fetchAndComposeBatch();
+
arrow::MemoryPool* arrowPool_;
facebook::velox::memory::MemoryPool* pool_;
const int32_t minOutputBatchSize_;
+ const int64_t maxPrefetchSize_;
std::unique_ptr<ColumnarBatchIterator> in_;
+
+ std::deque<std::shared_ptr<GpuBufferColumnarBatch>> prefetchQueue_;
+ int64_t prefetchedBytes_ = 0;
};
} // namespace gluten
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index ff437f01bc..2fa3d26235 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -25,6 +25,7 @@ nav_order: 16
| spark.gluten.sql.columnar.backend.velox.cudf.enableValidation
| true | Heuristics you can apply to validate a cuDF/GPU plan
and only offload when the entire stage can be fully and profitably executed on
GPU
[...]
| spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent
| 50 | The initial percent of GPU memory to allocate for
memory resource for one thread.
[...]
| spark.gluten.sql.columnar.backend.velox.cudf.memoryResource
| async | GPU RMM memory resource.
[...]
+| spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes
| 1028MB | Maximum bytes to prefetch in CPU memory during GPU
shuffle read while waiting for the GPU to become available.
[...]
| spark.gluten.sql.columnar.backend.velox.directorySizeGuess
| 32KB | Deprecated, rename to
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize
[...]
| spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled
| false | Disables caching if false. File handle cache should
be disabled if files are mutable, i.e. file content may change while file path
stays the same.
[...]
| spark.gluten.sql.columnar.backend.velox.filePreloadThreshold
| 1MB | Set the file preload threshold for velox file scan,
refer to Velox's file-preload-threshold
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]