This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0a86df3614 [GLUTEN-10107][CORE][VELOX] Introduce
NeedCustomColumnarBatchSerializer trait to make columnarBatchSerializerClass
custom by rss implementation (#10201)
0a86df3614 is described below
commit 0a86df3614aed7a5bf7fd86507874fdc94dfab9e
Author: Terry Wang <[email protected]>
AuthorDate: Tue Aug 19 18:36:35 2025 +0800
[GLUTEN-10107][CORE][VELOX] Introduce NeedCustomColumnarBatchSerializer
trait to make columnarBatchSerializerClass custom by rss implementation (#10201)
---
...celeborn.CelebornColumnarBatchSerializerFactory | 1 +
...VeloxCelebornColumnarBatchSerlizerFactory.scala | 13 ++++++-
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 44 ++++++++++++----------
.../CelebornColumnarBatchSerializerFactory.java | 8 +++-
.../gluten/celeborn/CelebornShuffleManager.java | 36 +++++++++++++++++-
.../gluten/shuffle/SupportsColumnarShuffle.scala | 4 ++
6 files changed, 81 insertions(+), 25 deletions(-)
diff --git
a/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
new file mode 100644
index 0000000000..eb150c4942
--- /dev/null
+++
b/backends-velox/src-celeborn/main/resources/META-INF/services/org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
@@ -0,0 +1 @@
+org.apache.spark.shuffle.VeloxCelebornColumnarBatchSerlizerFactory
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
similarity index 63%
copy from
gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
copy to
backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
index a5b2719a6d..a8f27d7616 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerlizerFactory.scala
@@ -14,6 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.shuffle
+package org.apache.spark.shuffle
-trait SupportsColumnarShuffle
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+
+import
org.apache.spark.shuffle.gluten.celeborn.CelebornColumnarBatchSerializerFactory
+
+class VeloxCelebornColumnarBatchSerlizerFactory extends
CelebornColumnarBatchSerializerFactory {
+ override def backendName(): String = VeloxBackend.BACKEND_NAME
+
+ override def columnarBatchSerializerClass(): String =
+ "org.apache.spark.shuffle.CelebornColumnarBatchSerializer"
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 18e26f8a25..816b6d5a66 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -23,10 +23,11 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.aggregate.{HLLAdapter,
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
import org.apache.gluten.extension.columnar.FallbackTags
+import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.vectorized.{ColumnarBatchSerializer,
ColumnarBatchSerializeResult}
-import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.{ShuffleDependency, SparkEnv, SparkException}
import org.apache.spark.api.python.{ColumnarArrowEvalPythonExec,
PullOutArrowEvalPythonPreProjectHelper}
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.rdd.RDD
@@ -556,6 +557,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
partitioning: Partitioning,
output: Seq[Attribute]): ShuffleWriterType = {
val conf = GlutenConfig.get
+ // todo: remove isUseCelebornShuffleManager here
if (conf.isUseCelebornShuffleManager) {
if (conf.celebornShuffleWriterType ==
ReservedKeys.GLUTEN_SORT_SHUFFLE_WRITER) {
if (conf.useCelebornRssSort) {
@@ -632,25 +634,27 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val deserializeTime = metrics("deserializeTime")
val readBatchNumRows = metrics("avgReadBatchNumRows")
val decompressTime = metrics("decompressTime")
- if (GlutenConfig.get.isUseCelebornShuffleManager) {
- val clazz =
ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
- val constructor =
- clazz.getConstructor(
- classOf[StructType],
- classOf[SQLMetric],
- classOf[SQLMetric],
- classOf[ShuffleWriterType])
- constructor
- .newInstance(schema, readBatchNumRows, numOutputRows,
shuffleWriterType)
- .asInstanceOf[Serializer]
- } else {
- new ColumnarBatchSerializer(
- schema,
- readBatchNumRows,
- numOutputRows,
- deserializeTime,
- decompressTime,
- shuffleWriterType)
+ SparkEnv.get.shuffleManager match {
+ case serializer: NeedCustomColumnarBatchSerializer =>
+ val className = serializer.columnarBatchSerializerClass()
+ val clazz = ClassUtils.getClass(className)
+ val constructor =
+ clazz.getConstructor(
+ classOf[StructType],
+ classOf[SQLMetric],
+ classOf[SQLMetric],
+ classOf[ShuffleWriterType])
+ constructor
+ .newInstance(schema, readBatchNumRows, numOutputRows,
shuffleWriterType)
+ .asInstanceOf[Serializer]
+ case _ =>
+ new ColumnarBatchSerializer(
+ schema,
+ readBatchNumRows,
+ numOutputRows,
+ deserializeTime,
+ decompressTime,
+ shuffleWriterType)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
similarity index 82%
copy from
gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
copy to
gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
index a5b2719a6d..b15f0d6dbf 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornColumnarBatchSerializerFactory.java
@@ -14,6 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.shuffle
+package org.apache.spark.shuffle.gluten.celeborn;
-trait SupportsColumnarShuffle
+public interface CelebornColumnarBatchSerializerFactory {
+ String backendName();
+
+ String columnarBatchSerializerClass();
+}
diff --git
a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index c963e4be20..a04c19d6e2 100644
---
a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++
b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle.gluten.celeborn;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.exception.GlutenException;
+import org.apache.gluten.shuffle.NeedCustomColumnarBatchSerializer;
import org.apache.gluten.shuffle.SupportsColumnarShuffle;
import com.google.common.base.Preconditions;
@@ -42,7 +43,8 @@ import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-public class CelebornShuffleManager implements ShuffleManager,
SupportsColumnarShuffle {
+public class CelebornShuffleManager
+ implements ShuffleManager, SupportsColumnarShuffle,
NeedCustomColumnarBatchSerializer {
private static final Logger logger =
LoggerFactory.getLogger(CelebornShuffleManager.class);
@@ -63,6 +65,8 @@ public class CelebornShuffleManager implements
ShuffleManager, SupportsColumnarS
private static final CelebornShuffleWriterFactory writerFactory;
+ private static final CelebornColumnarBatchSerializerFactory
columnarBatchSerializerFactory;
+
static {
final ServiceLoader<CelebornShuffleWriterFactory> loader =
ServiceLoader.load(CelebornShuffleWriterFactory.class);
@@ -81,6 +85,31 @@ public class CelebornShuffleManager implements
ShuffleManager, SupportsColumnarS
"No Celeborn shuffle writer factory found for backend " +
backendName);
}
writerFactory = factoryMap.get(backendName);
+
+ final ServiceLoader<CelebornColumnarBatchSerializerFactory>
+ celebornColumnarBatchSerializerFactoriesLoader =
+ ServiceLoader.load(CelebornColumnarBatchSerializerFactory.class);
+ final List<CelebornColumnarBatchSerializerFactory>
columnarBatchSerializerFactoryList =
+ Arrays.stream(
+ Iterators.toArray(
+ celebornColumnarBatchSerializerFactoriesLoader.iterator(),
+ CelebornColumnarBatchSerializerFactory.class))
+ .collect(Collectors.toList());
+ // For now, we skip this check since the CH backend does not support this
feature yet.
+ // Preconditions.checkState(
+ // !columnarBatchSerializerFactoryList.isEmpty(),
+ // "No factory found for Celeborn columnar batch serializer");
+ final Map<String, CelebornColumnarBatchSerializerFactory>
columanrBatchSerilizerFactoryMap =
+ columnarBatchSerializerFactoryList.stream()
+
.collect(Collectors.toMap(CelebornColumnarBatchSerializerFactory::backendName,
f -> f));
+
+ // For now, we skip this check since the CH backend does not support this
feature yet.
+ // if (!columanrBatchSerilizerFactoryMap.containsKey(backendName)) {
+ // throw new UnsupportedOperationException(
+ // "No Celeborn columnar batch serializer writer factory found
for backend " +
+ // backendName);
+ // }
+ columnarBatchSerializerFactory =
columanrBatchSerilizerFactoryMap.get(backendName);
}
private final SparkConf conf;
@@ -408,4 +437,9 @@ public class CelebornShuffleManager implements
ShuffleManager, SupportsColumnarS
.getReader(
handle, startMapIndex, endMapIndex, startPartition, endPartition,
context, metrics);
}
+
+ @Override
+ public String columnarBatchSerializerClass() {
+ return columnarBatchSerializerFactory.columnarBatchSerializerClass();
+ }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
index a5b2719a6d..ac13b7e874 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/shuffle/SupportsColumnarShuffle.scala
@@ -17,3 +17,7 @@
package org.apache.gluten.shuffle
trait SupportsColumnarShuffle
+
+trait NeedCustomColumnarBatchSerializer {
+ def columnarBatchSerializerClass(): String
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]