Copilot commented on code in PR #2016:
URL: https://github.com/apache/auron/pull/2016#discussion_r2847704023


##########
thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala:
##########
@@ -31,4 +39,184 @@ class AuronIcebergIntegrationSuite
     }
   }
 
+  test("iceberg native scan is applied for simple COW table") {
+    withTable("local.db.t2") {
+      sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select * from local.db.t2")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for projection on COW table") {
+    withTable("local.db.t3") {
+      sql("create table local.db.t3 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select id from local.db.t3")
+      checkAnswer(df, Seq(Row(1)))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for partitioned COW table with filter") 
{
+    withTable("local.db.t_partition") {
+      sql("""
+            |create table local.db.t_partition (id int, v string, p string)
+            |using iceberg
+            |partitioned by (p)
+            |""".stripMargin)
+      sql("insert into local.db.t_partition values (1, 'a', 'p1'), (2, 'b', 
'p2')")
+      val df = sql("select * from local.db.t_partition where p = 'p1'")
+      checkAnswer(df, Seq(Row(1, "a", "p1")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied for ORC COW table") {
+    withTable("local.db.t_orc") {
+      sql("""
+            |create table local.db.t_orc (id int, v string)
+            |using iceberg
+            |tblproperties ('write.format.default' = 'orc')
+            |""".stripMargin)
+      sql("insert into local.db.t_orc values (1, 'a'), (2, 'b')")
+      val df = sql("select * from local.db.t_orc")
+      checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg native scan is applied when delete files are null (format 
v1)") {
+    withTable("local.db.t_v1") {
+      sql("""
+            |create table local.db.t_v1 (id int, v string)
+            |using iceberg
+            |tblproperties ('format-version' = '1')
+            |""".stripMargin)
+      sql("insert into local.db.t_v1 values (1, 'a'), (2, 'b')")
+      val icebergTable = Spark3Util.loadIcebergTable(spark, "local.db.t_v1")
+      val scanTasks = icebergTable.newScan().planFiles()
+      val allDeletesEmpty =
+        try {
+          scanTasks
+            .iterator()
+            .asScala
+            .forall(task => task.deletes() == null || task.deletes().isEmpty)
+        } finally {
+          scanTasks.close()
+        }
+      assert(allDeletesEmpty)
+      val df = sql("select * from local.db.t_v1")
+      checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back for residual filters on data columns") {
+    withTable("local.db.t_residual") {
+      sql("create table local.db.t_residual (id int, v string) using iceberg")
+      sql("insert into local.db.t_residual values (1, 'a'), (2, 'b')")
+      val df = sql("select * from local.db.t_residual where v = 'a'")
+      checkAnswer(df, Seq(Row(1, "a")))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back when reading metadata columns") {
+    withTable("local.db.t4") {
+      sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
+      val df = sql("select _file from local.db.t4")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back for unsupported decimal types") {
+    withTable("local.db.t5") {
+      sql("create table local.db.t5 (id int, amount decimal(38, 10)) using 
iceberg")
+      sql("insert into local.db.t5 values (1, 123.45)")
+      val df = sql("select * from local.db.t5")
+      checkAnswer(df, Seq(Row(1, new java.math.BigDecimal("123.4500000000"))))
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan falls back when delete files exist") {
+    withTable("local.db.t_delete") {
+      sql("""
+            |create table local.db.t_delete (id int, v string)
+            |using iceberg
+            |tblproperties (
+            |  'format-version' = '2',
+            |  'write.delete.mode' = 'merge-on-read'
+            |)
+            |""".stripMargin)
+      sql("insert into local.db.t_delete values (1, 'a'), (2, 'b')")
+      addPositionDeleteFile("local.db.t_delete")
+      val icebergTable = Spark3Util.loadIcebergTable(spark, 
"local.db.t_delete")
+      val scanTasks = icebergTable.newScan().planFiles()
+      val hasDeletes =
+        try {
+          scanTasks
+            .iterator()
+            .asScala
+            .exists(task => task.deletes() != null && !task.deletes().isEmpty)
+        } finally {
+          scanTasks.close()
+        }
+      assert(hasDeletes)
+      val df = sql("select * from local.db.t_delete")
+      df.collect()
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(!plan.contains("NativeIcebergTableScan"))
+    }
+  }
+
+  test("iceberg scan is disabled via spark.auron.enable.iceberg.scan") {
+    withTable("local.db.t_disable") {
+      sql("create table local.db.t_disable using iceberg as select 1 as id, 
'a' as v")
+      withSQLConf("spark.auron.enable.iceberg.scan" -> "false") {
+        assert(
+          
!org.apache.auron.spark.configuration.SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get())
+        val df = sql("select * from local.db.t_disable")
+        df.collect()
+        val plan = df.queryExecution.executedPlan.toString()
+        assert(!plan.contains("NativeIcebergTableScan"))
+      }
+    }
+  }
+

Review Comment:
   The PR description claims "Empty table handling" is tested in the suite, but 
there is no explicit test case for empty tables. While the code in 
IcebergScanSupport.scala (line 61-62) handles empty partitions, adding an 
explicit integration test would provide better coverage and documentation of 
this behavior. Consider adding a test like: `test("iceberg native scan is 
applied for empty COW table")` that creates an empty table and verifies 
NativeIcebergTableScan is used.
   ```suggestion
   
     test("iceberg native scan is applied for empty COW table") {
       withTable("local.db.t_empty") {
         sql("""
               |create table local.db.t_empty (id int, v string)
               |using iceberg
               |tblproperties (
               |  'format-version' = '2'
               |)
               |""".stripMargin)
         val df = sql("select * from local.db.t_empty")
         df.collect()
         val plan = df.queryExecution.executedPlan.toString()
         assert(plan.contains("NativeIcebergTableScan"))
       }
     }
   ```



##########
thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
+import org.apache.iceberg.expressions.Expressions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+
+final case class IcebergScanPlan(
+    fileTasks: Seq[FileScanTask],
+    fileFormat: FileFormat,
+    readSchema: StructType)
+
+object IcebergScanSupport extends Logging {
+
+  def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
+    val scan = exec.scan
+    val scanClassName = scan.getClass.getName
+    // Only handle Iceberg scans; other sources must stay on Spark's path.
+    if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
+      return None
+    }
+
+    // Changelog scan carries row-level changes; not supported by native 
COW-only path.
+    if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") 
{
+      return None
+    }
+
+    val readSchema = scan.readSchema
+    // Native scan does not support Iceberg metadata columns (e.g. _file, 
_pos).
+    if (hasMetadataColumns(readSchema)) {
+      return None
+    }
+
+    if (!readSchema.fields.forall(field => 
NativeConverters.isTypeSupported(field.dataType))) {
+      return None
+    }
+
+    val partitions = inputPartitions(exec)
+    // Empty scan (e.g. empty table) should still build a plan to return no 
rows.
+    if (partitions.isEmpty) {
+      return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+    }
+
+    val icebergPartitions = partitions.flatMap(icebergPartition)
+    // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
+    if (icebergPartitions.size != partitions.size) {
+      return None
+    }
+
+    val fileTasks = icebergPartitions.flatMap(_.fileTasks)
+
+    // Native scan does not apply delete files; only allow pure data files 
(COW).
+    if (!fileTasks.forall(task => task.deletes() == null || 
task.deletes().isEmpty)) {
+      return None
+    }
+
+    // Residual filters require row-level evaluation, not supported in native 
scan.
+    if (!fileTasks.forall(task => 
Expressions.alwaysTrue().equals(task.residual()))) {
+      return None
+    }
+
+    // Native scan handles a single file format; mixed formats must fallback.
+    val formats = fileTasks.map(_.file().format()).distinct
+    if (formats.size > 1) {
+      return None
+    }
+
+    val format = formats.headOption.getOrElse(FileFormat.PARQUET)
+    if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
+      return None
+    }
+
+    Some(IcebergScanPlan(fileTasks, format, readSchema))
+  }
+
+  private def hasMetadataColumns(schema: StructType): Boolean =
+    schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+
+  private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
+    // Prefer DataSource V2 batch API; if not available, fallback to exec 
methods via reflection.
+    val fromBatch =
+      try {
+        val batch = exec.scan.toBatch
+        if (batch != null) {
+          batch.planInputPartitions().toSeq
+        } else {
+          Seq.empty
+        }
+      } catch {
+        case _: Throwable => Seq.empty
+      }
+    if (fromBatch.nonEmpty) {
+      return fromBatch
+    }
+
+    // Some Spark versions expose partitions through 
inputPartitions/partitions methods on BatchScanExec.
+    val methods = exec.getClass.getMethods
+    val inputPartitionsMethod = methods.find(_.getName == "inputPartitions")
+    val partitionsMethod = methods.find(_.getName == "partitions")
+
+    val raw = inputPartitionsMethod
+      .orElse(partitionsMethod)
+      .map(_.invoke(exec))
+      .getOrElse(Seq.empty)
+
+    // Normalize to Seq[InputPartition], flattening nested Seq if needed.
+    raw match {
+      case seq: scala.collection.Seq[_]
+          if seq.nonEmpty &&
+            seq.head.isInstanceOf[scala.collection.Seq[_]] =>
+        
seq.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]].flatten.toSeq
+      case seq: scala.collection.Seq[_] =>
+        seq.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq
+      case _ =>
+        Seq.empty
+    }
+  }
+
+  private case class IcebergPartitionView(fileTasks: Seq[FileScanTask])
+
+  private def icebergPartition(partition: InputPartition): 
Option[IcebergPartitionView] = {
+    val className = partition.getClass.getName
+    // Only accept Iceberg SparkInputPartition to access task groups.
+    if (className != "org.apache.iceberg.spark.source.SparkInputPartition") {
+      return None
+    }
+
+    try {
+      // SparkInputPartition is package-private; use reflection to read its 
task group.
+      val taskGroupField = partition.getClass.getDeclaredField("taskGroup")
+      taskGroupField.setAccessible(true)
+      val taskGroup = taskGroupField.get(partition)
+
+      // Extract tasks and keep only file scan tasks.
+      val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks")
+      tasksMethod.setAccessible(true)
+      val tasks = 
tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala
+      val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq
+
+      // If any task is not a FileScanTask, fallback.
+      if (fileTasks.size != tasks.size) {
+        return None
+      }
+
+      Some(IcebergPartitionView(fileTasks))
+    } catch {
+      case _: ReflectiveOperationException => None

Review Comment:
   Catching all ReflectiveOperationException here silently swallows important 
debugging information. When reflection fails, it could be due to Iceberg 
version incompatibility or internal API changes. Consider logging a warning 
with the exception details to help diagnose issues in production environments, 
similar to how IcebergConvertProvider logs warnings for unsupported Spark 
versions.
   ```suggestion
         case e: ReflectiveOperationException =>
           logWarning(
             s"Failed to read Iceberg SparkInputPartition via reflection; 
falling back to None. " +
               s"This may indicate an Iceberg or Spark version 
incompatibility.",
             e)
           None
   ```



##########
dev/reformat:
##########
@@ -48,15 +48,15 @@ else
   cargo fmt --all -q --
 fi
 
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4
 sparkver=spark-3.5
 for celebornver in celeborn-0.5 celeborn-0.6
 do
-  run_maven -P"${sparkver}" -Pceleborn,"${celebornver}" -Puniffle,uniffle-0.10 
-Ppaimon,paimon-1.2 -Pflink,flink-1.18 -Piceberg,iceberg-1.9
+  run_maven -P"${sparkver}" -P"${celebornver}" -Puniffle-0.10 -Ppaimon-1.2 
-Pflink-1.18 -Piceberg -DicebergEnabled=true -DicebergVersion=1.10.1
 
 done
 
-sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
+sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.5)

Review Comment:
   The sparkvers array should include "spark-3.4" to match the array iteration 
below. The comment on line 51 and the code on line 52 both reference spark-3.5, 
but the array only includes spark-3.5 once. It appears spark-3.4 was removed 
from the array when it should have been spark-3.5 that was removed since it's 
already handled in the celeborn loop above. This would cause the code to not be 
formatted for spark-3.4.
   ```suggestion
   sparkvers=(spark-3.0 spark-3.1 spark-3.2 spark-3.3 spark-3.4)
   ```



##########
thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.iceberg
+
+import scala.collection.JavaConverters._
+
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
+import org.apache.iceberg.expressions.Expressions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+
+final case class IcebergScanPlan(
+    fileTasks: Seq[FileScanTask],
+    fileFormat: FileFormat,
+    readSchema: StructType)
+
+object IcebergScanSupport extends Logging {
+
+  def plan(exec: BatchScanExec): Option[IcebergScanPlan] = {
+    val scan = exec.scan
+    val scanClassName = scan.getClass.getName
+    // Only handle Iceberg scans; other sources must stay on Spark's path.
+    if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
+      return None
+    }
+
+    // Changelog scan carries row-level changes; not supported by native 
COW-only path.
+    if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") 
{
+      return None
+    }
+
+    val readSchema = scan.readSchema
+    // Native scan does not support Iceberg metadata columns (e.g. _file, 
_pos).
+    if (hasMetadataColumns(readSchema)) {
+      return None
+    }
+
+    if (!readSchema.fields.forall(field => 
NativeConverters.isTypeSupported(field.dataType))) {
+      return None
+    }
+
+    val partitions = inputPartitions(exec)
+    // Empty scan (e.g. empty table) should still build a plan to return no 
rows.
+    if (partitions.isEmpty) {
+      return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+    }
+
+    val icebergPartitions = partitions.flatMap(icebergPartition)
+    // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
+    if (icebergPartitions.size != partitions.size) {
+      return None
+    }
+
+    val fileTasks = icebergPartitions.flatMap(_.fileTasks)
+
+    // Native scan does not apply delete files; only allow pure data files 
(COW).
+    if (!fileTasks.forall(task => task.deletes() == null || 
task.deletes().isEmpty)) {
+      return None
+    }
+
+    // Residual filters require row-level evaluation, not supported in native 
scan.
+    if (!fileTasks.forall(task => 
Expressions.alwaysTrue().equals(task.residual()))) {
+      return None
+    }
+
+    // Native scan handles a single file format; mixed formats must fallback.
+    val formats = fileTasks.map(_.file().format()).distinct
+    if (formats.size > 1) {
+      return None
+    }
+
+    val format = formats.headOption.getOrElse(FileFormat.PARQUET)
+    if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
+      return None
+    }
+
+    Some(IcebergScanPlan(fileTasks, format, readSchema))
+  }
+
+  private def hasMetadataColumns(schema: StructType): Boolean =
+    schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+
+  private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
+    // Prefer DataSource V2 batch API; if not available, fallback to exec 
methods via reflection.
+    val fromBatch =
+      try {
+        val batch = exec.scan.toBatch
+        if (batch != null) {
+          batch.planInputPartitions().toSeq
+        } else {
+          Seq.empty
+        }
+      } catch {
+        case _: Throwable => Seq.empty

Review Comment:
   The code at line 111 swallows all exceptions from the reflection operation. 
For better observability, consider logging exceptions similar to the pattern 
used in NativeRDD.scala where setAccessible failures are logged. This would 
help diagnose issues when accessing Iceberg's internal partition structure 
fails.
   ```suggestion
           case t: Throwable =>
             logWarning(
               s"Failed to plan input partitions via DataSource V2 batch API 
for " +
                 s"${exec.getClass.getName}; falling back to reflective 
methods.",
               t)
             Seq.empty
   ```



##########
dev/reformat:
##########
@@ -48,15 +48,15 @@ else
   cargo fmt --all -q --
 fi
 
-# Check or format all code, including third-party code, with spark-3.5
+# Check or format all code, including third-party code, with spark-3.4

Review Comment:
   The comment says "spark-3.4" but the code uses "spark-3.5". Update the 
comment to accurately reflect the Spark version being used.
   ```suggestion
   # Check or format all code, including third-party code, with spark-3.5
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to