Copilot commented on code in PR #2016: URL: https://github.com/apache/auron/pull/2016#discussion_r2886559160
########## thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron.iceberg + +import scala.collection.JavaConverters._ + +import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.expressions.Expressions +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.StructType + +final case class IcebergScanPlan( + fileTasks: Seq[FileScanTask], + fileFormat: FileFormat, + readSchema: StructType) + +object IcebergScanSupport extends Logging { + + def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { + val scan = exec.scan + val scanClassName = scan.getClass.getName + // Only handle Iceberg scans; other sources must stay on Spark's path. + if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { + return None + } + + // Changelog scan carries row-level changes; not supported by native COW-only path. 
+ if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { + return None + } + + val readSchema = scan.readSchema + // Native scan does not support Iceberg metadata columns (e.g. _file, _pos). + if (hasMetadataColumns(readSchema)) { + return None + } + + if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + val partitions = inputPartitions(exec) + // Empty scan (e.g. empty table) should still build a plan to return no rows. + if (partitions.isEmpty) { + return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema)) Review Comment: `plan()` treats `partitions.isEmpty` as a valid empty-table scan and returns a native plan with no tasks. Since `inputPartitions()` can also return `Seq.empty` on planning/reflection failures, this can incorrectly convert a non-empty table into an empty native scan. Consider distinguishing “truly empty scan” from “failed to obtain partitions” (e.g., make `inputPartitions` return `Option[Seq[InputPartition]]` and return `None` from `plan()` on failures). ```suggestion // If no partitions are available, do not attempt a native empty scan. This can indicate // either a truly empty table or a failure to obtain partitions; in both cases, fall back // to Spark's scan path to avoid silently dropping data on planning failures. if (partitions.isEmpty) { logWarning(s"Falling back to Spark scan for $scanClassName: no input partitions available.") return None ``` ########## thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.auron.iceberg + +import scala.collection.JavaConverters._ + +import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.expressions.Expressions +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.StructType + +final case class IcebergScanPlan( + fileTasks: Seq[FileScanTask], + fileFormat: FileFormat, + readSchema: StructType) + +object IcebergScanSupport extends Logging { + + def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { + val scan = exec.scan + val scanClassName = scan.getClass.getName + // Only handle Iceberg scans; other sources must stay on Spark's path. + if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { + return None + } + + // Changelog scan carries row-level changes; not supported by native COW-only path. + if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { + return None + } + + val readSchema = scan.readSchema + // Native scan does not support Iceberg metadata columns (e.g. _file, _pos). 
+ if (hasMetadataColumns(readSchema)) { + return None + } + + if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + val partitions = inputPartitions(exec) + // Empty scan (e.g. empty table) should still build a plan to return no rows. + if (partitions.isEmpty) { + return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema)) + } + + val icebergPartitions = partitions.flatMap(icebergPartition) + // All partitions must be Iceberg SparkInputPartition; otherwise fallback. + if (icebergPartitions.size != partitions.size) { + return None + } + + val fileTasks = icebergPartitions.flatMap(_.fileTasks) + + // Native scan does not apply delete files; only allow pure data files (COW). + if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { + return None + } + + // Residual filters require row-level evaluation, not supported in native scan. + if (!fileTasks.forall(task => Expressions.alwaysTrue().equals(task.residual()))) { + return None + } + + // Native scan handles a single file format; mixed formats must fallback. + val formats = fileTasks.map(_.file().format()).distinct + if (formats.size > 1) { + return None + } + + val format = formats.headOption.getOrElse(FileFormat.PARQUET) + if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + return None + } + + Some(IcebergScanPlan(fileTasks, format, readSchema)) + } + + private def hasMetadataColumns(schema: StructType): Boolean = + schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name)) + + private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { + // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. 
+ val fromBatch = + try { + val batch = exec.scan.toBatch + if (batch != null) { + batch.planInputPartitions().toSeq + } else { + Seq.empty + } + } catch { + case t: Throwable => + logWarning( + s"Failed to plan input partitions via DataSource V2 batch API for " + + s"${exec.getClass.getName}; falling back to reflective methods.", + t) + Seq.empty + } + if (fromBatch.nonEmpty) { + return fromBatch + } + + // Some Spark versions expose partitions through inputPartitions/partitions methods on BatchScanExec. + val methods = exec.getClass.getMethods + val inputPartitionsMethod = methods.find(_.getName == "inputPartitions") + val partitionsMethod = methods.find(_.getName == "partitions") + + val raw = inputPartitionsMethod + .orElse(partitionsMethod) + .map(_.invoke(exec)) + .getOrElse(Seq.empty) Review Comment: The reflective call `_.invoke(exec)` can throw (e.g., `InvocationTargetException`, `IllegalAccessException`) and currently isn’t caught. Because `isSupported()` is called outside `tryConvert`, any exception here can fail query planning rather than falling back. Wrap the reflective invocation + normalization in a `try/catch` (e.g., `NonFatal`) and return `Seq.empty`/failure so the provider can safely fall back. 
########## thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala: ########## @@ -31,4 +39,200 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan is applied for simple COW table") { + withTable("local.db.t2") { + sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v") + val df = sql("select * from local.db.t2") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for empty COW table") { + withTable("local.db.t_empty") { + sql(""" + |create table local.db.t_empty (id int, v string) + |using iceberg + |tblproperties ( + | 'format-version' = '2' + |) + |""".stripMargin) + val df = sql("select * from local.db.t_empty") + checkAnswer(df, Seq.empty) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for projection on COW table") { + withTable("local.db.t3") { + sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v") + val df = sql("select id from local.db.t3") + checkAnswer(df, Seq(Row(1))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for partitioned COW table with filter") { + withTable("local.db.t_partition") { + sql(""" + |create table local.db.t_partition (id int, v string, p string) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b', 'p2')") + val df = sql("select * from local.db.t_partition where p = 'p1'") + checkAnswer(df, Seq(Row(1, "a", "p1"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for ORC COW table") { + withTable("local.db.t_orc") { + sql(""" + 
|create table local.db.t_orc (id int, v string) + |using iceberg + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')") + val df = sql("select * from local.db.t_orc") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied when delete files are null (format v1)") { + withTable("local.db.t_v1") { + sql(""" + |create table local.db.t_v1 (id int, v string) + |using iceberg + |tblproperties ('format-version' = '1') + |""".stripMargin) + sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')") + val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1") + val scanTasks = icebergTable.newScan().planFiles() + val allDeletesEmpty = + try { + scanTasks + .iterator() + .asScala + .forall(task => task.deletes() == null || task.deletes().isEmpty) + } finally { + scanTasks.close() + } + assert(allDeletesEmpty) + val df = sql("select * from local.db.t_v1") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back for residual filters on data columns") { + withTable("local.db.t_residual") { + sql("create table local.db.t_residual (id int, v string) using iceberg") + sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')") + val df = sql("select * from local.db.t_residual where v = 'a'") + checkAnswer(df, Seq(Row(1, "a"))) + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back when reading metadata columns") { + withTable("local.db.t4") { + sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v") + val df = sql("select _file from local.db.t4") + df.collect() + val plan = 
df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back for unsupported decimal types") { + withTable("local.db.t5") { + sql("create table local.db.t5 (id int, amount decimal(38, 10)) using iceberg") + sql("insert into local.db.t5 values (1, 123.45)") + val df = sql("select * from local.db.t5") + checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000")))) + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back when delete files exist") { + withTable("local.db.t_delete") { + sql(""" + |create table local.db.t_delete (id int, v string) + |using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read' + |) + |""".stripMargin) + sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')") + addPositionDeleteFile("local.db.t_delete") + val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_delete") + val scanTasks = icebergTable.newScan().planFiles() + val hasDeletes = + try { + scanTasks + .iterator() + .asScala + .exists(task => task.deletes() != null && !task.deletes().isEmpty) + } finally { + scanTasks.close() + } + assert(hasDeletes) + val df = sql("select * from local.db.t_delete") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") { + withTable("local.db.t_disable") { + sql("create table local.db.t_disable using iceberg as select 1 as id, 'a' as v") + withSQLConf("spark.auron.enable.iceberg.scan" -> "false") { + assert( + !org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()) + val df = sql("select * from local.db.t_disable") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } 
+ } + } + + private def addPositionDeleteFile(tableName: String): Unit = { + val table = Spark3Util.loadIcebergTable(spark, tableName) + val taskIterable = table.newScan().planFiles() + val taskIter = taskIterable.iterator() + if (!taskIter.hasNext) { + taskIterable.close() + return + } + + val task = taskIter.next().asInstanceOf[FileScanTask] + val deletePath = + table.locationProvider().newDataLocation(s"delete-${UUID.randomUUID().toString}.parquet") + val outputFile = table.io().newOutputFile(deletePath) + val encryptedOutput = table.encryption().encrypt(outputFile) + val appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()) + val writer = + appenderFactory.newPosDeleteWriter(encryptedOutput, FileFormat.PARQUET, task.partition()) + + val delete = PositionDelete.create[Record]().set(task.file().location(), 0L, null) + writer.write(delete) + writer.close() Review Comment: `writer.close()` is not in a `finally`, so an exception during `writer.write(delete)` would leave the writer unclosed (and may keep the output file open). Consider using `try/finally` (or `Using.resource`) to guarantee the writer is closed even on failure. ```suggestion try { writer.write(delete) } finally { writer.close() } ``` ########## auron-build.sh: ########## @@ -393,7 +393,7 @@ if [[ -n "$FLINK_VER" ]]; then BUILD_ARGS+=("-Pflink-$FLINK_VER") fi if [[ -n "$ICEBERG_VER" ]]; then - BUILD_ARGS+=("-Piceberg-$ICEBERG_VER") + BUILD_ARGS+=("-Piceberg-1.10.1") Review Comment: `ICEBERG_VER` is validated earlier, but the build arg is hard-coded to `-Piceberg-1.10.1` instead of using the variable. This makes the option misleading and will break if additional supported versions are added. Use `-Piceberg-$ICEBERG_VER` (or drop the parameter entirely if it’s intentionally fixed). 
```suggestion BUILD_ARGS+=("-Piceberg-$ICEBERG_VER") ``` ########## thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala: ########## @@ -31,4 +39,200 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan is applied for simple COW table") { + withTable("local.db.t2") { + sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v") + val df = sql("select * from local.db.t2") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for empty COW table") { + withTable("local.db.t_empty") { + sql(""" + |create table local.db.t_empty (id int, v string) + |using iceberg + |tblproperties ( + | 'format-version' = '2' + |) + |""".stripMargin) + val df = sql("select * from local.db.t_empty") + checkAnswer(df, Seq.empty) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for projection on COW table") { + withTable("local.db.t3") { + sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v") + val df = sql("select id from local.db.t3") + checkAnswer(df, Seq(Row(1))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for partitioned COW table with filter") { + withTable("local.db.t_partition") { + sql(""" + |create table local.db.t_partition (id int, v string, p string) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b', 'p2')") + val df = sql("select * from local.db.t_partition where p = 'p1'") + checkAnswer(df, Seq(Row(1, "a", "p1"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied for ORC COW 
table") { + withTable("local.db.t_orc") { + sql(""" + |create table local.db.t_orc (id int, v string) + |using iceberg + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')") + val df = sql("select * from local.db.t_orc") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg native scan is applied when delete files are null (format v1)") { + withTable("local.db.t_v1") { + sql(""" + |create table local.db.t_v1 (id int, v string) + |using iceberg + |tblproperties ('format-version' = '1') + |""".stripMargin) + sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')") + val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1") + val scanTasks = icebergTable.newScan().planFiles() + val allDeletesEmpty = + try { + scanTasks + .iterator() + .asScala + .forall(task => task.deletes() == null || task.deletes().isEmpty) + } finally { + scanTasks.close() + } + assert(allDeletesEmpty) + val df = sql("select * from local.db.t_v1") + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + val plan = df.queryExecution.executedPlan.toString() + assert(plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back for residual filters on data columns") { + withTable("local.db.t_residual") { + sql("create table local.db.t_residual (id int, v string) using iceberg") + sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')") + val df = sql("select * from local.db.t_residual where v = 'a'") + checkAnswer(df, Seq(Row(1, "a"))) + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back when reading metadata columns") { + withTable("local.db.t4") { + sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v") + val df = sql("select _file from local.db.t4") 
+ df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back for unsupported decimal types") { + withTable("local.db.t5") { + sql("create table local.db.t5 (id int, amount decimal(38, 10)) using iceberg") + sql("insert into local.db.t5 values (1, 123.45)") + val df = sql("select * from local.db.t5") + checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000")))) + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan falls back when delete files exist") { + withTable("local.db.t_delete") { + sql(""" + |create table local.db.t_delete (id int, v string) + |using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read' + |) + |""".stripMargin) + sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')") + addPositionDeleteFile("local.db.t_delete") + val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_delete") + val scanTasks = icebergTable.newScan().planFiles() + val hasDeletes = + try { + scanTasks + .iterator() + .asScala + .exists(task => task.deletes() != null && !task.deletes().isEmpty) + } finally { + scanTasks.close() + } + assert(hasDeletes) + val df = sql("select * from local.db.t_delete") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + assert(!plan.contains("NativeIcebergTableScan")) + } + } + + test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") { + withTable("local.db.t_disable") { + sql("create table local.db.t_disable using iceberg as select 1 as id, 'a' as v") + withSQLConf("spark.auron.enable.iceberg.scan" -> "false") { + assert( + !org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()) + val df = sql("select * from local.db.t_disable") + df.collect() + val plan = df.queryExecution.executedPlan.toString() + 
assert(!plan.contains("NativeIcebergTableScan")) + } + } + } + + private def addPositionDeleteFile(tableName: String): Unit = { + val table = Spark3Util.loadIcebergTable(spark, tableName) + val taskIterable = table.newScan().planFiles() + val taskIter = taskIterable.iterator() + if (!taskIter.hasNext) { + taskIterable.close() + return + } + + val task = taskIter.next().asInstanceOf[FileScanTask] + val deletePath = + table.locationProvider().newDataLocation(s"delete-${UUID.randomUUID().toString}.parquet") + val outputFile = table.io().newOutputFile(deletePath) + val encryptedOutput = table.encryption().encrypt(outputFile) + val appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()) + val writer = + appenderFactory.newPosDeleteWriter(encryptedOutput, FileFormat.PARQUET, task.partition()) + + val delete = PositionDelete.create[Record]().set(task.file().location(), 0L, null) + writer.write(delete) + writer.close() + + val deleteFile = writer.toDeleteFile() + table.newRowDelta().addDeletes(deleteFile).commit() + taskIterable.close() Review Comment: `taskIterable` is only closed on the early-return path and at the end. If any operation after the iterator check throws (writer creation, commit, etc.), `taskIterable.close()` will be skipped and can leak resources across tests. Wrap the method body in a `try/finally` (or use `Using`) to always close `taskIterable`. 
```suggestion try { val task = taskIter.next().asInstanceOf[FileScanTask] val deletePath = table.locationProvider().newDataLocation(s"delete-${UUID.randomUUID().toString}.parquet") val outputFile = table.io().newOutputFile(deletePath) val encryptedOutput = table.encryption().encrypt(outputFile) val appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()) val writer = appenderFactory.newPosDeleteWriter(encryptedOutput, FileFormat.PARQUET, task.partition()) val delete = PositionDelete.create[Record]().set(task.file().location(), 0L, null) writer.write(delete) writer.close() val deleteFile = writer.toDeleteFile() table.newRowDelta().addDeletes(deleteFile).commit() } finally { taskIterable.close() } ``` ########## thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.auron.iceberg + +import scala.collection.JavaConverters._ + +import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns} +import org.apache.iceberg.expressions.Expressions +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.types.StructType + +final case class IcebergScanPlan( + fileTasks: Seq[FileScanTask], + fileFormat: FileFormat, + readSchema: StructType) + +object IcebergScanSupport extends Logging { + + def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { + val scan = exec.scan + val scanClassName = scan.getClass.getName + // Only handle Iceberg scans; other sources must stay on Spark's path. + if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { + return None + } + + // Changelog scan carries row-level changes; not supported by native COW-only path. + if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { + return None + } + + val readSchema = scan.readSchema + // Native scan does not support Iceberg metadata columns (e.g. _file, _pos). + if (hasMetadataColumns(readSchema)) { + return None + } + + if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { + return None + } + + val partitions = inputPartitions(exec) + // Empty scan (e.g. empty table) should still build a plan to return no rows. + if (partitions.isEmpty) { + return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema)) + } + + val icebergPartitions = partitions.flatMap(icebergPartition) + // All partitions must be Iceberg SparkInputPartition; otherwise fallback. + if (icebergPartitions.size != partitions.size) { + return None + } + + val fileTasks = icebergPartitions.flatMap(_.fileTasks) + + // Native scan does not apply delete files; only allow pure data files (COW). 
+ if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { + return None + } + + // Residual filters require row-level evaluation, not supported in native scan. + if (!fileTasks.forall(task => Expressions.alwaysTrue().equals(task.residual()))) { + return None + } + + // Native scan handles a single file format; mixed formats must fallback. + val formats = fileTasks.map(_.file().format()).distinct + if (formats.size > 1) { + return None + } + + val format = formats.headOption.getOrElse(FileFormat.PARQUET) + if (format != FileFormat.PARQUET && format != FileFormat.ORC) { + return None + } + + Some(IcebergScanPlan(fileTasks, format, readSchema)) + } + + private def hasMetadataColumns(schema: StructType): Boolean = + schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name)) + + private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { + // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. + val fromBatch = + try { + val batch = exec.scan.toBatch + if (batch != null) { + batch.planInputPartitions().toSeq + } else { + Seq.empty + } + } catch { + case t: Throwable => + logWarning( + s"Failed to plan input partitions via DataSource V2 batch API for " + + s"${exec.getClass.getName}; falling back to reflective methods.", + t) + Seq.empty + } + if (fromBatch.nonEmpty) { + return fromBatch + } + + // Some Spark versions expose partitions through inputPartitions/partitions methods on BatchScanExec. + val methods = exec.getClass.getMethods + val inputPartitionsMethod = methods.find(_.getName == "inputPartitions") + val partitionsMethod = methods.find(_.getName == "partitions") + + val raw = inputPartitionsMethod + .orElse(partitionsMethod) + .map(_.invoke(exec)) + .getOrElse(Seq.empty) + + // Normalize to Seq[InputPartition], flattening nested Seq if needed. 
+ raw match { + case seq: scala.collection.Seq[_] + if seq.nonEmpty && + seq.head.isInstanceOf[scala.collection.Seq[_]] => + seq.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]].flatten.toSeq + case seq: scala.collection.Seq[_] => + seq.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq + case _ => + Seq.empty + } + } + + private case class IcebergPartitionView(fileTasks: Seq[FileScanTask]) + + private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { + val className = partition.getClass.getName + // Only accept Iceberg SparkInputPartition to access task groups. + if (className != "org.apache.iceberg.spark.source.SparkInputPartition") { + return None + } + + try { + // SparkInputPartition is package-private; use reflection to read its task group. + val taskGroupField = partition.getClass.getDeclaredField("taskGroup") + taskGroupField.setAccessible(true) + val taskGroup = taskGroupField.get(partition) + + // Extract tasks and keep only file scan tasks. + val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks") + tasksMethod.setAccessible(true) + val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala + val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq + + // If any task is not a FileScanTask, fallback. + if (fileTasks.size != tasks.size) { + return None + } + + Some(IcebergPartitionView(fileTasks)) + } catch { + case _: ReflectiveOperationException => None + } Review Comment: `icebergPartition` only catches `ReflectiveOperationException`, but `setAccessible(true)` can throw runtime exceptions (e.g., `InaccessibleObjectException` on newer JDK/module settings or `SecurityException`). Since this runs in `isSupported()`, uncaught runtime exceptions can fail planning. Catch `NonFatal` (optionally with a debug log) and return `None` to ensure a safe fallback. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: users@infra.apache.org
