This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 40a8988b04 [GLUTEN-10933][VL] Introduce GPU ShuffleWriterType 
kGpuHashShuffle (#10984)
40a8988b04 is described below

commit 40a8988b040e4b06657ead9f54c4a6ff49749167
Author: Jin Chengcheng <[email protected]>
AuthorDate: Mon Nov 3 16:34:47 2025 +0000

    [GLUTEN-10933][VL] Introduce GPU ShuffleWriterType kGpuHashShuffle (#10984)
---
 .../backendsapi/clickhouse/CHSparkPlanExecApi.scala       |  3 +--
 .../shuffle/VeloxCelebornColumnarBatchSerializer.scala    |  3 +--
 .../gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala  |  6 ++----
 .../gluten/vectorized/ColumnarBatchSerializer.scala       | 12 ++++--------
 cpp/core/jni/JniWrapper.cc                                |  4 +---
 cpp/core/shuffle/Options.h                                |  7 +------
 cpp/core/shuffle/ShuffleWriter.cc                         |  6 ++++++
 cpp/velox/compute/VeloxRuntime.cc                         |  3 +--
 cpp/velox/shuffle/GpuShuffleReader.h                      |  3 +++
 cpp/velox/shuffle/VeloxShuffleReader.cc                   | 15 ++++++---------
 cpp/velox/shuffle/VeloxShuffleReader.h                    |  4 +---
 cpp/velox/tests/VeloxShuffleWriterTest.cc                 |  3 +--
 .../apache/gluten/vectorized/ShuffleReaderJniWrapper.java |  3 +--
 .../org/apache/gluten/backendsapi/SparkPlanExecApi.scala  |  3 +--
 .../scala/org/apache/gluten/config/GlutenConfig.scala     |  4 ++++
 .../scala/org/apache/gluten/config/ReservedKeys.scala     |  1 +
 .../sql/execution/GPUColumnarShuffleExchangeExec.scala    | 15 ++++++++++++++-
 17 files changed, 49 insertions(+), 46 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 58b8e6e3f5..dbaccb16c8 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -454,8 +454,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
   override def createColumnarBatchSerializer(
       schema: StructType,
       metrics: Map[String, SQLMetric],
-      shuffleWriterType: ShuffleWriterType,
-      enableCudf: Boolean): Serializer = {
+      shuffleWriterType: ShuffleWriterType): Serializer = {
     val readBatchNumRows = metrics("avgReadBatchNumRows")
     val numOutputRows = metrics("numOutputRows")
     val dataSize = metrics("dataSize")
diff --git 
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index 529945d9f4..ce32a9b7ad 100644
--- 
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -103,8 +103,7 @@ private class CelebornColumnarBatchSerializerInstance(
         batchSize,
         readerBufferSize,
         deserializerBufferSize,
-        shuffleWriterType.name,
-        false
+        shuffleWriterType.name
       )
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 1c65cc778f..20f52997ab 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -633,8 +633,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
   override def createColumnarBatchSerializer(
       schema: StructType,
       metrics: Map[String, SQLMetric],
-      shuffleWriterType: ShuffleWriterType,
-      enableCudf: Boolean): Serializer = {
+      shuffleWriterType: ShuffleWriterType): Serializer = {
     val numOutputRows = metrics("numOutputRows")
     val deserializeTime = metrics("deserializeTime")
     val readBatchNumRows = metrics("avgReadBatchNumRows")
@@ -659,8 +658,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
           numOutputRows,
           deserializeTime,
           decompressTime,
-          shuffleWriterType,
-          enableCudf)
+          shuffleWriterType)
     }
   }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 2369cf3642..3b5fce63f8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -51,8 +51,7 @@ class ColumnarBatchSerializer(
     numOutputRows: SQLMetric,
     deserializeTime: SQLMetric,
     decompressTime: SQLMetric,
-    shuffleWriterType: ShuffleWriterType,
-    enableCudf: Boolean)
+    shuffleWriterType: ShuffleWriterType)
   extends Serializer
   with Serializable {
 
@@ -64,8 +63,7 @@ class ColumnarBatchSerializer(
       numOutputRows,
       deserializeTime,
       decompressTime,
-      shuffleWriterType,
-      enableCudf)
+      shuffleWriterType)
   }
 
   override def supportsRelocationOfSerializedObjects: Boolean = true
@@ -77,8 +75,7 @@ private class ColumnarBatchSerializerInstanceImpl(
     numOutputRows: SQLMetric,
     deserializeTime: SQLMetric,
     decompressTime: SQLMetric,
-    shuffleWriterType: ShuffleWriterType,
-    enableCudf: Boolean)
+    shuffleWriterType: ShuffleWriterType)
   extends ColumnarBatchSerializerInstance
   with Logging {
 
@@ -114,8 +111,7 @@ private class ColumnarBatchSerializerInstanceImpl(
       batchSize,
       readerBufferSize,
       deserializerBufferSize,
-      shuffleWriterType.name,
-      enableCudf)
+      shuffleWriterType.name)
     // Close shuffle reader instance as lately as the end of task processing,
     // since the native reader could hold a reference to memory pool that
     // was used to create all buffers read from shuffle reader. The pool
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f913f90f64..0420f5a6c3 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1083,8 +1083,7 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
     jint batchSize,
     jlong readerBufferSize,
     jlong deserializerBufferSize,
-    jstring shuffleWriterType,
-    jboolean enableCudf) {
+    jstring shuffleWriterType) {
   JNI_METHOD_START
   auto ctx = getRuntime(env, wrapper);
 
@@ -1096,7 +1095,6 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
   options.batchSize = batchSize;
   options.readerBufferSize = readerBufferSize;
   options.deserializerBufferSize = deserializerBufferSize;
-  options.enableCudf = enableCudf;
 
   options.shuffleWriterType = 
ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
   std::shared_ptr<arrow::Schema> schema =
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 2273322f67..aaf653f59c 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -43,7 +43,7 @@ static constexpr int64_t kDefaultDeserializerBufferSize = 1 
<< 20;
 static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
 static constexpr bool kDefaultEnableDictionary = false;
 
-enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle };
+enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle, 
kGpuHashShuffle };
 
 enum class PartitionWriterType { kLocal, kRss };
 
@@ -62,11 +62,6 @@ struct ShuffleReaderOptions {
 
   // Buffer size when deserializing rows into columnar batches. Only used for 
sort-based shuffle.
   int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;
-
-  // When true, convert the buffers to cudf table.
-  // Add a lock after reader produces the Vector, the next operator should be 
CudfFromVelox.
-  // After move the shuffle read operation to gpu, move the lock to start read.
-  bool enableCudf = false;
 };
 
 struct ShuffleWriterOptions {
diff --git a/cpp/core/shuffle/ShuffleWriter.cc 
b/cpp/core/shuffle/ShuffleWriter.cc
index 099794ddb7..71df2b1a7b 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -24,6 +24,7 @@ namespace {
 const std::string kHashShuffleName = "hash";
 const std::string kSortShuffleName = "sort";
 const std::string kRssSortShuffleName = "rss_sort";
+const std::string kGpuHashShuffleName = "gpu_hash";
 } // namespace
 
 ShuffleWriterType ShuffleWriter::stringToType(const std::string& typeString) {
@@ -36,6 +37,9 @@ ShuffleWriterType ShuffleWriter::stringToType(const 
std::string& typeString) {
   if (typeString == kRssSortShuffleName) {
     return ShuffleWriterType::kRssSortShuffle;
   }
+  if (typeString == kGpuHashShuffleName) {
+    return ShuffleWriterType::kGpuHashShuffle;
+  }
   throw GlutenException("Unrecognized shuffle writer type: " + typeString);
 }
 
@@ -47,6 +51,8 @@ std::string ShuffleWriter::typeToString(ShuffleWriterType 
type) {
       return kSortShuffleName;
     case ShuffleWriterType::kRssSortShuffle:
       return kRssSortShuffleName;
+    case ShuffleWriterType::kGpuHashShuffle:
+      return kGpuHashShuffleName;
   }
   GLUTEN_UNREACHABLE();
 }
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 5ddf146404..bc858f85c6 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -300,8 +300,7 @@ std::shared_ptr<ShuffleReader> 
VeloxRuntime::createShuffleReader(
       options.readerBufferSize,
       options.deserializerBufferSize,
       memoryManager(),
-      options.shuffleWriterType,
-      options.enableCudf);
+      options.shuffleWriterType);
 
   return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
 }
diff --git a/cpp/velox/shuffle/GpuShuffleReader.h 
b/cpp/velox/shuffle/GpuShuffleReader.h
index 3de5a8228d..4617b0c2b6 100644
--- a/cpp/velox/shuffle/GpuShuffleReader.h
+++ b/cpp/velox/shuffle/GpuShuffleReader.h
@@ -27,6 +27,9 @@
 
 namespace gluten {
 
+/// Convert the buffers to cudf table.
+/// Add a lock after reader produces the Vector, relase the lock after the 
thread processes all the batches.
+/// After move the shuffle read operation to gpu, move the lock to start read.
 class GpuHashShuffleReaderDeserializer final : public ColumnarBatchIterator {
  public:
   GpuHashShuffleReaderDeserializer(
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index b6c1bf4b42..2dac73224a 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -803,10 +803,8 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
     int64_t readerBufferSize,
     int64_t deserializerBufferSize,
     VeloxMemoryManager* memoryManager,
-    ShuffleWriterType shuffleWriterType,
-    bool enableCudf)
-    : enableCudf_(enableCudf),
-      schema_(schema),
+    ShuffleWriterType shuffleWriterType)
+    : schema_(schema),
       codec_(codec),
       veloxCompressionType_(veloxCompressionType),
       rowType_(rowType),
@@ -821,10 +819,9 @@ 
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
 std::unique_ptr<ColumnarBatchIterator> 
VeloxShuffleReaderDeserializerFactory::createDeserializer(
     const std::shared_ptr<StreamReader>& streamReader) {
   switch (shuffleWriterType_) {
-    case ShuffleWriterType::kHashShuffle:
- #ifdef GLUTEN_ENABLE_GPU       
-      if (enableCudf_) {
-        return std::make_unique<GpuHashShuffleReaderDeserializer>(
+    case ShuffleWriterType::kGpuHashShuffle:
+#ifdef GLUTEN_ENABLE_GPU
+      return std::make_unique<GpuHashShuffleReaderDeserializer>(
           streamReader,
           schema_,
           codec_,
@@ -836,8 +833,8 @@ std::unique_ptr<ColumnarBatchIterator> 
VeloxShuffleReaderDeserializerFactory::cr
           hasComplexType_,
           deserializeTime_,
           decompressTime_);
-      }
 #endif
+    case ShuffleWriterType::kHashShuffle:
       return std::make_unique<VeloxHashShuffleReaderDeserializer>(
           streamReader,
           schema_,
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h 
b/cpp/velox/shuffle/VeloxShuffleReader.h
index bdc67d2643..26a1634f4d 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -169,8 +169,7 @@ class VeloxShuffleReaderDeserializerFactory {
       int64_t readerBufferSize,
       int64_t deserializerBufferSize,
       VeloxMemoryManager* memoryManager,
-      ShuffleWriterType shuffleWriterType,
-      bool enableCudf);
+      ShuffleWriterType shuffleWriterType);
 
   std::unique_ptr<ColumnarBatchIterator> createDeserializer(const 
std::shared_ptr<StreamReader>& streamReader);
 
@@ -181,7 +180,6 @@ class VeloxShuffleReaderDeserializerFactory {
  private:
   void initFromSchema();
 
-  const bool enableCudf_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
   facebook::velox::common::CompressionKind veloxCompressionType_;
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc 
b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index c2906c67d8..0d62faafee 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -305,8 +305,7 @@ class VeloxShuffleWriterTest : public 
::testing::TestWithParam<ShuffleTestParams
         kDefaultReadBufferSize,
         GetParam().deserializerBufferSize,
         getDefaultMemoryManager(),
-        GetParam().shuffleWriterType,
-        false);
+        GetParam().shuffleWriterType);
 
     const auto reader = 
std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
 
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
index a1b497c8bb..6a0f2130d7 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java
@@ -42,8 +42,7 @@ public class ShuffleReaderJniWrapper implements RuntimeAware {
       int batchSize,
       long readerBufferSize,
       long deserializerBufferSize,
-      String shuffleWriterType,
-      boolean enableCudf);
+      String shuffleWriterType);
 
   public native long read(long shuffleReaderHandle, ShuffleStreamReader 
streamReader);
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 0f2cc5b30c..2ee41f0d8d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -398,8 +398,7 @@ trait SparkPlanExecApi {
   def createColumnarBatchSerializer(
       schema: StructType,
       metrics: Map[String, SQLMetric],
-      shuffleWriterType: ShuffleWriterType,
-      enableCudf: Boolean = false): Serializer
+      shuffleWriterType: ShuffleWriterType): Serializer
 
   /** Create broadcast relation for BroadcastExchangeExec */
   def createBroadcastRelation(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 9359a465a1..b57bd59123 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -45,6 +45,10 @@ case object RssSortShuffleWriterType extends 
ShuffleWriterType {
   override val name: String = ReservedKeys.GLUTEN_RSS_SORT_SHUFFLE_WRITER
 }
 
+case object GpuHashShuffleWriterType extends ShuffleWriterType {
+  override val name: String = ReservedKeys.GLUTEN_GPU_HASH_SHUFFLE_WRITER
+}
+
 /*
  * Note: Gluten configiguration.md is automatically generated from this code.
  * Make sure to run dev/gen-all-config-docs.sh after making changes to this 
file.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
index 6635a8b20a..d609a80e2a 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/ReservedKeys.scala
@@ -34,4 +34,5 @@ object ReservedKeys {
   val GLUTEN_HASH_SHUFFLE_WRITER = "hash"
   val GLUTEN_SORT_SHUFFLE_WRITER = "sort"
   val GLUTEN_RSS_SORT_SHUFFLE_WRITER = "rss_sort"
+  val GLUTEN_GPU_HASH_SHUFFLE_WRITER = "gpu_hash"
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
index 0c1f9cfe31..79a66b00b7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GPUColumnarShuffleExchangeExec.scala
@@ -17,6 +17,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GpuHashShuffleWriterType
+import org.apache.gluten.config.HashShuffleWriterType
+import org.apache.gluten.execution.ValidationResult
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.internal.Logging
@@ -34,9 +37,19 @@ case class GPUColumnarShuffleExchangeExec(
     advisoryPartitionSize: Option[Long] = None)
   extends ColumnarShuffleExchangeExecBase(outputPartitioning, child, 
projectOutputAttributes) {
 
+  override protected def doValidateInternal(): ValidationResult = {
+    val validation = super.doValidateInternal()
+    if (!validation.ok()) {
+      return validation
+    }
+    if (shuffleWriterType != HashShuffleWriterType) {
+      return ValidationResult.failed("Only support hash partitioning")
+    }
+    ValidationResult.succeeded
+  }
   // super.stringArgs ++ Iterator(output.map(o => 
s"${o}#${o.dataType.simpleString}"))
   val serializer: Serializer = BackendsApiManager.getSparkPlanExecApiInstance
-    .createColumnarBatchSerializer(schema, metrics, shuffleWriterType, true)
+    .createColumnarBatchSerializer(schema, metrics, GpuHashShuffleWriterType)
 
   override def nodeName: String = "CudfColumnarExchange"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to