This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch set-ffi-safe-for-native-iceberg-compat
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
commit 18098243f854afe237ad0363b1805803b6631a93
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 5 10:37:25 2026 -0700

    feat: Set ffi_safe flag conditionally for native_iceberg_compat scans

    For native_iceberg_compat scans that have no partition columns and no missing columns, set arrow_ffi_safe=true on the Scan protobuf. In this case all Arrow arrays come from parquet file data with non-reused buffers, so a cheap clone suffices instead of a deep copy on the native side.

    Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
 .../org/apache/spark/sql/comet/operators.scala | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 6f33467ef..43b7a9e2f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1904,6 +1904,30 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
+  override def convert(
+      op: SparkPlan,
+      builder: Operator.Builder,
+      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
+    val result = super.convert(op, builder, childOp: _*)
+    result.map { operator =>
+      op match {
+        case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
+          val hasPartitionColumns = scan.relation.partitionSchema.nonEmpty
+          val hasMissingColumns = scan.requiredSchema.fields.exists { field =>
+            !scan.relation.dataSchema.fieldNames.contains(field.name)
+          }
+          val ffiSafe = !hasPartitionColumns && !hasMissingColumns
+          if (ffiSafe) {
+            val scanProto = operator.getScan.toBuilder.setArrowFfiSafe(true).build()
+            operator.toBuilder.setScan(scanProto).build()
+          } else {
+            operator
+          }
+        case _ => operator
+      }
+    }
+  }
+
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
