This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d6ed6115 perf: cache serialized query plans to avoid per-partition serialization (#3246)
5d6ed6115 is described below

commit 5d6ed6115b7893aea3b82d42fcb3946cdfb29f8f
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 23 06:30:14 2026 -0700

    perf: cache serialized query plans to avoid per-partition serialization (#3246)
---
 .../apache/spark/sql/comet/CometExecUtils.scala    |  6 ++--
 .../sql/comet/CometTakeOrderedAndProjectExec.scala | 29 ++++++++++-----
 .../org/apache/spark/sql/comet/operators.scala     | 41 +++++++++++++++++++---
 3 files changed, 60 insertions(+), 16 deletions(-)
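
For context, the optimization relies on a basic Spark fact: code inside a
mapPartitions-style closure runs once per partition on the executors, while a
value computed before the call runs once on the driver and ships to every task
via closure serialization. A minimal, self-contained sketch of the pattern
(generic Spark code with a stand-in for the expensive plan serialization; all
names below are illustrative, not Comet's):

    import org.apache.spark.sql.SparkSession

    object SerializeOncePattern {
      // Stand-in for an expensive step such as serializing a protobuf plan.
      def expensiveSerialize(plan: String): Array[Byte] = plan.getBytes("UTF-8")

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[4]").appName("sketch").getOrCreate()
        val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 100)

        // Before: the call sits inside the closure, so it runs 100 times.
        val slow = rdd.mapPartitions { iter =>
          val bytes = expensiveSerialize("limit-plan")
          iter.map(_ + bytes.length)
        }

        // After: serialize once on the driver; the resulting byte array is
        // captured by the closure and shipped to every task along with it.
        val serialized = expensiveSerialize("limit-plan")
        val fast = rdd.mapPartitions(iter => iter.map(_ + serialized.length))

        assert(slow.collect().sameElements(fast.collect()))
        spark.stop()
      }
    }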

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
index fd97fe3fa..a2af60142 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
@@ -53,9 +53,11 @@ object CometExecUtils {
       limit: Int,
       offset: Int = 0): RDD[ColumnarBatch] = {
     val numParts = childPlan.getNumPartitions
+    // Serialize the plan once before mapping to avoid repeated serialization per partition
+    val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
+    val serializedPlan = CometExec.serializeNativePlan(limitOp)
     childPlan.mapPartitionsWithIndexInternal { case (idx, iter) =>
-      val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
-      CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp, numParts, idx)
+      CometExec.getCometIterator(Seq(iter), outputAttribute.length, serializedPlan, numParts, idx)
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 2517c19f2..2abe78317 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -133,12 +133,15 @@ case class CometTakeOrderedAndProjectExec(
           CometExecUtils.getNativeLimitRDD(childRDD, child.output, limit)
         } else {
           val numParts = childRDD.getNumPartitions
+          // Serialize the plan once before mapping to avoid repeated serialization per partition
+          val topK =
+            CometExecUtils
+              .getTopKNativePlan(child.output, sortOrder, child, limit)
+              .get
+          val serializedTopK = CometExec.serializeNativePlan(topK)
+          val numOutputCols = child.output.length
           childRDD.mapPartitionsWithIndexInternal { case (idx, iter) =>
-            val topK =
-              CometExecUtils
-                .getTopKNativePlan(child.output, sortOrder, child, limit)
-                .get
-            CometExec.getCometIterator(Seq(iter), child.output.length, topK, numParts, idx)
+            CometExec.getCometIterator(Seq(iter), numOutputCols, serializedTopK, numParts, idx)
           }
         }
 
@@ -154,11 +157,19 @@ case class CometTakeOrderedAndProjectExec(
         new CometShuffledBatchRDD(dep, readMetrics)
       }
 
+      // Serialize the plan once before mapping to avoid repeated serialization per partition
+      val topKAndProjection = CometExecUtils
+        .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
+        .get
+      val serializedTopKAndProjection = CometExec.serializeNativePlan(topKAndProjection)
+      val finalOutputLength = output.length
       singlePartitionRDD.mapPartitionsInternal { iter =>
-        val topKAndProjection = CometExecUtils
-          .getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
-          .get
-        val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection, 1, 0)
+        val it = CometExec.getCometIterator(
+          Seq(iter),
+          finalOutputLength,
+          serializedTopKAndProjection,
+          1,
+          0)
         setSubqueries(it.id, this)
 
         Option(TaskContext.get()).foreach { context =>
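
A second detail in the hunks above: output.length is copied into locals
(numOutputCols, finalOutputLength) before the closures. Reading a field of the
enclosing operator inside an RDD closure captures the operator itself (this),
so the whole plan node would be serialized into every task; copying the value
into a local first means the closure captures only an Int. A hedged sketch of
the distinction (generic Spark code, hypothetical class):

    import org.apache.spark.rdd.RDD

    class MyOperator(val width: Int) extends Serializable {
      // Reading `width` inside the closure captures `this`, shipping the
      // whole operator to every task.
      def slow(rdd: RDD[Int]): RDD[Int] =
        rdd.mapPartitions(iter => iter.map(_ + width))

      // Copying the field into a local first captures only an Int.
      def fast(rdd: RDD[Int]): RDD[Int] = {
        val w = width
        rdd.mapPartitions(iter => iter.map(_ + w))
      }
    }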
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index f4f97b831..cb7098617 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -113,6 +113,19 @@ object CometExec {
 
   def newIterId: Long = curId.getAndIncrement()
 
+  /**
+   * Serialize a native plan to bytes. Use this method to serialize the plan once before calling
+   * getCometIterator for each partition, avoiding repeated serialization.
+   */
+  def serializeNativePlan(nativePlan: Operator): Array[Byte] = {
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    bytes
+  }
+
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       numOutputCols: Int,
@@ -130,6 +143,28 @@ object CometExec {
       encryptedFilePaths = Seq.empty)
   }
 
+  /**
+   * Create a CometExecIterator with a pre-serialized native plan. Use this overload when
+   * executing the same plan across multiple partitions to avoid serializing the plan repeatedly.
+   */
+  def getCometIterator(
+      inputs: Seq[Iterator[ColumnarBatch]],
+      numOutputCols: Int,
+      serializedPlan: Array[Byte],
+      numParts: Int,
+      partitionIdx: Int): CometExecIterator = {
+    new CometExecIterator(
+      newIterId,
+      inputs,
+      numOutputCols,
+      serializedPlan,
+      CometMetricNode(Map.empty),
+      numParts,
+      partitionIdx,
+      broadcastedHadoopConfForEncryption = None,
+      encryptedFilePaths = Seq.empty)
+  }
+
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       numOutputCols: Int,
@@ -139,11 +174,7 @@ object CometExec {
       partitionIdx: Int,
      broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
       encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val size = nativePlan.getSerializedSize
-    val bytes = new Array[Byte](size)
-    val codedOutput = CodedOutputStream.newInstance(bytes)
-    nativePlan.writeTo(codedOutput)
-    codedOutput.checkNoSpaceLeft()
+    val bytes = serializeNativePlan(nativePlan)
     new CometExecIterator(
       newIterId,
       inputs,
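
The extracted serializeNativePlan helper uses protobuf-java's sized-write
pattern: getSerializedSize computes the exact encoded length, writeTo fills a
preallocated array through a CodedOutputStream, and checkNoSpaceLeft fails
fast if the two ever disagree. The same pattern for any protobuf Message, as
a generic sketch (equivalent in effect to Message.toByteArray):

    import com.google.protobuf.{CodedOutputStream, Message}

    def toBytes(msg: Message): Array[Byte] = {
      // Allocate exactly the encoded size, then write into it.
      val bytes = new Array[Byte](msg.getSerializedSize)
      val out = CodedOutputStream.newInstance(bytes)
      msg.writeTo(out)
      out.checkNoSpaceLeft() // verifies the buffer was filled exactly
      bytes
    }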

