This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4c311d16c perf: Mark more operators as FFI safe to avoid deep copies (#3765)
4c311d16c is described below
commit 4c311d16c85b4d4e6157c31d72b093ee993e79cb
Author: Andy Grove <[email protected]>
AuthorDate: Mon Mar 23 07:21:11 2026 -0700
perf: Mark more operators as FFI safe to avoid deep copies (#3765)
* mark more operators as ffi safe
* convert CometLocalTableScanExec to extend CometSink
* scalastyle
---
.../apache/comet/serde/operator/CometSink.scala | 4 +---
.../sql/comet/CometBroadcastExchangeExec.scala | 7 -------
.../spark/sql/comet/CometLocalTableScanExec.scala | 23 +++++-----------------
.../spark/sql/comet/CometSparkToColumnarExec.scala | 4 ++++
4 files changed, 10 insertions(+), 28 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
index 71faab2a4..845803d13 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala
@@ -41,7 +41,7 @@ import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataTy
abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
/** Whether the data produced by the Comet operator is FFI safe */
- def isFfiSafe: Boolean = false
+ def isFfiSafe: Boolean = true
override def enabledConfig: Option[ConfigEntry[Boolean]] = None
@@ -90,8 +90,6 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
object CometExchangeSink extends CometSink[SparkPlan] {
- override def isFfiSafe: Boolean = true
-
override def convert(
op: SparkPlan,
builder: Operator.Builder,
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 4a323e575..8012b18b2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -279,13 +279,6 @@ case class CometBroadcastExchangeExec(
object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec] {
- /**
- * Exchange data is FFI safe because there is no use of mutable buffers
involved.
- *
- * Source of broadcast exchange batches is ArrowStreamReader.
- */
- override def isFfiSafe: Boolean = true
-
override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
index b804fe347..68a2ebf8e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -19,8 +19,6 @@
package org.apache.spark.sql.comet
-import scala.jdk.CollectionConverters._
-
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -34,9 +32,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import com.google.common.base.Objects
import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+import org.apache.comet.serde.operator.CometSink
case class CometLocalTableScanExec(
originalPlan: LocalTableScanExec,
@@ -106,24 +103,14 @@ case class CometLocalTableScanExec(
override def hashCode(): Int = Objects.hashCode(originalPlan,
originalPlan.schema, output)
}
-object CometLocalTableScanExec extends CometOperatorSerde[LocalTableScanExec] {
+object CometLocalTableScanExec extends CometSink[LocalTableScanExec] {
+
+ // uses CometArrowConverters, which re-uses arrays
+ override def isFfiSafe: Boolean = false
override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
- override def convert(
- op: LocalTableScanExec,
- builder: Operator.Builder,
- childOp: Operator*): Option[Operator] = {
- val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
- val scanBuilder = OperatorOuterClass.Scan
- .newBuilder()
- .setSource(op.getClass.getSimpleName)
- .addAllFields(scanTypes.asJava)
- .setArrowFfiSafe(false)
- Some(builder.setScan(scanBuilder).build())
- }
-
override def createExec(nativeOp: Operator, op: LocalTableScanExec):
CometNativeExec = {
CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output))
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index a8a61e7a7..8447c9d04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -139,6 +139,10 @@ case class CometSparkToColumnarExec(child: SparkPlan)
}
object CometSparkToColumnarExec extends CometSink[SparkPlan] with
DataTypeSupport {
+
+ // uses CometArrowConverters, which re-uses arrays
+ override def isFfiSafe: Boolean = false
+
override def createExec(
nativeOp: OperatorOuterClass.Operator,
op: SparkPlan): CometNativeExec = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]