This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch set-ffi-safe-for-native-iceberg-compat
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git

commit 18098243f854afe237ad0363b1805803b6631a93
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 5 10:37:25 2026 -0700

    feat: Set ffi_safe flag conditionally for native_iceberg_compat scans
    
    For native_iceberg_compat scans that have no partition columns and no
    missing columns (required columns absent from the relation's data
    schema), set arrow_ffi_safe=true on the Scan protobuf. In this case all
    Arrow arrays come from Parquet file data with non-reused buffers, so a
    cheap clone suffices instead of a deep copy on the native side.
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
 .../org/apache/spark/sql/comet/operators.scala     | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6f33467ef..43b7a9e2f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1904,6 +1904,30 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
+  /** Sets arrow_ffi_safe on native_iceberg_compat scans with no partition or missing columns. */
+  override def convert(
+      op: SparkPlan,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    super.convert(op, builder, childOp: _*).map { operator =>
+      op match {
+        case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+          // Partition values are not read from Parquet file data; treat them as not ffi-safe.
+          val hasPartitionColumns = scan.relation.partitionSchema.nonEmpty
+          // NOTE(review): this checks the relation's data schema only; columns absent from
+          // individual Parquet files (schema evolution) are not detected here -- confirm safe.
+          val dataFieldNames = scan.relation.dataSchema.fieldNames.toSet
+          val hasMissingColumns = !scan.requiredSchema.fieldNames.forall(dataFieldNames.contains)
+          if (hasPartitionColumns || hasMissingColumns) {
+            operator
+          } else {
+            operator.toBuilder.setScan(operator.getScan.toBuilder.setArrowFfiSafe(true)).build()
+          }
+        case _ => operator
+      }
+    }
+  }
+
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to