This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0d170be26c [CORE] Defer Protobuf serialization of SplitInfos in
GlutenPartitions (#10662)
0d170be26c is described below
commit 0d170be26c4153edadf34baa4484f0f8510fa75b
Author: kevinwilfong <[email protected]>
AuthorDate: Fri Sep 12 10:38:40 2025 -0700
[CORE] Defer Protobuf serialization of SplitInfos in GlutenPartitions
(#10662)
---
.../gluten/backendsapi/clickhouse/CHIteratorApi.scala | 18 ++++++++----------
.../gluten/execution/NativeFileScanColumnarRDD.scala | 4 +++-
.../org/apache/spark/affinity/MixedAffinitySuite.scala | 6 ++----
.../gluten/backendsapi/velox/VeloxIteratorApi.scala | 9 +++++----
.../org/apache/gluten/substrait/rel/SplitInfo.java | 4 ++--
.../gluten/execution/GlutenWholeStageColumnarRDD.scala | 9 ++++++---
.../apache/spark/softaffinity/SoftAffinitySuite.scala | 17 ++++-------------
.../softaffinity/SoftAffinityWithRDDInfoSuite.scala | 6 +-----
8 files changed, 31 insertions(+), 42 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 62ca100cc3..8d22bd2b1c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -36,7 +36,7 @@ import
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
-import
org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder,
ExtensionTableNode}
+import
org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
@@ -244,26 +244,22 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splits, index) =>
- val splitInfosByteArray = splits.zipWithIndex.map {
+ val splitInfos = splits.zipWithIndex.map {
case (split, i) =>
split match {
case filesNode: LocalFilesNode if
leaves(i).isInstanceOf[BasicScanExecTransformer] =>
setFileSchemaForLocalFiles(
filesNode,
leaves(i).asInstanceOf[BasicScanExecTransformer])
- filesNode.toProtobuf.toByteArray
- case extensionTableNode: ExtensionTableNode =>
- extensionTableNode.toProtobuf.toByteArray
- case kafkaSourceNode: StreamKafkaSourceNode =>
- kafkaSourceNode.toProtobuf.toByteArray
+ filesNode
+ case splitInfo => splitInfo
}
}
GlutenPartition(
index,
planByteArray,
- splitInfosByteArray.toArray,
- locations = splits.flatMap(_.preferredLocations().asScala).toArray
+ splitInfos.toArray
)
}
}
@@ -289,7 +285,9 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
"CH backend only accepts GlutenPartition in
GlutenWholeStageColumnarRDD.")
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
- .splitInfosByteArray
+ .splitInfos
+ .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+ .toArray
val wsPlan = inputPartition.plan
val materializeInput = false
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
index abe8ea0b94..cf0508d6cb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
@@ -45,7 +45,9 @@ class NativeFileScanColumnarRDD(
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
- .splitInfosByteArray
+ .splitInfos
+ .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+ .toArray
val resIter = GlutenTimeMetric.millis(scanTime) {
_ =>
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
index 26ab84d460..6c95bfc156 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
@@ -16,9 +16,8 @@
*/
package org.apache.spark.affinity
-import org.apache.gluten.execution.{GlutenMergeTreePartition, GlutenPartition,
MergeTreePartSplit}
+import org.apache.gluten.execution.{GlutenMergeTreePartition,
MergeTreePartSplit}
import org.apache.gluten.softaffinity.AffinityManager
-import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
@@ -68,9 +67,8 @@ class MixedAffinitySuite extends QueryTest with
SharedSparkSession {
StructType(Seq()),
Map.empty)
val locations = affinity.getNativeMergeTreePartitionLocations(partition)
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
assertResult(Set("forced_host_host-0")) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index ee1889d340..3ce8dbcd94 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.iterator.Iterators
import org.apache.gluten.metrics.{IMetrics, IteratorMetricsJniWrapper}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
-import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode,
SplitInfo}
+import org.apache.gluten.substrait.rel.{LocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.vectorized._
@@ -144,8 +144,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
GlutenPartition(
index,
planByteArray,
-
splitInfos.map(_.asInstanceOf[LocalFilesNode].toProtobuf.toByteArray).toArray,
- splitInfos.flatMap(_.preferredLocations().asScala).toArray
+ splitInfos.toArray
)
}
}
@@ -241,7 +240,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
- .splitInfosByteArray
+ .splitInfos
+ .map(splitInfo => splitInfo.toProtobuf.toByteArray)
+ .toArray
val spillDirPath = SparkDirectoryUtil
.get()
.namespace("gluten-spill")
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
index 6e2eeff7d8..070a5b5a74 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/SplitInfo.java
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.substrait.rel;
-import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.Message;
import java.io.Serializable;
import java.util.List;
@@ -30,5 +30,5 @@ public interface SplitInfo extends Serializable {
/** The preferred locations where the table files returned by this read
split can run faster. */
List<String> preferredLocations();
- MessageOrBuilder toProtobuf();
+ Message toProtobuf();
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
index d332e3774a..263d56720c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics}
+import org.apache.gluten.substrait.rel.SplitInfo
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
@@ -26,6 +27,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
+import scala.collection.JavaConverters.asScalaBufferConverter
+
trait BaseGlutenPartition extends Partition with InputPartition {
def plan: Array[Byte]
}
@@ -33,13 +36,13 @@ trait BaseGlutenPartition extends Partition with
InputPartition {
case class GlutenPartition(
index: Int,
plan: Array[Byte],
- splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]],
- locations: Array[String] = Array.empty[String],
+ splitInfos: Array[SplitInfo] = Array.empty[SplitInfo],
files: Array[String] =
Array.empty[String] // touched files, for implementing UDF
input_file_name
) extends BaseGlutenPartition {
- override def preferredLocations(): Array[String] = locations
+ override def preferredLocations(): Array[String] =
+ splitInfos.flatMap(_.preferredLocations().asScala)
}
case class FirstZippedPartitionsPartition(
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
index fced0424d9..f802daccfa 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala
@@ -17,10 +17,8 @@
package org.apache.spark.softaffinity
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.GlutenPartition
import org.apache.gluten.softaffinity.SoftAffinityManager
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerExecutorAdded,
SparkListenerExecutorRemoved}
@@ -68,9 +66,8 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
partition.files.map(_.filePath.toString),
partition.preferredLocations())
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
assertResult(Set("host-1", "host-2", "host-3")) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
}
@@ -99,10 +96,8 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
partition.files.map(_.filePath.toString),
partition.preferredLocations())
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
-
assertResult(Set("192.168.22.1", "host-5", "host-2")) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
}
@@ -131,10 +126,8 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
partition.files.map(_.filePath.toString),
partition.preferredLocations())
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
-
assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
}
@@ -163,8 +156,6 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
partition.files.map(_.filePath.toString),
partition.preferredLocations())
- val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations
= locations)
-
val affinityResultSet = if (scalaVersion.startsWith("2.12")) {
Set("host-1", "host-5", "host-6")
} else if (scalaVersion.startsWith("2.13")) {
@@ -172,7 +163,7 @@ class SoftAffinitySuite extends QueryTest with
SharedSparkSession with Predicate
}
assertResult(affinityResultSet) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
index 1d97d65539..5bfb781e7a 100644
---
a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinityWithRDDInfoSuite.scala
@@ -17,10 +17,8 @@
package org.apache.spark.softaffinity
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.GlutenPartition
import org.apache.gluten.softaffinity.{AffinityManager, SoftAffinityManager}
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.spark.SparkConf
import org.apache.spark.scheduler._
@@ -105,10 +103,8 @@ class SoftAffinityWithRDDInfoSuite extends QueryTest with
SharedSparkSession wit
// check location (executor 0) of duplicate reading is returned.
val locations = SoftAffinity.getFilePartitionLocations(filePartition)
- val nativePartition = new GlutenPartition(0, PlanBuilder.EMPTY_PLAN,
locations = locations)
-
assertResult(Set("executor_host-0_0")) {
- nativePartition.preferredLocations().toSet
+ locations.toSet
}
softAffinityListener.onStageCompleted(stage1EndEvent)
// stage 1 completed, check all middle status is cleared.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]