This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a86df3614 [GLUTEN-10107][CORE][VELOX] Introduce 
NeedCustomColumnarBatchSerializer trait to make columnarBatchSerializerClass 
custom by rss implementation (#10201)
0a86df3614 is described below

commit 0a86df3614aed7a5bf7fd86507874fdc94dfab9e
Author: Terry Wang <[email protected]>
AuthorDate: Tue Aug 19 18:36:35 2025 +0800

    [GLUTEN-10107][CORE][VELOX] Introduce NeedCustomColumnarBatchSerializer 
trait to make columnarBatchSerializerClass custom by rss implementation (#10201)
---
 ...celeborn.CelebornColumnarBatchSerializerFactory |  1 +
 ...VeloxCelebornColumnarBatchSerlizerFactory.scala | 13 ++++++-
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 44 ++++++++++++----------
 .../CelebornColumnarBatchSerializerFactory.java    |  8 +++-
 .../gluten/celeborn/CelebornShuffleManager.java    | 36 +++++++++++++++++-
 .../gluten/shuffle/SupportsColumnarShuffle.scala   |  4 ++
 6 files changed, 81 insertions(+), 25 deletions(-)

diff --git 
a/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
 
b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
new file mode 100644
index 0000000000..eb150c4942
--- /dev/null
+++ 
b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
@@ -0,0 +1 @@
+org.apache.spark.shuffle.VeloxCelebornColumnarBatchSerlizerFactory
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
similarity index 63%
copy from 
gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
copy to 
backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
index a5b2719a6d..a8f27d7616 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++ 
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
@@ -14,6 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.shuffle
+package org.apache.spark.shuffle
 
-trait SupportsColumnarShuffle
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+
+import 
org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
+
+class VeloxCelebornColumnarBatchSerlizerFactory extends 
CelebornColumnarBatchSerializerFactory {
+  override def backendName(): String = VeloxBackend.BACKEND_NAME
+
+  override def columnarBatchSerializerClass(): String =
+    "org.apache.spark.shuffle.CelebornColumnarBatchSerializer"
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 18e26f8a25..816b6d5a66 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -23,10 +23,11 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.aggregate.{HLLAdapter, 
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
 import org.apache.gluten.extension.columnar.FallbackTags
+import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.vectorized.{ColumnarBatchSerializer, 
ColumnarBatchSerializeResult}
 
-import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException}
 import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec, 
PullOutArrowEvalPythonPreProjectHelper}
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.rdd.RDD
@@ -556,6 +557,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       partitioning: Partitioning,
       output: Seq[Attribute]): ShuffleWriterType = {
     val conf = GlutenConfig.get
+    // todo: remove isUseCelebornShuffleManager here
     if (conf.isUseCelebornShuffleManager) {
       if (conf.celebornShuffleWriterType == 
ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER) {
         if (conf.useCelebornRssSort) {
@@ -632,25 +634,27 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     val deserializeTime = metrics("deserializeTime")
     val readBatchNumRows = metrics("avgReadBatchNumRows")
     val decompressTime = metrics("decompressTime")
-    if (GlutenConfig.get.isUseCelebornShuffleManager) {
-      val clazz = 
ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
-      val constructor =
-        clazz.getConstructor(
-          classOf[StructType],
-          classOf[SQLMetric],
-          classOf[SQLMetric],
-          classOf[ShuffleWriterType])
-      constructor
-        .newInstance(schema, readBatchNumRows, numOutputRows, 
shuffleWriterType)
-        .asInstanceOf[Serializer]
-    } else {
-      new ColumnarBatchSerializer(
-        schema,
-        readBatchNumRows,
-        numOutputRows,
-        deserializeTime,
-        decompressTime,
-        shuffleWriterType)
+    SparkEnv.get.shuffleManager match {
+      case serializer: NeedCustomColumnarBatchSerializer =>
+        val className = serializer.columnarBatchSerializerClass()
+        val clazz = ClassUtils.getClass(className)
+        val constructor =
+          clazz.getConstructor(
+            classOf[StructType],
+            classOf[SQLMetric],
+            classOf[SQLMetric],
+            classOf[ShuffleWriterType])
+        constructor
+          .newInstance(schema, readBatchNumRows, numOutputRows, 
shuffleWriterType)
+          .asInstanceOf[Serializer]
+      case _ =>
+        new ColumnarBatchSerializer(
+          schema,
+          readBatchNumRows,
+          numOutputRows,
+          deserializeTime,
+          decompressTime,
+          shuffleWriterType)
     }
   }
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
 
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
similarity index 82%
copy from 
gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
copy to 
gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
index a5b2719a6d..b15f0d6dbf 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++ 
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
@@ -14,6 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.shuffle
+package org.apache.spark.shuffle.gluten.celeborn;
 
-trait SupportsColumnarShuffle
+public interface CelebornColumnarBatchSerializerFactory {
+  String backendName();
+
+  String columnarBatchSerializerClass();
+}
diff --git 
a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
 
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index c963e4be20..a04c19d6e2 100644
--- 
a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ 
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle.gluten.celeborn;
 import org.apache.gluten.backendsapi.BackendsApiManager;
 import org.apache.gluten.config.GlutenConfig;
 import org.apache.gluten.exception.GlutenException;
+import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer;
 import org.apache.gluten.shuffle.SupportsColumnarShuffle;
 
 import com.google.common.base.Preconditions;
@@ -42,7 +43,8 @@ import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-public class CelebornShuffleManager implements ShuffleManager, 
SupportsColumnarShuffle {
+public class CelebornShuffleManager
+    implements ShuffleManager, SupportsColumnarShuffle, 
NeedCustomColumnarBatchSerializer {
 
   private static final Logger logger = 
LoggerFactory.getLogger(CelebornShuffleManager.class);
 
@@ -63,6 +65,8 @@ public class CelebornShuffleManager implements 
ShuffleManager, SupportsColumnarS
 
   private static final CelebornShuffleWriterFactory writerFactory;
 
+  private static final CelebornColumnarBatchSerializerFactory 
columnarBatchSerializerFactory;
+
   static {
     final ServiceLoader<CelebornShuffleWriterFactory> loader =
         ServiceLoader.load(CelebornShuffleWriterFactory.class);
@@ -81,6 +85,31 @@ public class CelebornShuffleManager implements 
ShuffleManager, SupportsColumnarS
           "No Celeborn shuffle writer factory found for backend " + 
backendName);
     }
     writerFactory = factoryMap.get(backendName);
+
+    final ServiceLoader<CelebornColumnarBatchSerializerFactory>
+        celebornColumnarBatchSerializerFactoriesLoader =
+            ServiceLoader.load(CelebornColumnarBatchSerializerFactory.class);
+    final List<CelebornColumnarBatchSerializerFactory> 
columnarBatchSerializerFactoryList =
+        Arrays.stream(
+                Iterators.toArray(
+                    celebornColumnarBatchSerializerFactoriesLoader.iterator(),
+                    CelebornColumnarBatchSerializerFactory.class))
+            .collect(Collectors.toList());
+    //   for now, we ignore check since CH backend has not support this 
feature yet.
+    //    Preconditions.checkState(
+    //        !columnarBatchSerializerFactoryList.isEmpty(),
+    //        "No factory found for Celeborn columnar batch serializer");
+    final Map<String, CelebornColumnarBatchSerializerFactory> 
columanrBatchSerilizerFactoryMap =
+        columnarBatchSerializerFactoryList.stream()
+            
.collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName, 
f -> f));
+
+    //   for now, we ignore check since CH backend has not support this 
feature yet.
+    //    if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) {
+    //      throw new UnsupportedOperationException(
+    //          "No Celeborn columnar batch serializer writer factory found 
for backend " +
+    // backendName);
+    //    }
+    columnarBatchSerializerFactory = 
columanrBatchSerilizerFactoryMap.get(backendName);
   }
 
   private final SparkConf conf;
@@ -408,4 +437,9 @@ public class CelebornShuffleManager implements 
ShuffleManager, SupportsColumnarS
         .getReader(
             handle, startMapIndex, endMapIndex, startPartition, endPartition, 
context, metrics);
   }
+
+  @Override
+  public String columnarBatchSerializerClass() {
+    return columnarBatchSerializerFactory.columnarBatchSerializerClass();
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
index a5b2719a6d..ac13b7e874 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
@@ -17,3 +17,7 @@
 package org.apache.gluten.shuffle
 
 trait SupportsColumnarShuffle
+
+trait NeedCustomColumnarBatchSerializer {
+  def columnarBatchSerializerClass(): String
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to