Copilot commented on code in PR #2016:
URL: https://github.com/apache/auron/pull/2016#discussion_r2847704023
##########
thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala:
##########
@@ -31,4 +39,184 @@ class AuronIcebergIntegrationSuite
}
}
+ test("iceberg native scan is applied for simple COW table") {
+ withTable("local.db.t2") {
+ sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select * from local.db.t2")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for projection on COW table") {
+ withTable("local.db.t3") {
+ sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select id from local.db.t3")
+ checkAnswer(df, Seq(Row(1)))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for partitioned COW table with filter")
{
+ withTable("local.db.t_partition") {
+ sql("""
+ |create table local.db.t_partition (id int, v string, p string)
+ |using iceberg
+ |partitioned by (p)
+ |""".stripMargin)
+ sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b',
'p2')")
+ val df = sql("select * from local.db.t_partition where p = 'p1'")
+ checkAnswer(df, Seq(Row(1, "a", "p1")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied for ORC COW table") {
+ withTable("local.db.t_orc") {
+ sql("""
+ |create table local.db.t_orc (id int, v string)
+ |using iceberg
+ |tblproperties ('write.format.default' = 'orc')
+ |""".stripMargin)
+ sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')")
+ val df = sql("select * from local.db.t_orc")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg native scan is applied when delete files are null (format
v1)") {
+ withTable("local.db.t_v1") {
+ sql("""
+ |create table local.db.t_v1 (id int, v string)
+ |using iceberg
+ |tblproperties ('format-version' = '1')
+ |""".stripMargin)
+ sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')")
+ val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1")
+ val scanTasks = icebergTable.newScan().planFiles()
+ val allDeletesEmpty =
+ try {
+ scanTasks
+ .iterator()
+ .asScala
+ .forall(task => task.deletes() == null || task.deletes().isEmpty)
+ } finally {
+ scanTasks.close()
+ }
+ assert(allDeletesEmpty)
+ val df = sql("select * from local.db.t_v1")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back for residual filters on data columns") {
+ withTable("local.db.t_residual") {
+ sql("create table local.db.t_residual (id int, v string) using iceberg")
+ sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')")
+ val df = sql("select * from local.db.t_residual where v = 'a'")
+ checkAnswer(df, Seq(Row(1, "a")))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back when reading metadata columns") {
+ withTable("local.db.t4") {
+ sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select _file from local.db.t4")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back for unsupported decimal types") {
+ withTable("local.db.t5") {
+ sql("create table local.db.t5 (id int, amount decimal(38, 10)) using
iceberg")
+ sql("insert into local.db.t5 values (1, 123.45)")
+ val df = sql("select * from local.db.t5")
+ checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000"))))
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan falls back when delete files exist") {
+ withTable("local.db.t_delete") {
+ sql("""
+ |create table local.db.t_delete (id int, v string)
+ |using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read'
+ |)
+ |""".stripMargin)
+ sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')")
+ addPositionDeleteFile("local.db.t_delete")
+ val icebergTable = Spark3Util.loadIcebergTable(spark,
"local.db.t_delete")
+ val scanTasks = icebergTable.newScan().planFiles()
+ val hasDeletes =
+ try {
+ scanTasks
+ .iterator()
+ .asScala
+ .exists(task => task.deletes() != null && !task.deletes().isEmpty)
+ } finally {
+ scanTasks.close()
+ }
+ assert(hasDeletes)
+ val df = sql("select * from local.db.t_delete")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+
+ test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") {
+ withTable("local.db.t_disable") {
+ sql("create table local.db.t_disable using iceberg as select 1 as id,
'a' as v")
+ withSQLConf("spark.auron.enable.iceberg.scan" -> "false") {
+ assert(
+
!org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get())
+ val df = sql("select * from local.db.t_disable")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativeIcebergTableScan"))
+ }
+ }
+ }
+
Review Comment:
The PR description claims "Empty table handling" is tested in the suite, but
there is no explicit test case for empty tables. While the code in
IcebergScanSupport.scala (line 61-62) handles empty partitions, adding an
explicit integration test would provide better coverage and documentation of
this behavior. Consider adding a test like: `test("iceberg native scan is
applied for empty COW table")` that creates an empty table and verifies
NativeIcebergTableScan is used.
```suggestion
test("iceberg native scan is applied for empty COW table") {
withTable("local.db.t_empty") {
sql("""
|create table local.db.t_empty (id int, v string)
|using iceberg
|tblproperties (
| 'format-version' = '2'
|)
|""".stripMargin)
val df = sql("select * from local.db.t_empty")
df.collect()
val plan = df.queryExecution.executedPlan.toString()
assert(plan.contains("NativeIcebergTableScan"))
}
}
```
##########
thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
+import org.apache.iceberg.expressions.Expressions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+
+final case class IcebergScanPlan(
+ fileTasks: Seq[FileScanTask],
+ fileFormat: FileFormat,
+ readSchema: StructType)
+
+object IcebergScanSupport extends Logging {
+
+ def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
+ val scan = exec.scan
+ val scanClassName = scan.getClass.getName
+ // Only handle Iceberg scans; other sources must stay on Spark's path.
+ if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
+ return None
+ }
+
+ // Changelog scan carries row-level changes; not supported by native
COW-only path.
+ if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan")
{
+ return None
+ }
+
+ val readSchema = scan.readSchema
+ // Native scan does not support Iceberg metadata columns (e.g. _file,
_pos).
+ if (hasMetadataColumns(readSchema)) {
+ return None
+ }
+
+ if (!readSchema.fields.forall(field =>
NativeConverters.isTypeSupported(field.dataType))) {
+ return None
+ }
+
+ val partitions = inputPartitions(exec)
+ // Empty scan (e.g. empty table) should still build a plan to return no
rows.
+ if (partitions.isEmpty) {
+ return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+ }
+
+ val icebergPartitions = partitions.flatMap(icebergPartition)
+ // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
+ if (icebergPartitions.size != partitions.size) {
+ return None
+ }
+
+ val fileTasks = icebergPartitions.flatMap(_.fileTasks)
+
+ // Native scan does not apply delete files; only allow pure data files
(COW).
+ if (!fileTasks.forall(task => task.deletes() == null ||
task.deletes().isEmpty)) {
+ return None
+ }
+
+ // Residual filters require row-level evaluation, not supported in native
scan.
+ if (!fileTasks.forall(task =>
Expressions.alwaysTrue().equals(task.residual()))) {
+ return None
+ }
+
+ // Native scan handles a single file format; mixed formats must fallback.
+ val formats = fileTasks.map(_.file().format()).distinct
+ if (formats.size > 1) {
+ return None
+ }
+
+ val format = formats.headOption.getOrElse(FileFormat.PARQUET)
+ if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
+ return None
+ }
+
+ Some(IcebergScanPlan(fileTasks, format, readSchema))
+ }
+
+ private def hasMetadataColumns(schema: StructType): Boolean =
+ schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+
+ private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
+ // Prefer DataSource V2 batch API; if not available, fallback to exec
methods via reflection.
+ val fromBatch =
+ try {
+ val batch = exec.scan.toBatch
+ if (batch != null) {
+ batch.planInputPartitions().toSeq
+ } else {
+ Seq.empty
+ }
+ } catch {
+ case _: Throwable => Seq.empty
+ }
+ if (fromBatch.nonEmpty) {
+ return fromBatch
+ }
+
+ // Some Spark versions expose partitions through
inputPartitions/partitions methods on BatchScanExec.
+ val methods = exec.getClass.getMethods
+ val inputPartitionsMethod = methods.find(_.getName == "inputPartitions")
+ val partitionsMethod = methods.find(_.getName == "partitions")
+
+ val raw = inputPartitionsMethod
+ .orElse(partitionsMethod)
+ .map(_.invoke(exec))
+ .getOrElse(Seq.empty)
+
+ // Normalize to Seq[InputPartition], flattening nested Seq if needed.
+ raw match {
+ case seq: scala.collection.Seq[_]
+ if seq.nonEmpty &&
+ seq.head.isInstanceOf[scala.collection.Seq[_]] =>
+
seq.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]].flatten.toSeq
+ case seq: scala.collection.Seq[_] =>
+ seq.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq
+ case _ =>
+ Seq.empty
+ }
+ }
+
+ private case class IcebergPartitionView(fileTasks: Seq[FileScanTask])
+
+ private def icebergPartition(partition: InputPartition):
Option[IcebergPartitionView] = {
+ val className = partition.getClass.getName
+ // Only accept Iceberg SparkInputPartition to access task groups.
+ if (className != "org.apache.iceberg.spark.source.SparkInputPartition") {
+ return None
+ }
+
+ try {
+ // SparkInputPartition is package-private; use reflection to read its
task group.
+ val taskGroupField = partition.getClass.getDeclaredField("taskGroup")
+ taskGroupField.setAccessible(true)
+ val taskGroup = taskGroupField.get(partition)
+
+ // Extract tasks and keep only file scan tasks.
+ val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks")
+ tasksMethod.setAccessible(true)
+ val tasks =
tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala
+ val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq
+
+ // If any task is not a FileScanTask, fallback.
+ if (fileTasks.size != tasks.size) {
+ return None
+ }
+
+ Some(IcebergPartitionView(fileTasks))
+ } catch {
+ case _: ReflectiveOperationException => None
Review Comment:
Catching all ReflectiveOperationException here silently swallows important
debugging information. When reflection fails, it could be due to Iceberg
version incompatibility or internal API changes. Consider logging a warning
with the exception details to help diagnose issues in production environments,
similar to how IcebergConvertProvider logs warnings for unsupported Spark
versions.
```suggestion
case e: ReflectiveOperationException =>
logWarning(
s"Failed to read Iceberg SparkInputPartition via reflection;
falling back to None. " +
s"This may indicate an Iceberg or Spark version
incompatibility.",
e)
None
```
##########
dev/reformat:
##########
@@ -48,15 +48,15 @@ else
cargo fmt --all -q --
fi
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
sparkver=spark-3.5
for celebornver in celeborn-0.5 celeborn-0.6
do
- run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10
-Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+ run_maven -P"${sparkver}" -P"${celebornver}" -Puniffle-0.10 -Ppaimon-1.2
-Pflink-1.18 -Piceberg -DicebergEnabled=true -DicebergVersion=1.10.1
done
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)
Review Comment:
The sparkvers array should contain "spark-3.4", not "spark-3.5". The loop above
(sparkver=spark-3.5) already runs the formatter with spark-3.5, so listing
spark-3.5 again in this array processes it twice while spark-3.4 is never
formatted. It appears spark-3.4 was removed from the array by mistake when
spark-3.5 should have been the one removed; restore spark-3.4 here.
```suggestion
sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
```
##########
thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
+import org.apache.iceberg.expressions.Expressions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+
+final case class IcebergScanPlan(
+ fileTasks: Seq[FileScanTask],
+ fileFormat: FileFormat,
+ readSchema: StructType)
+
+object IcebergScanSupport extends Logging {
+
+ def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
+ val scan = exec.scan
+ val scanClassName = scan.getClass.getName
+ // Only handle Iceberg scans; other sources must stay on Spark's path.
+ if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
+ return None
+ }
+
+ // Changelog scan carries row-level changes; not supported by native
COW-only path.
+ if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan")
{
+ return None
+ }
+
+ val readSchema = scan.readSchema
+ // Native scan does not support Iceberg metadata columns (e.g. _file,
_pos).
+ if (hasMetadataColumns(readSchema)) {
+ return None
+ }
+
+ if (!readSchema.fields.forall(field =>
NativeConverters.isTypeSupported(field.dataType))) {
+ return None
+ }
+
+ val partitions = inputPartitions(exec)
+ // Empty scan (e.g. empty table) should still build a plan to return no
rows.
+ if (partitions.isEmpty) {
+ return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+ }
+
+ val icebergPartitions = partitions.flatMap(icebergPartition)
+ // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
+ if (icebergPartitions.size != partitions.size) {
+ return None
+ }
+
+ val fileTasks = icebergPartitions.flatMap(_.fileTasks)
+
+ // Native scan does not apply delete files; only allow pure data files
(COW).
+ if (!fileTasks.forall(task => task.deletes() == null ||
task.deletes().isEmpty)) {
+ return None
+ }
+
+ // Residual filters require row-level evaluation, not supported in native
scan.
+ if (!fileTasks.forall(task =>
Expressions.alwaysTrue().equals(task.residual()))) {
+ return None
+ }
+
+ // Native scan handles a single file format; mixed formats must fallback.
+ val formats = fileTasks.map(_.file().format()).distinct
+ if (formats.size > 1) {
+ return None
+ }
+
+ val format = formats.headOption.getOrElse(FileFormat.PARQUET)
+ if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
+ return None
+ }
+
+ Some(IcebergScanPlan(fileTasks, format, readSchema))
+ }
+
+ private def hasMetadataColumns(schema: StructType): Boolean =
+ schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+
+ private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
+ // Prefer DataSource V2 batch API; if not available, fallback to exec
methods via reflection.
+ val fromBatch =
+ try {
+ val batch = exec.scan.toBatch
+ if (batch != null) {
+ batch.planInputPartitions().toSeq
+ } else {
+ Seq.empty
+ }
+ } catch {
+ case _: Throwable => Seq.empty
Review Comment:
The catch at line 111 catches Throwable and silently discards any exception
thrown while planning partitions via the DataSource V2 batch API
(scan.toBatch / planInputPartitions). For better observability, consider
logging the exception before falling back to the reflective path, similar to
the pattern used in NativeRDD.scala where setAccessible failures are logged.
This would help diagnose why the batch API path failed in production.
```suggestion
case t: Throwable =>
logWarning(
s"Failed to plan input partitions via DataSource V2 batch API
for " +
s"${exec.getClass.getName}; falling back to reflective
methods.",
t)
Seq.empty
```
##########
dev/reformat:
##########
@@ -48,15 +48,15 @@ else
cargo fmt --all -q --
fi
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
Review Comment:
The comment says "spark-3.4" but the code uses "spark-3.5". Update the
comment to accurately reflect the Spark version being used.
```suggestion
# Check or format all code, including third-party code, with spark-3.5
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]