This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 40a8988b04 [GLUTEN-10933][VL] Introduce GPU ShuffleWriterType
kGpuHashShuffle (#10984)
40a8988b04 is described below
commit 40a8988b040e4b06657ead9f54c4a6ff49749167
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Nov 3 16:34:47 2025 +0000
[GLUTEN-10933][VL] Introduce GPU ShuffleWriterType kGpuHashShuffle (#10984)
---
.../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 3 +--
.../shuffle/VeloxCelebornColumnarBatchSerializer.scala | 3 +--
.../gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala | 6 ++----
.../gluten/vectorized/ColumnarBatchSerializer.scala | 12 ++++--------
cpp/core/jni/JniWrapper.cc | 4 +---
cpp/core/shuffle/Options.h | 7 +------
cpp/core/shuffle/ShuffleWriter.cc | 6 ++++++
cpp/velox/compute/VeloxRuntime.cc | 3 +--
cpp/velox/shuffle/GpuShuffleReader.h | 3 +++
cpp/velox/shuffle/VeloxShuffleReader.cc | 15 ++++++---------
cpp/velox/shuffle/VeloxShuffleReader.h | 4 +---
cpp/velox/tests/VeloxShuffleWriterTest.cc | 3 +--
.../apache/gluten/vectorized/ShuffleReaderJniWrapper.java | 3 +--
.../org/apache/gluten/backendsapi/SparkPlanExecApi.scala | 3 +--
.../scala/org/apache/gluten/config/GlutenConfig.scala | 4 ++++
.../scala/org/apache/gluten/config/ReservedKeys.scala | 1 +
.../sql/execution/GPUColumnarShuffleExchangeExec.scala | 15 ++++++++++++++-
17 files changed, 49 insertions(+), 46 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 58b8e6e3f5..dbaccb16c8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -454,8 +454,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
override def createColumnarBatchSerializer(
schema: StructType,
metrics: Map[String, SQLMetric],
- shuffleWriterType: ShuffleWriterType,
- enableCudf: Boolean): Serializer = {
+ shuffleWriterType: ShuffleWriterType): Serializer = {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val dataSize = metrics("dataSize")
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index 529945d9f4..ce32a9b7ad 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -103,8 +103,7 @@ private class CelebornColumnarBatchSerializerInstance(
batchSize,
readerBufferSize,
deserializerBufferSize,
- shuffleWriterType.name,
- false
+ shuffleWriterType.name
)
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 1c65cc778f..20f52997ab 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -633,8 +633,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
override def createColumnarBatchSerializer(
schema: StructType,
metrics: Map[String, SQLMetric],
- shuffleWriterType: ShuffleWriterType,
- enableCudf: Boolean): Serializer = {
+ shuffleWriterType: ShuffleWriterType): Serializer = {
val numOutputRows = metrics("numOutputRows")
val deserializeTime = metrics("deserializeTime")
val readBatchNumRows = metrics("avgReadBatchNumRows")
@@ -659,8 +658,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
numOutputRows,
deserializeTime,
decompressTime,
- shuffleWriterType,
- enableCudf)
+ shuffleWriterType)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 2369cf3642..3b5fce63f8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -51,8 +51,7 @@ class ColumnarBatchSerializer(
numOutputRows: SQLMetric,
deserializeTime: SQLMetric,
decompressTime: SQLMetric,
- shuffleWriterType: ShuffleWriterType,
- enableCudf: Boolean)
+ shuffleWriterType: ShuffleWriterType)
extends Serializer
with Serializable {
@@ -64,8 +63,7 @@ class ColumnarBatchSerializer(
numOutputRows,
deserializeTime,
decompressTime,
- shuffleWriterType,
- enableCudf)
+ shuffleWriterType)
}
override def supportsRelocationOfSerializedObjects: Boolean = true
@@ -77,8 +75,7 @@ private class ColumnarBatchSerializerInstanceImpl(
numOutputRows: SQLMetric,
deserializeTime: SQLMetric,
decompressTime: SQLMetric,
- shuffleWriterType: ShuffleWriterType,
- enableCudf: Boolean)
+ shuffleWriterType: ShuffleWriterType)
extends ColumnarBatchSerializerInstance
with Logging {
@@ -114,8 +111,7 @@ private class ColumnarBatchSerializerInstanceImpl(
batchSize,
readerBufferSize,
deserializerBufferSize,
- shuffleWriterType.name,
- enableCudf)
+ shuffleWriterType.name)
// Close shuffle reader instance as lately as the end of task processing,
// since the native reader could hold a reference to memory pool that
// was used to create all buffers read from shuffle reader. The pool
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f913f90f64..0420f5a6c3 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1083,8 +1083,7 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jint batchSize,
jlong readerBufferSize,
jlong deserializerBufferSize,
- jstring shuffleWriterType,
- jboolean enableCudf) {
+ jstring shuffleWriterType) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
@@ -1096,7 +1095,6 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
options.batchSize = batchSize;
options.readerBufferSize = readerBufferSize;
options.deserializerBufferSize = deserializerBufferSize;
- options.enableCudf = enableCudf;
options.shuffleWriterType =
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 2273322f67..aaf653f59c 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -43,7 +43,7 @@ static constexpr int64_t kDefaultDeserializerBufferSize = 1
<< 20;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
static constexpr bool kDefaultEnableDictionary = false;
-enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
+enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle,
kGpuHashShuffle };
enum class PartitionWriterType { kLocal, kRss };
@@ -62,11 +62,6 @@ struct ShuffleReaderOptions {
// Buffer size when deserializing rows into columnar batches. Only used for
sort-based shuffle.
int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
-
- // When true, convert the buffers to cudf table.
- // Add a lock after reader produces the Vector, the next operator should be
CudfFromVelox.
- // After move the shuffle read operation to gpu, move the lock to start read.
- bool enableCudf = false;
};
struct ShuffleWriterOptions {
diff --git a/cpp/core/shuffle/ShuffleWriter.cc
b/cpp/core/shuffle/ShuffleWriter.cc
index 099794ddb7..71df2b1a7b 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -24,6 +24,7 @@ namespace {
const std::string kHashShuffleName = "hash";
const std::string kSortShuffleName = "sort";
const std::string kRssSortShuffleName = "rss_sort";
+const std::string kGpuHashShuffleName = "gpu_hash";
} // namespace
ShuffleWriterType ShuffleWriter::stringToType(const std::string& typeString) {
@@ -36,6 +37,9 @@ ShuffleWriterType ShuffleWriter::stringToType(const
std::string& typeString) {
if (typeString == kRssSortShuffleName) {
return ShuffleWriterType::kRssSortShuffle;
}
+ if (typeString == kGpuHashShuffleName) {
+ return ShuffleWriterType::kGpuHashShuffle;
+ }
throw GlutenException("Unrecognized shuffle writer type: " + typeString);
}
@@ -47,6 +51,8 @@ std::string ShuffleWriter::typeToString(ShuffleWriterType
type) {
return kSortShuffleName;
case ShuffleWriterType::kRssSortShuffle:
return kRssSortShuffleName;
+ case ShuffleWriterType::kGpuHashShuffle:
+ return kGpuHashShuffleName;
}
GLUTEN_UNREACHABLE();
}
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 5ddf146404..bc858f85c6 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -300,8 +300,7 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
options.readerBufferSize,
options.deserializerBufferSize,
memoryManager(),
- options.shuffleWriterType,
- options.enableCudf);
+ options.shuffleWriterType);
return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
}
diff --git a/cpp/velox/shuffle/GpuShuffleReader.h
b/cpp/velox/shuffle/GpuShuffleReader.h
index 3de5a8228d..4617b0c2b6 100644
--- a/cpp/velox/shuffle/GpuShuffleReader.h
+++ b/cpp/velox/shuffle/GpuShuffleReader.h
@@ -27,6 +27,9 @@
namespace gluten {
+/// Convert the buffers to cudf table.
+/// Add a lock after the reader produces the Vector; release the lock after the
thread processes all the batches.
+/// Once the shuffle read operation is moved to the GPU, move the lock to the start of the read.
class GpuHashShuffleReaderDeserializer final : public ColumnarBatchIterator {
public:
GpuHashShuffleReaderDeserializer(
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index b6c1bf4b42..2dac73224a 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -803,10 +803,8 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
- ShuffleWriterType shuffleWriterType,
- bool enableCudf)
- : enableCudf_(enableCudf),
- schema_(schema),
+ ShuffleWriterType shuffleWriterType)
+ : schema_(schema),
codec_(codec),
veloxCompressionType_(veloxCompressionType),
rowType_(rowType),
@@ -821,10 +819,9 @@
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::createDeserializer(
const std::shared_ptr<StreamReader>& streamReader) {
switch (shuffleWriterType_) {
- case ShuffleWriterType::kHashShuffle:
- #ifdef GLUTEN_ENABLE_GPU
- if (enableCudf_) {
- return std::make_unique<GpuHashShuffleReaderDeserializer>(
+ case ShuffleWriterType::kGpuHashShuffle:
+#ifdef GLUTEN_ENABLE_GPU
+ return std::make_unique<GpuHashShuffleReaderDeserializer>(
streamReader,
schema_,
codec_,
@@ -836,8 +833,8 @@ std::unique_ptr<ColumnarBatchIterator>
VeloxShuffleReaderDeserializerFactory::cr
hasComplexType_,
deserializeTime_,
decompressTime_);
- }
#endif
+ case ShuffleWriterType::kHashShuffle:
return std::make_unique<VeloxHashShuffleReaderDeserializer>(
streamReader,
schema_,
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h
b/cpp/velox/shuffle/VeloxShuffleReader.h
index bdc67d2643..26a1634f4d 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -169,8 +169,7 @@ class VeloxShuffleReaderDeserializerFactory {
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
- ShuffleWriterType shuffleWriterType,
- bool enableCudf);
+ ShuffleWriterType shuffleWriterType);
std::unique_ptr<ColumnarBatchIterator> createDeserializer(const
std::shared_ptr<StreamReader>& streamReader);
@@ -181,7 +180,6 @@ class VeloxShuffleReaderDeserializerFactory {
private:
void initFromSchema();
- const bool enableCudf_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::util::Codec> codec_;
facebook::velox::common::CompressionKind veloxCompressionType_;
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index c2906c67d8..0d62faafee 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -305,8 +305,7 @@ class VeloxShuffleWriterTest : public
::testing::TestWithParam<ShuffleTestParams
kDefaultReadBufferSize,
GetParam().deserializerBufferSize,
getDefaultMemoryManager(),
- GetParam().shuffleWriterType,
- false);
+ GetParam().shuffleWriterType);
const auto reader =
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index a1b497c8bb..6a0f2130d7 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -42,8 +42,7 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
int batchSize,
long readerBufferSize,
long deserializerBufferSize,
- String shuffleWriterType,
- boolean enableCudf);
+ String shuffleWriterType);
public native long read(long shuffleReaderHandle, ShuffleStreamReader
streamReader);
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 0f2cc5b30c..2ee41f0d8d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -398,8 +398,7 @@ trait SparkPlanExecApi {
def createColumnarBatchSerializer(
schema: StructType,
metrics: Map[String, SQLMetric],
- shuffleWriterType: ShuffleWriterType,
- enableCudf: Boolean = false): Serializer
+ shuffleWriterType: ShuffleWriterType): Serializer
/** Create broadcast relation for BroadcastExchangeExec */
def createBroadcastRelation(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 9359a465a1..b57bd59123 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -45,6 +45,10 @@ case object RssSortShuffleWriterType extends
ShuffleWriterType {
override val name: String = ReservedKeys.GLUTEN_RSS_SORT_SHUFFLE_WRITER
}
+case object GpuHashShuffleWriterType extends ShuffleWriterType {
+ override val name: String = ReservedKeys.GLUTEN_GPU_HASH_SHUFFLE_WRITER
+}
+
/*
* Note: Gluten configiguration.md is automatically generated from this code.
* Make sure to run dev/gen-all-config-docs.sh after making changes to this
file.
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
index 6635a8b20a..d609a80e2a 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
@@ -34,4 +34,5 @@ object ReservedKeys {
val GLUTEN_HASH_SHUFFLE_WRITER = "hash"
val GLUTEN_SORT_SHUFFLE_WRITER = "sort"
val GLUTEN_RSS_SORT_SHUFFLE_WRITER = "rss_sort"
+ val GLUTEN_GPU_HASH_SHUFFLE_WRITER = "gpu_hash"
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
index 0c1f9cfe31..79a66b00b7 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GpuHashShuffleWriterType
+import org.apache.gluten.config.HashShuffleWriterType
+import org.apache.gluten.execution.ValidationResult
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.internal.Logging
@@ -34,9 +37,19 @@ case class GPUColumnarShuffleExchangeExec(
advisoryPartitionSize: Option[Long] = None)
extends ColumnarShuffleExchangeExecBase(outputPartitioning, child,
projectOutputAttributes) {
+ override protected def doValidateInternal(): ValidationResult = {
+ val validation = super.doValidateInternal()
+ if (!validation.ok()) {
+ return validation
+ }
+ if (shuffleWriterType != HashShuffleWriterType) {
+ return ValidationResult.failed("Only support hash partitioning")
+ }
+ ValidationResult.succeeded
+ }
// super.stringArgs ++ Iterator(output.map(o =>
s"${o}#${o.dataType.simpleString}"))
val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance
- .createColumnarBatchSerializer(schema, metrics, shuffleWriterType, true)
+ .createColumnarBatchSerializer(schema, metrics, GpuHashShuffleWriterType)
override def nodeName: String = "CudfColumnarExchange"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]