This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c311d16c perf: Mark more operators as FFI safe to avoid deep copies (#3765)
4c311d16c is described below

commit 4c311d16c85b4d4e6157c31d72b093ee993e79cb
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 23 07:21:11 2026 -0700

    perf: Mark more operators as FFI safe to avoid deep copies (#3765)
    
    * mark more operators as ffi safe
    
    * convert CometLocalTableScanExec to extend CometSink
    
    * scalastyle
---
 .../apache/comet/serde/operator/CometSink.scala    |  4 +---
 .../sql/comet/CometBroadcastExchangeExec.scala     |  7 -------
 .../spark/sql/comet/CometLocalTableScanExec.scala  | 23 +++++-----------------
 .../spark/sql/comet/CometSparkToColumnarExec.scala |  4 ++++
 4 files changed, 10 insertions(+), 28 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
index 71faab2a4..845803d13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
@@ -41,7 +41,7 @@ import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataTy
 abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
 
   /** Whether the data produced by the Comet operator is FFI safe */
-  def isFfiSafe: Boolean = false
+  def isFfiSafe: Boolean = true
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None
 
@@ -90,8 +90,6 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
 
 object CometExchangeSink extends CometSink[SparkPlan] {
 
-  override def isFfiSafe: Boolean = true
-
   override def convert(
       op: SparkPlan,
       builder: Operator.Builder,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 4a323e575..8012b18b2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -279,13 +279,6 @@ case class CometBroadcastExchangeExec(
 
 object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec] {
 
-  /**
-   * Exchange data is FFI safe because there is no use of mutable buffers involved.
-   *
-   * Source of broadcast exchange batches is ArrowStreamReader.
-   */
-  override def isFfiSafe: Boolean = true
-
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
     CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
index b804fe347..68a2ebf8e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import scala.jdk.CollectionConverters._
-
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,9 +32,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import com.google.common.base.Objects
 
 import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+import org.apache.comet.serde.operator.CometSink
 
 case class CometLocalTableScanExec(
     originalPlan: LocalTableScanExec,
@@ -106,24 +103,14 @@ case class CometLocalTableScanExec(
   override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
 }
 
-object CometLocalTableScanExec extends CometOperatorSerde[LocalTableScanExec] {
+object CometLocalTableScanExec extends CometSink[LocalTableScanExec] {
+
+  // uses CometArrowConverters, which re-uses arrays
+  override def isFfiSafe: Boolean = false
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
     CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
 
-  override def convert(
-      op: LocalTableScanExec,
-      builder: Operator.Builder,
-      childOp: Operator*): Option[Operator] = {
-    val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
-    val scanBuilder = OperatorOuterClass.Scan
-      .newBuilder()
-      .setSource(op.getClass.getSimpleName)
-      .addAllFields(scanTypes.asJava)
-      .setArrowFfiSafe(false)
-    Some(builder.setScan(scanBuilder).build())
-  }
-
   override def createExec(nativeOp: Operator, op: LocalTableScanExec): CometNativeExec = {
     CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output))
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index a8a61e7a7..8447c9d04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -139,6 +139,10 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 }
 
 object CometSparkToColumnarExec extends CometSink[SparkPlan] with DataTypeSupport {
+
+  // uses CometArrowConverters, which re-uses arrays
+  override def isFfiSafe: Boolean = false
+
   override def createExec(
       nativeOp: OperatorOuterClass.Operator,
       op: SparkPlan): CometNativeExec = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to