This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d170be26c [CORE] Defer Protobuf serialization of SplitInfos in GlutenPartitions (#10662)
0d170be26c is described below

commit 0d170be26c4153edadf34baa4484f0f8510fa75b
Author: kevinwilfong <[email protected]>
AuthorDate: Fri Sep 12 10:38:40 2025 -0700

    [CORE] Defer Protobuf serialization of SplitInfos in GlutenPartitions (#10662)
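
    GlutenPartition previously carried pre-serialized split bytes
    (splitInfosByteArray) plus a separate locations array, both computed
    eagerly on the driver; it now carries the SplitInfo objects themselves
    and derives both on demand. A minimal sketch of the pattern, distilled
    from the diff below (names match the patched sources; surrounding
    plumbing such as BaseGlutenPartition is elided):

        import org.apache.gluten.substrait.rel.SplitInfo
        import scala.collection.JavaConverters.asScalaBufferConverter

        // The partition holds live SplitInfo objects instead of byte arrays.
        case class GlutenPartition(
            index: Int,
            plan: Array[Byte],
            splitInfos: Array[SplitInfo] = Array.empty[SplitInfo],
            files: Array[String] = Array.empty[String])
          extends BaseGlutenPartition {
          // Preferred locations are computed from the splits when Spark asks
          // for them, instead of being materialized up front.
          override def preferredLocations(): Array[String] =
            splitInfos.flatMap(_.preferredLocations().asScala)
        }

        // Each backend serializes to Protobuf only when a task opens its
        // iterator on the executor:
        val splitInfoByteArray = inputPartition
          .asInstanceOf[GlutenPartition]
          .splitInfos
          .map(splitInfo => splitInfo.toProtobuf.toByteArray)
          .toArray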
---
 .../gluten/backendsapi/clickhouse/CHIteratorApi.scala  | 18 ++++++++----------
 .../gluten/execution/NativeFileScanColumnarRDD.scala   |  4 +++-
 .../org/apache/spark/affinity/MixedAffinitySuite.scala |  6 ++----
 .../gluten/backendsapi/velox/VeloxIteratorApi.scala    |  9 +++++----
 .../org/apache/gluten/substrait/rel/SplitInfo.java     |  4 ++--
 .../gluten/execution/GlutenWholeStageColumnarRDD.scala |  9 ++++++---
 .../apache/spark/softaffinity/SoftAffinitySuite.scala  | 17 ++++-------------
 .../softaffinity/SoftAffinityWithRDDInfoSuite.scala    |  6 +-----
 8 files changed, 31 insertions(+), 42 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 62ca100cc3..8d22bd2b1c 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode}
+import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
 import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -244,26 +244,22 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
     splitInfos.zipWithIndex.map {
       case (splits, index) =>
-        val splitInfosByteArray = splits.zipWithIndex.map {
+        val splitInfos = splits.zipWithIndex.map {
           case (split, i) =>
             split match {
              case filesNode: LocalFilesNode if leaves(i).isInstanceOf[BasicScanExecTransformer] =>
                 setFileSchemaForLocalFiles(
                   filesNode,
                   leaves(i).asInstanceOf[BasicScanExecTransformer])
-                filesNode.toProtobuf.toByteArray
-              case extensionTableNode: ExtensionTableNode =>
-                extensionTableNode.toProtobuf.toByteArray
-              case kafkaSourceNode: StreamKafkaSourceNode =>
-                kafkaSourceNode.toProtobuf.toByteArray
+                filesNode
+              case splitInfo => splitInfo
             }
         }
 
         GlutenPartition(
           index,
           planByteArray,
-          splitInfosByteArray.toArray,
-          locations = splits.flatMap(_.preferredLocations().asScala).toArray
+          splitInfos.toArray
         )
     }
   }
@@ -289,7 +285,9 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       "CH backend only accepts GlutenPartition in GlutenWholeStageColumnarRDD.")
     val splitInfoByteArray = inputPartition
       .asInstanceOf[GlutenPartition]
-      .splitInfosByteArray
+      .splitInfos
+      .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+      .toArray
     val wsPlan = inputPartition.plan
     val materializeInput = false
 
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
index abe8ea0b94..cf0508d6cb 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
@@ -45,7 +45,9 @@ class NativeFileScanColumnarRDD(
 
     val splitInfoByteArray = inputPartition
       .asInstanceOf[GlutenPartition]
-      .splitInfosByteArray
+      .splitInfos
+      .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+      .toArray
 
     val resIter = GlutenTimeMetric.millis(scanTime) {
       _ =>
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
index 26ab84d460..6c95bfc156 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
@@ -16,9 +16,8 @@
  */
 package org.apache.spark.affinity
 
-import org.apache.gluten.execution.{GlutenMergeTreePartition, GlutenPartition, MergeTreePartSplit}
+import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartSplit}
 import org.apache.gluten.softaffinity.AffinityManager
-import org.apache.gluten.substrait.plan.PlanBuilder
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
@@ -68,9 +67,8 @@ class MixedAffinitySuite extends QueryTest with SharedSparkSession {
         StructType(Seq()),
         Map.empty)
     val locations = affinity.getNativeMergeTreePartitionLocations(partition)
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
     assertResult(Set("forced_host_host-0")) {
-      nativePartition.preferredLocations().toSet
+      locations.toSet
     }
   }
 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index ee1889d340..3ce8dbcd94 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanNode
-import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo}
+import org.apache.gluten.substrait.rel.{LocalFilesBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.vectorized._
 
@@ -144,8 +144,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
         GlutenPartition(
           index,
           planByteArray,
-          splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray,
-          splitInfos.flatMap(_.preferredLocations().asScala).toArray
+          splitInfos.toArray
         )
     }
   }
@@ -241,7 +240,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
 
     val splitInfoByteArray = inputPartition
       .asInstanceOf[GlutenPartition]
-      .splitInfosByteArray
+      .splitInfos
+      .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+      .toArray
     val spillDirPath = SparkDirectoryUtil
       .get()
       .namespace("gluten-spill")
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
index 6e2eeff7d8..070a5b5a74 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.substrait.rel;
 
-import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.Message;
 
 import java.io.Serializable;
 import java.util.List;
@@ -30,5 +30,5 @@ public interface SplitInfo extends Serializable {
  /** The preferred locations where the table files returned by this read split can run faster. */
   List<String> preferredLocations();
 
-  MessageOrBuilder toProtobuf();
+  Message toProtobuf();
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index d332e3774a..263d56720c 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
+import org.apache.gluten.substrait.rel.SplitInfo
 
 import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -26,6 +27,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 trait BaseGlutenPartition extends Partition with InputPartition {
   def plan: Array[Byte]
 }
@@ -33,13 +36,13 @@ trait BaseGlutenPartition extends Partition with InputPartition {
 case class GlutenPartition(
     index: Int,
     plan: Array[Byte],
-    splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]],
-    locations: Array[String] = Array.empty[String],
+    splitInfos: Array[SplitInfo] = Array.empty[SplitInfo],
     files: Array[String] =
      Array.empty[String] // touched files, for implementing UDF input_file_name
 ) extends BaseGlutenPartition {
 
-  override def preferredLocations(): Array[String] = locations
+  override def preferredLocations(): Array[String] =
+    splitInfos.flatMap(_.preferredLocations().asScala)
 }
 
 case class FirstZippedPartitionsPartition(
diff --git a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index fced0424d9..f802daccfa 100644
--- a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++ b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -17,10 +17,8 @@
 package org.apache.spark.softaffinity
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.GlutenPartition
 import org.apache.gluten.softaffinity.SoftAffinityManager
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.plan.PlanBuilder
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
@@ -68,9 +66,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
       partition.files.map(_.filePath.toString),
       partition.preferredLocations())
 
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
     assertResult(Set("host-1", "host-2", "host-3")) {
-      nativePartition.preferredLocations().toSet
+      locations.toSet
     }
   }
 
@@ -99,10 +96,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
       partition.files.map(_.filePath.toString),
       partition.preferredLocations())
 
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
-
     assertResult(Set("192.168.22.1", "host-5", "host-2")) {
-      nativePartition.preferredLocations().toSet
+      locations.toSet
     }
   }
 
@@ -131,10 +126,8 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
       partition.files.map(_.filePath.toString),
       partition.preferredLocations())
 
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
-
     assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) {
-      nativePartition.preferredLocations().toSet
+      locations.toSet
     }
   }
 
@@ -163,8 +156,6 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
       partition.files.map(_.filePath.toString),
       partition.preferredLocations())
 
-    val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
-
     val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
       Set("host-1", "host-5", "host-6")
     } else if (scalaVersion.startsWith("2.13")) {
@@ -172,7 +163,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
     }
 
     assertResult(affinityResultSet) {
-      nativePartition.preferredLocations().toSet
+      locations.toSet
     }
   }
 
diff --git a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 1d97d65539..5bfb781e7a 100644
--- a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++ b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -17,10 +17,8 @@
 package org.apache.spark.softaffinity
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.GlutenPartition
 import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.plan.PlanBuilder
 
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler._
@@ -105,10 +103,8 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with SharedSparkSession wit
      // check location (executor 0) of duplicate reading is returned.
       val locations = SoftAffinity.getFilePartitionLocations(filePartition)
 
-      val nativePartition = new GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations)
-
       assertResult(Set("executor_host-0_0")) {
-        nativePartition.preferredLocations().toSet
+        locations.toSet
       }
       softAffinityListener.onStageCompleted(stage1EndEvent)
       // stage 1 completed, check all middle status is cleared.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
