This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 48776fedf perf: reduce GC pressure in protobuf serialization (#3242)
48776fedf is described below

commit 48776fedf4496f6e1ee358d7ecc3ea702f53e873
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jan 22 15:55:24 2026 -0700

    perf: reduce GC pressure in protobuf serialization (#3242)
---
 .../org/apache/comet/parquet/ParquetFilters.scala  | 13 +++++++-----
 .../spark/sql/comet/CometNativeWriteExec.scala     | 23 ++++++++++++----------
 .../org/apache/spark/sql/comet/operators.scala     | 21 +++++++++++---------
 3 files changed, 33 insertions(+), 24 deletions(-)
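
All three files make the same change: instead of streaming each protobuf message into a growable java.io.ByteArrayOutputStream (which reallocates as it fills and copies once more in toByteArray), the message is written into a single byte array preallocated to its exact wire size. A minimal sketch of the pattern, assuming any protobuf-java MessageLite (the helper name serializeToBytes is illustrative, not from the commit):

    import com.google.protobuf.{CodedOutputStream, MessageLite}

    // Illustrative helper, not part of the commit: serialize a protobuf
    // message into one exactly-sized array, with no intermediate buffers.
    def serializeToBytes(msg: MessageLite): Array[Byte] = {
      val size = msg.getSerializedSize               // exact wire size, computed once
      val bytes = new Array[Byte](size)              // single right-sized allocation
      val codedOutput = CodedOutputStream.newInstance(bytes)
      msg.writeTo(codedOutput)
      codedOutput.checkNoSpaceLeft()                 // fails fast if the size was wrong
      bytes
    }

protobuf-java's own MessageLite.toByteArray() follows the same preallocate-and-check approach internally; inlining it here removes the ByteArrayOutputStream churn from these hot serialization paths.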

diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index dbc3e17f8..f8da68d59 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -19,7 +19,6 @@
 
 package org.apache.comet.parquet
 
-import java.io.ByteArrayOutputStream
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
@@ -43,6 +42,8 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr}
 import org.apache.comet.serde.ExprOuterClass
 import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto
@@ -885,10 +886,12 @@ class ParquetFilters(
 
   def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = {
     predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr =>
-      val outputStream = new ByteArrayOutputStream()
-      expr.writeTo(outputStream)
-      outputStream.close()
-      outputStream.toByteArray
+      val size = expr.getSerializedSize
+      val bytes = new Array[Byte](size)
+      val codedOutput = CodedOutputStream.newInstance(bytes)
+      expr.writeTo(codedOutput)
+      codedOutput.checkNoSpaceLeft()
+      bytes
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
index f153a691e..39e7ac6ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
@@ -34,6 +32,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.CometExecIterator
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -75,10 +75,12 @@ case class CometNativeWriteExec(
     sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages")
 
   override def serializedPlanOpt: SerializedPlan = {
-    val outputStream = new ByteArrayOutputStream()
-    nativeOp.writeTo(outputStream)
-    outputStream.close()
-    SerializedPlan(Some(outputStream.toByteArray))
+    val size = nativeOp.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    SerializedPlan(Some(bytes))
   }
 
   override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
@@ -196,10 +198,11 @@ case class CometNativeWriteExec(
 
       val nativeMetrics = CometMetricNode.fromCometPlan(this)
 
-      val outputStream = new ByteArrayOutputStream()
-      modifiedNativeOp.writeTo(outputStream)
-      outputStream.close()
-      val planBytes = outputStream.toByteArray
+      val size = modifiedNativeOp.getSerializedSize
+      val planBytes = new Array[Byte](size)
+      val codedOutput = CodedOutputStream.newInstance(planBytes)
+      modifiedNativeOp.writeTo(codedOutput)
+      codedOutput.checkNoSpaceLeft()
 
       val execIterator = new CometExecIterator(
         CometExec.newIterId,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 0a435e5b7..f4f97b831 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
 import java.util.Locale
 
 import scala.collection.mutable
@@ -50,6 +49,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 import com.google.common.base.Objects
+import com.google.protobuf.CodedOutputStream
 
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
@@ -139,10 +139,11 @@ object CometExec {
       partitionIdx: Int,
       broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
       encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val outputStream = new ByteArrayOutputStream()
-    nativePlan.writeTo(outputStream)
-    outputStream.close()
-    val bytes = outputStream.toByteArray
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
     new CometExecIterator(
       newIterId,
       inputs,
@@ -414,10 +415,12 @@ abstract class CometNativeExec extends CometExec {
   def convertBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
       case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
-        val out = new ByteArrayOutputStream()
-        nativeOp.writeTo(out)
-        out.close()
-        SerializedPlan(Some(out.toByteArray))
+        val size = nativeOp.getSerializedSize
+        val bytes = new Array[Byte](size)
+        val codedOutput = CodedOutputStream.newInstance(bytes)
+        nativeOp.writeTo(codedOutput)
+        codedOutput.checkNoSpaceLeft()
+        SerializedPlan(Some(bytes))
       case other: AnyRef => other
       case null => null
     }
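
As a quick sanity check (hypothetical usage, not part of this commit), bytes produced by the preallocated path round-trip through the generated protobuf parser exactly as the old ByteArrayOutputStream output did:

    import com.google.protobuf.CodedOutputStream
    import org.apache.comet.serde.OperatorOuterClass.Operator

    // Hypothetical round-trip: serialize a minimal Operator into a
    // preallocated array, then parse it back and compare.
    val op = Operator.newBuilder().build()
    val bytes = new Array[Byte](op.getSerializedSize)
    val codedOutput = CodedOutputStream.newInstance(bytes)
    op.writeTo(codedOutput)
    codedOutput.checkNoSpaceLeft()
    assert(Operator.parseFrom(bytes) == op)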


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
