This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 48776fedf perf: reduce GC pressure in protobuf serialization (#3242)
48776fedf is described below
commit 48776fedf4496f6e1ee358d7ecc3ea702f53e873
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jan 22 15:55:24 2026 -0700
perf: reduce GC pressure in protobuf serialization (#3242)
---
.../org/apache/comet/parquet/ParquetFilters.scala | 13 +++++++-----
.../spark/sql/comet/CometNativeWriteExec.scala | 23 ++++++++++++----------
.../org/apache/spark/sql/comet/operators.scala | 21 +++++++++++---------
3 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
index dbc3e17f8..f8da68d59 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala
@@ -19,7 +19,6 @@
package org.apache.comet.parquet
-import java.io.ByteArrayOutputStream
import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Timestamp}
@@ -43,6 +42,8 @@ import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
+import com.google.protobuf.CodedOutputStream
+
import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr}
import org.apache.comet.serde.ExprOuterClass
import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto
@@ -885,10 +886,12 @@ class ParquetFilters(
def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = {
predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr =>
- val outputStream = new ByteArrayOutputStream()
- expr.writeTo(outputStream)
- outputStream.close()
- outputStream.toByteArray
+ val size = expr.getSerializedSize
+ val bytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(bytes)
+ expr.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft()
+ bytes
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
index f153a691e..39e7ac6ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala
@@ -19,8 +19,6 @@
package org.apache.spark.sql.comet
-import java.io.ByteArrayOutputStream
-
import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
@@ -34,6 +32,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
+import com.google.protobuf.CodedOutputStream
+
import org.apache.comet.CometExecIterator
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -75,10 +75,12 @@ case class CometNativeWriteExec(
sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages")
override def serializedPlanOpt: SerializedPlan = {
- val outputStream = new ByteArrayOutputStream()
- nativeOp.writeTo(outputStream)
- outputStream.close()
- SerializedPlan(Some(outputStream.toByteArray))
+ val size = nativeOp.getSerializedSize
+ val bytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(bytes)
+ nativeOp.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft()
+ SerializedPlan(Some(bytes))
}
override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
@@ -196,10 +198,11 @@ case class CometNativeWriteExec(
val nativeMetrics = CometMetricNode.fromCometPlan(this)
- val outputStream = new ByteArrayOutputStream()
- modifiedNativeOp.writeTo(outputStream)
- outputStream.close()
- val planBytes = outputStream.toByteArray
+ val size = modifiedNativeOp.getSerializedSize
+ val planBytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(planBytes)
+ modifiedNativeOp.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft()
val execIterator = new CometExecIterator(
CometExec.newIterId,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 0a435e5b7..f4f97b831 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -19,7 +19,6 @@
package org.apache.spark.sql.comet
-import java.io.ByteArrayOutputStream
import java.util.Locale
import scala.collection.mutable
@@ -50,6 +49,7 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.io.ChunkedByteBuffer
import com.google.common.base.Objects
+import com.google.protobuf.CodedOutputStream
import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
@@ -139,10 +139,11 @@ object CometExec {
partitionIdx: Int,
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
encryptedFilePaths: Seq[String]): CometExecIterator = {
- val outputStream = new ByteArrayOutputStream()
- nativePlan.writeTo(outputStream)
- outputStream.close()
- val bytes = outputStream.toByteArray
+ val size = nativePlan.getSerializedSize
+ val bytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(bytes)
+ nativePlan.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft()
new CometExecIterator(
newIterId,
inputs,
@@ -414,10 +415,12 @@ abstract class CometNativeExec extends CometExec {
def convertBlock(): CometNativeExec = {
def transform(arg: Any): AnyRef = arg match {
case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
- val out = new ByteArrayOutputStream()
- nativeOp.writeTo(out)
- out.close()
- SerializedPlan(Some(out.toByteArray))
+ val size = nativeOp.getSerializedSize
+ val bytes = new Array[Byte](size)
+ val codedOutput = CodedOutputStream.newInstance(bytes)
+ nativeOp.writeTo(codedOutput)
+ codedOutput.checkNoSpaceLeft()
+ SerializedPlan(Some(bytes))
case other: AnyRef => other
case null => null
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]