This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 350cf3aa6 feat: accelerate Iceberg RewriteDataFiles reads via Comet native scan (#4251)
350cf3aa6 is described below

commit 350cf3aa6151391aca69b688a4387c126bd41781
Author: Jordan Epstein <[email protected]>
AuthorDate: Mon May 11 18:45:21 2026 -0500

    feat: accelerate Iceberg RewriteDataFiles reads via Comet native scan (#4251)
---
 .github/workflows/pr_build_linux.yml               |   1 +
 .github/workflows/pr_build_macos.yml               |   1 +
 .../apache/comet/iceberg/IcebergReflection.scala   | 113 +++-
 .../org/apache/comet/rules/CometScanRule.scala     |   8 +-
 .../org/apache/comet/CometIcebergNativeSuite.scala |  31 +-
 .../comet/CometIcebergRewriteActionSuite.scala     | 596 +++++++++++++++++++++
 .../org/apache/comet/CometIcebergTestBase.scala    |  50 ++
 7 files changed, 742 insertions(+), 58 deletions(-)

diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index c7e0c91f7..5c1ae2dc4 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -333,6 +333,7 @@ jobs:
               org.apache.spark.sql.comet.ParquetEncryptionITCase
               org.apache.comet.exec.CometNativeReaderSuite
               org.apache.comet.CometIcebergNativeSuite
+              org.apache.comet.CometIcebergRewriteActionSuite
               org.apache.comet.iceberg.IcebergReflectionSuite
           - name: "csv"
             value: |
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index 263317cd1..29eca594a 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -181,6 +181,7 @@ jobs:
               org.apache.spark.sql.comet.ParquetEncryptionITCase
               org.apache.comet.exec.CometNativeReaderSuite
               org.apache.comet.CometIcebergNativeSuite
+              org.apache.comet.CometIcebergRewriteActionSuite
               org.apache.comet.iceberg.IcebergReflectionSuite
           - name: "csv"
             value: |
diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index 7c52f320c..e85eac2c4 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -46,8 +46,22 @@ object IcebergReflection extends Logging {
     val PARTITION_SPEC = "org.apache.iceberg.PartitionSpec"
     val PARTITION_FIELD = "org.apache.iceberg.PartitionField"
     val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
+    val SPARK_BATCH_QUERY_SCAN = 
"org.apache.iceberg.spark.source.SparkBatchQueryScan"
+    val SPARK_STAGED_SCAN = "org.apache.iceberg.spark.source.SparkStagedScan"
   }
 
+  /**
+   * SparkScan implementations that Comet recognises as Iceberg data scans.
+   *
+   * `SparkStagedScan` also backs reads against Iceberg metadata tables (e.g. `POSITION_DELETES`),
+   * but the gate for that lives in `getMetadataLocation`, which returns None for metadata-table
+   * instances.
+   */
+  val ICEBERG_SCAN_CLASSES: Set[String] =
+    Set(ClassNames.SPARK_BATCH_QUERY_SCAN, ClassNames.SPARK_STAGED_SCAN)
+
+  def isIcebergScanClass(name: String): Boolean = 
ICEBERG_SCAN_CLASSES.contains(name)
+
   /**
    * Iceberg content types.
    */
@@ -172,44 +186,87 @@ object IcebergReflection extends Logging {
     }
   }
 
+  private lazy val sparkStagedScanClass: Class[_] = 
loadClass(ClassNames.SPARK_STAGED_SCAN)
+
+  private def isStagedScan(scan: Any): Boolean = 
sparkStagedScanClass.isInstance(scan)
+
   /**
    * Gets the tasks from a SparkScan.
    *
-   * The tasks() method is protected in SparkScan, requiring reflection to access.
+   * Most Iceberg scans (e.g. SparkBatchQueryScan) inherit a `tasks()` accessor from
+   * SparkPartitioningAwareScan. SparkStagedScan extends SparkScan directly and only declares
+   * `taskGroups()`, so for staged scans we flatten the groups instead. Both methods are protected
+   * and require reflection.
    */
-  def getTasks(scan: Any): Option[java.util.List[_]] = {
-    try {
-      val tasksMethod = scan.getClass.getSuperclass
-        .getDeclaredMethod("tasks")
-      tasksMethod.setAccessible(true)
-      Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
-    } catch {
-      case e: Exception =>
+  def getTasks(scan: Any): Option[java.util.List[_]] =
+    if (isStagedScan(scan)) tasksFromTaskGroups(scan) else 
tasksFromTasksAccessor(scan)
+
+  private def tasksFromTasksAccessor(scan: Any): Option[java.util.List[_]] =
+    findMethodInHierarchy(scan.getClass, "tasks") match {
+      case Some(method) =>
+        Some(method.invoke(scan).asInstanceOf[java.util.List[_]])
+      case None =>
         logError(
-          s"Iceberg reflection failure: Failed to get tasks from SparkScan: 
${e.getMessage}")
+          "Iceberg reflection failure: Failed to get tasks from SparkScan: " +
+            s"tasks() not found on ${scan.getClass.getName}")
+        None
+    }
+
+  private def tasksFromTaskGroups(scan: Any): Option[java.util.List[_]] =
+    findMethodInHierarchy(scan.getClass, "taskGroups") match {
+      case Some(method) =>
+        try {
+          val groups = method.invoke(scan).asInstanceOf[java.util.List[_]]
+          if (groups.isEmpty) {
+            Some(new java.util.ArrayList[AnyRef]())
+          } else {
+            // All task groups in a stage share the same concrete class, so the per-group
+            // `tasks()` lookup can be cached once instead of done N times.
+            val groupTasksMethod = groups.get(0).getClass.getMethod("tasks")
+            val flat = new java.util.ArrayList[AnyRef]()
+            groups.forEach { group =>
+              val groupTasks =
+                
groupTasksMethod.invoke(group).asInstanceOf[java.util.Collection[_ <: AnyRef]]
+              flat.addAll(groupTasks)
+            }
+            Some(flat)
+          }
+        } catch {
+          case e: ReflectiveOperationException =>
+            logError(
+              "Iceberg reflection failure: Failed to flatten tasks from 
SparkStagedScan: " +
+                s"${e.getMessage}")
+            None
+        }
+      case None =>
+        logError(
+          "Iceberg reflection failure: Failed to flatten tasks from 
SparkStagedScan: " +
+            s"taskGroups() not found on ${scan.getClass.getName}")
         None
     }
-  }
 
   /**
    * Gets the filter expressions from a SparkScan.
    *
-   * The filterExpressions() method is protected in SparkScan.
+   * `filterExpressions()` is declared on SparkPartitioningAwareScan but absent from plain
+   * SparkScan. SparkStagedScan (used by RewriteDataFiles) extends SparkScan directly and never
+   * pushes filters, so we short-circuit with an empty list rather than reflectively probing for a
+   * method we know isn't there.
    */
-  def getFilterExpressions(scan: Any): Option[java.util.List[_]] = {
-    try {
-      val filterExpressionsMethod = scan.getClass.getSuperclass.getSuperclass
-        .getDeclaredMethod("filterExpressions")
-      filterExpressionsMethod.setAccessible(true)
-      
Some(filterExpressionsMethod.invoke(scan).asInstanceOf[java.util.List[_]])
-    } catch {
-      case e: Exception =>
-        logError(
-          "Iceberg reflection failure: Failed to get filter expressions from 
SparkScan: " +
-            s"${e.getMessage}")
-        None
+  def getFilterExpressions(scan: Any): Option[java.util.List[_]] =
+    if (isStagedScan(scan)) {
+      Some(java.util.Collections.emptyList[AnyRef]())
+    } else {
+      findMethodInHierarchy(scan.getClass, "filterExpressions") match {
+        case Some(method) =>
+          Some(method.invoke(scan).asInstanceOf[java.util.List[_]])
+        case None =>
+          logError(
+            "Iceberg reflection failure: Failed to get filter expressions from 
SparkScan: " +
+              s"filterExpressions() not found on ${scan.getClass.getName}")
+          None
+      }
     }
-  }
 
   /**
    * Gets the Iceberg table format version.
@@ -350,6 +407,12 @@ object IcebergReflection extends Logging {
   /**
    * Gets the metadata file location from an Iceberg table.
    *
+   * Returns None for Iceberg metadata-table instances (e.g. POSITION_DELETES, the table that
+   * `RewritePositionDeleteFiles` reads via `SparkStagedScan`). This is the gate that keeps Comet
+   * from accelerating metadata-table reads, which have a different schema from the parent data
+   * table and aren't supported by the iceberg-rust-driven native path. `CometScanRule` falls back
+   * to Spark when this returns None; `CometIcebergRewriteActionSuite` pins the behaviour.
+   *
    * @param table
    *   The Iceberg table instance
    * @return
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index aee11d4ce..64b69be1e 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -301,10 +301,10 @@ case class CometScanRule(session: SparkSession)
           withInfos(scanExec, fallbackReasons.toSet)
         }
 
-      // Iceberg scan - detected by class name
-      case _
-          if scanExec.scan.getClass.getName ==
-            "org.apache.iceberg.spark.source.SparkBatchQueryScan" =>
+      // Iceberg scan - detected by class name. SparkStagedScan covers reads issued by
+      // RewriteDataFiles (and similar maintenance actions) where the planner has already
+      // staged FileScanTasks via ScanTaskSetManager.
+      case _ if 
IcebergReflection.isIcebergScanClass(scanExec.scan.getClass.getName) =>
         val fallbackReasons = new ListBuffer[String]()
 
         // Native Iceberg scan requires both configs to be enabled
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
index a2d8c2576..cabc0f97c 100644
--- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala
@@ -20,7 +20,6 @@
 package org.apache.comet
 
 import java.io.File
-import java.nio.file.Files
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -49,17 +48,8 @@ import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions}
 class CometIcebergNativeSuite
     extends CometTestBase
     with RESTCatalogHelper
-    with AdaptiveSparkPlanHelper {
-
-  // Skip these tests if Iceberg is not available in classpath
-  private def icebergAvailable: Boolean = {
-    try {
-      Class.forName("org.apache.iceberg.catalog.Catalog")
-      true
-    } catch {
-      case _: ClassNotFoundException => false
-    }
-  }
+    with AdaptiveSparkPlanHelper
+    with CometIcebergTestBase {
 
   /** Collects all CometIcebergNativeScanExec nodes from a plan */
   private def collectIcebergNativeScans(plan: SparkPlan): 
Seq[CometIcebergNativeScanExec] = {
@@ -2518,23 +2508,6 @@ class CometIcebergNativeSuite
     }
   }
 
-  // Helper to create temp directory
-  def withTempIcebergDir(f: File => Unit): Unit = {
-    val dir = Files.createTempDirectory("comet-iceberg-test").toFile
-    try {
-      f(dir)
-    } finally {
-      def deleteRecursively(file: File): Unit = {
-        if (file.isDirectory) {
-          file.listFiles().foreach(deleteRecursively)
-        }
-        file.delete()
-      }
-
-      deleteRecursively(dir)
-    }
-  }
-
   test("runtime filtering - multiple DPP filters on two partition columns") {
     assume(icebergAvailable, "Iceberg not available")
     withTempIcebergDir { warehouseDir =>
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergRewriteActionSuite.scala b/spark/src/test/scala/org/apache/comet/CometIcebergRewriteActionSuite.scala
new file mode 100644
index 000000000..962296093
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergRewriteActionSuite.scala
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.File
+
+import scala.collection.mutable
+
+import org.apache.spark.CometListenerBusUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.{CometTestBase, SparkSession}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
+
+import org.apache.comet.iceberg.IcebergReflection
+
+/**
+ * Verifies that Comet engages on the per-group reads issued by Iceberg's
+ * RewriteDataFilesSparkAction. The action stages each rewrite group's FileScanTasks under a UUID
+ * via ScanTaskSetManager and then reads them back through SparkStagedScan, which Comet's
+ * CometScanRule recognises alongside SparkBatchQueryScan.
+ */
+class CometIcebergRewriteActionSuite extends CometTestBase with 
CometIcebergTestBase {
+
+  private val catalog = "test_cat"
+  private val MultiFileTableSize = 4
+
+  test("binPack rewrite reads each file group via CometIcebergNativeScan") {
+    runRewriteTest(
+      RewriteCase(
+        table = s"$catalog.db.binpack_test",
+        configureMode = invoke(_, "binPack"),
+        verifyPlans = rewritePlans => assertReadsAreComet(rewritePlans)))
+  }
+
+  test("sort rewrite runs scan, exchange, and sort natively in Comet") {
+    runRewriteTest(
+      RewriteCase(
+        table = s"$catalog.db.sort_test",
+        configureMode = invoke(_, "sort"),
+        beforeRewrite = setSortOrderAsc(_, "id"),
+        verifyDataAfter = assertSortedById,
+        verifyPlans = { rewritePlans =>
+          assertReadsAreComet(rewritePlans)
+          assertOperator(rewritePlans, "CometExchange")
+          assertOperator(rewritePlans, "CometSort")
+        }))
+  }
+
+  // Single-column zOrder is bit-pattern-equivalent to a natural sort (no second dimension to
+  // interleave with), so we expect the same ascending output as the sort test. The shuffle here
+  // is CometColumnarExchange rather than CometExchange because the z-value column is computed
+  // by a Spark Project (Iceberg's INTERLEAVE_BYTES / INT_ORDERED_BYTES are not recognised by
+  // Comet), so the path crosses a JVM-row boundary before the shuffle.
+  test("single-column zOrder rewrite runs scan, columnar exchange, and sort 
natively in Comet") {
+    runRewriteTest(
+      RewriteCase(
+        table = s"$catalog.db.zorder_test",
+        configureMode = invoke(_, "zOrder", classOf[Array[String]] -> 
Array("id")),
+        verifyDataAfter = assertSortedById,
+        verifyPlans = { rewritePlans =>
+          assertReadsAreComet(rewritePlans)
+          assertOperator(rewritePlans, "CometColumnarExchange")
+          assertOperator(rewritePlans, "CometSort")
+        }))
+  }
+
+  test("RewritePositionDeleteFiles action does not convert any operators with 
Comet") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withIcebergComet(warehouseDir) {
+        val table = s"$catalog.db.position_delete_rewrite_test"
+        try {
+          // MOR table; produce some positional delete files for the action to consume.
+          spark.sql(s"""
+            CREATE TABLE $table (id INT, value DOUBLE) USING iceberg
+            TBLPROPERTIES (
+              'format-version' = '2',
+              'write.delete.mode' = 'merge-on-read',
+              'write.merge.mode' = 'merge-on-read'
+            )
+          """)
+          spark
+            .createDataFrame(Seq((1, 1.0), (2, 2.0), (3, 3.0)))
+            .toDF("id", "value")
+            .coalesce(1)
+            .write
+            .format("iceberg")
+            .mode("append")
+            .saveAsTable(table)
+          spark.sql(s"DELETE FROM $table WHERE id = 1")
+          spark.sql(s"DELETE FROM $table WHERE id = 2")
+
+          val deleteCodes = deleteFileContentCodes(table)
+          assert(
+            deleteCodes.nonEmpty && deleteCodes.forall(_ == 1),
+            s"Expected MOR positional delete files, got $deleteCodes")
+
+          // Iceberg-spark-runtime-4.0 (used for the spark-4.1 and spark-4.2 profiles since no
+          // 4.1+ runtime is published) calls DataSourceV2Relation.create with a signature that
+          // Spark 4.1 removed (the 5th `timeTravelSpec` parameter was added). The action
+          // therefore cannot run on those profiles; skip cleanly rather than fail. Tracked
+          // upstream at https://github.com/apache/iceberg/issues/15238.
+          val plans =
+            try {
+              captureSqlPlans {
+                runActionChain(
+                  table,
+                  actionMethodName = "rewritePositionDeletes",
+                  configure = identity,
+                  options = Map("rewrite-all" -> "true"))
+              }
+            } catch {
+              case e: java.lang.reflect.InvocationTargetException
+                  if e.getCause.isInstanceOf[NoSuchMethodError] =>
+                cancel(
+                  "Iceberg's RewritePositionDeleteFiles action is incompatible 
with this " +
+                    s"Spark/Iceberg combination: ${e.getCause.getMessage}")
+            }
+
+          // The action reads the POSITION_DELETES metadata table (different schema from the data
+          // table) via SparkStagedScan. Comet must not convert any operator in those plans.
+          val cometBearingPlans = plans.filter(_.containsCometOperator)
+          assert(
+            cometBearingPlans.isEmpty,
+            "Expected no Comet operators in RewritePositionDeleteFiles plans, 
but found:\n" +
+              dumpPlans(cometBearingPlans))
+        } finally {
+          spark.sql(s"DROP TABLE IF EXISTS $table")
+        }
+      }
+    }
+  }
+
+  test("binPack rewrite applies positional and equality deletes during 
compaction (MOR)") {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withIcebergComet(warehouseDir) {
+        val table = s"$catalog.db.deletes_compaction_test"
+        try {
+          spark.sql(s"""
+            CREATE TABLE $table (id INT, value DOUBLE) USING iceberg
+            TBLPROPERTIES (
+              'format-version' = '2',
+              'write.delete.mode' = 'merge-on-read',
+              'write.merge.mode' = 'merge-on-read'
+            )
+          """)
+          // coalesce(1) puts all 3 rows in one data file. Without it, Iceberg would split per
+          // VALUES row and DELETE WHERE id=N would be a metadata-only file unlink — never
+          // exercising the merge-on-read writer that produces positional delete files.
+          spark
+            .createDataFrame(Seq((1, 1.0), (2, 2.0), (3, 3.0)))
+            .toDF("id", "value")
+            .coalesce(1)
+            .write
+            .format("iceberg")
+            .mode("append")
+            .saveAsTable(table)
+
+          // Spark DML writes a positional delete (content=1) for id=1.
+          spark.sql(s"DELETE FROM $table WHERE id = 1")
+          // Spark's Iceberg DSv2 connector never emits equality deletes, so we drop into the
+          // Iceberg Java API to add an equality delete (content=2) for id=2.
+          writeEqualityDeleteFile(loadIcebergTable(table), "id", 2)
+
+          val rowsBefore = spark.sql(s"SELECT * FROM $table ORDER BY 
id").collect().toSeq
+          assert(
+            rowsBefore.size == 1 && rowsBefore.head.getInt(0) == 3,
+            s"Expected only id=3 visible before rewrite, got $rowsBefore")
+          val deleteCodesBefore = deleteFileContentCodes(table)
+          assert(
+            deleteCodesBefore == Seq(1, 2),
+            "Expected one positional (1) and one equality (2) delete file, " +
+              s"got $deleteCodesBefore")
+          assert(
+            sumDataFileRecords(table) == 3,
+            s"Expected 3 records in data files before rewrite, got 
${sumDataFileRecords(table)}")
+
+          val plans = captureSqlPlans {
+            val rewrittenCount =
+              runRewriteDataFiles(table, invoke(_, "binPack"), 
Map("rewrite-all" -> "true"))
+            assert(
+              rewrittenCount >= 1,
+              s"Expected >= 1 input file rewritten, got $rewrittenCount")
+          }
+
+          val rewritePlans = plans.filter(_.hasNode("AppendData"))
+          assert(rewritePlans.nonEmpty, "Expected at least one rewrite plan")
+          assertReadsAreComet(rewritePlans)
+
+          val rowsAfter = spark.sql(s"SELECT * FROM $table ORDER BY 
id").collect().toSeq
+          assert(
+            rowsAfter == rowsBefore,
+            s"Surviving rows changed across rewrite: before=$rowsBefore, 
after=$rowsAfter")
+          // The strong check: post-rewrite data files physically contain exactly the surviving
+          // rows. Without Comet correctly applying both positional AND equality deletes during
+          // the rewrite read, the rewritten file would still contain the deleted rows and this
+          // would be 3.
+          assert(
+            sumDataFileRecords(table) == 1,
+            "Expected 1 record materialised in data files after rewrite, " +
+              s"got ${sumDataFileRecords(table)}")
+        } finally {
+          spark.sql(s"DROP TABLE IF EXISTS $table")
+        }
+      }
+    }
+  }
+
+  // -- Test driver -----------------------------------------------------------
+
+  private case class RewriteCase(
+      table: String,
+      configureMode: AnyRef => AnyRef,
+      beforeRewrite: AnyRef => Unit = _ => (),
+      verifyDataAfter: Seq[org.apache.spark.sql.Row] => Unit = _ => (),
+      verifyPlans: Seq[CapturedPlan] => Unit)
+
+  /**
+   * End-to-end test driver: creates a multi-file Iceberg table, runs a RewriteDataFiles action in
+   * the requested mode, captures the per-group SQL plans, and hands them to `verifyPlans`.
+   */
+  private def runRewriteTest(rc: RewriteCase): Unit = {
+    assume(icebergAvailable, "Iceberg not available in classpath")
+
+    withTempIcebergDir { warehouseDir =>
+      withIcebergComet(warehouseDir) {
+        try {
+          createMultiFileTable(rc.table, MultiFileTableSize)
+          rc.beforeRewrite(loadIcebergTable(rc.table))
+
+          val rowsBefore = spark.sql(s"SELECT * FROM ${rc.table} ORDER BY 
id").collect().toSeq
+          val filesBefore = countDataFiles(rc.table)
+
+          val plans = captureSqlPlans {
+            val rewrittenCount = runRewriteDataFiles(rc.table, 
rc.configureMode)
+            assert(
+              rewrittenCount >= MultiFileTableSize,
+              s"Expected the rewrite action to consume >= $MultiFileTableSize 
input files, " +
+                s"got $rewrittenCount")
+          }
+
+          val rowsAfterById = spark.sql(s"SELECT * FROM ${rc.table} ORDER BY 
id").collect().toSeq
+          val rowsAfterFileOrder = spark.sql(s"SELECT * FROM 
${rc.table}").collect().toSeq
+          val filesAfter = countDataFiles(rc.table)
+          assertDataPreserved(rowsBefore, rowsAfterById, filesBefore, 
filesAfter)
+          rc.verifyDataAfter(rowsAfterFileOrder)
+
+          val rewritePlans = plans.filter(_.hasNode("AppendData"))
+          assert(
+            rewritePlans.nonEmpty,
+            "Expected at least one captured plan with AppendData but got 
none.\n" +
+              dumpPlans(plans))
+          rc.verifyPlans(rewritePlans)
+        } catch {
+          case _: ClassNotFoundException =>
+            cancel("Iceberg Actions API not available - requires 
iceberg-spark-runtime")
+        } finally {
+          spark.sql(s"DROP TABLE IF EXISTS ${rc.table}")
+        }
+      }
+    }
+  }
+
+  // -- Plan capture ----------------------------------------------------------
+
+  /**
+   * One captured SQL execution: the human-readable plan string (for failure diagnostics) plus the
+   * exact set of plan-node names from `SparkPlanInfo` (for typed assertions). We use
+   * `SparkListener` rather than `QueryExecutionListener` because the rewrite action uses
+   * `cloneSession()`, and `listenerManager` is per-session — only the SparkContext-level event
+   * bus survives the clone.
+   */
+  private case class CapturedPlan(physicalDescription: String, nodeNames: 
Set[String]) {
+    def hasNode(name: String): Boolean = nodeNames.contains(name)
+    def containsCometOperator: Boolean = 
nodeNames.exists(_.startsWith("Comet"))
+  }
+
+  private def captureSqlPlans(body: => Unit): Seq[CapturedPlan] = {
+    val plans = mutable.ArrayBuffer[CapturedPlan]()
+    val listener = new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
+        case s: SparkListenerSQLExecutionStart =>
+          plans.synchronized {
+            plans += CapturedPlan(s.physicalPlanDescription, 
collectNodeNames(s.sparkPlanInfo))
+          }
+        case _ =>
+      }
+    }
+    spark.sparkContext.addSparkListener(listener)
+    try {
+      body
+      CometListenerBusUtils.waitUntilEmpty(spark.sparkContext)
+    } finally {
+      spark.sparkContext.removeSparkListener(listener)
+    }
+    plans.synchronized { plans.toSeq }
+  }
+
+  private def collectNodeNames(info: SparkPlanInfo): Set[String] = {
+    val acc = mutable.Set[String]()
+    def walk(node: SparkPlanInfo): Unit = {
+      acc += node.nodeName
+      node.children.foreach(walk)
+    }
+    walk(info)
+    acc.toSet
+  }
+
+  // -- Plan assertions -------------------------------------------------------
+
+  private def assertReadsAreComet(plans: Seq[CapturedPlan]): Unit = {
+    assertOperator(plans, "CometIcebergNativeScan")
+    val withFallback =
+      plans.count(p => p.hasNode("BatchScan") && 
!p.hasNode("CometIcebergNativeScan"))
+    assert(
+      withFallback == 0,
+      s"$withFallback of ${plans.size} plans fell back to a non-Comet 
BatchScan.\n" +
+        dumpPlans(plans))
+  }
+
+  private def assertOperator(plans: Seq[CapturedPlan], operator: String): Unit 
= {
+    val matching = plans.count(_.hasNode(operator))
+    assert(
+      matching == plans.size,
+      s"Expected every plan to contain $operator; only $matching of 
${plans.size} did.\n" +
+        dumpPlans(plans))
+  }
+
+  private def dumpPlans(plans: Seq[CapturedPlan]): String =
+    plans.zipWithIndex
+      .map { case (p, i) => s"--- plan $i ---\n${p.physicalDescription}" }
+      .mkString("\n\n")
+
+  /**
+   * Verifies the rewrite preserved every row and reduced the data-file count. Both checks are
+   * critical: a buggy native scan dropping rows or rewriting them incorrectly would still produce
+   * a syntactically valid Iceberg snapshot, so plan-shape checks alone aren't sufficient.
+   */
+  private def assertDataPreserved(
+      rowsBefore: Seq[org.apache.spark.sql.Row],
+      rowsAfter: Seq[org.apache.spark.sql.Row],
+      filesBefore: Long,
+      filesAfter: Long): Unit = {
+    assert(
+      rowsAfter.size == rowsBefore.size,
+      s"Row count changed across rewrite: before=${rowsBefore.size}, 
after=${rowsAfter.size}")
+    assert(
+      rowsAfter == rowsBefore,
+      "Row content changed across rewrite (rows compared in id order).")
+    assert(
+      filesAfter < filesBefore,
+      s"Expected data-file count to decrease across rewrite, got $filesBefore 
-> $filesAfter")
+  }
+
+  private def countDataFiles(table: String): Long =
+    spark.sql(s"SELECT COUNT(*) FROM $table.files").collect()(0).getLong(0)
+
+  /** Returns the sorted `content` codes of delete files: 1 = positional, 2 = equality. */
+  private def deleteFileContentCodes(table: String): Seq[Int] =
+    spark
+      .sql(s"SELECT content FROM $table.delete_files ORDER BY content")
+      .collect()
+      .map(_.getInt(0))
+      .toSeq
+
+  /**
+   * Total records across all data files (excluding delete files) in the current snapshot. Used to
+   * verify whether deletes were physically materialised by the rewrite vs. only applied at read
+   * time on a still-3-row data file.
+   */
+  private def sumDataFileRecords(table: String): Long =
+    spark
+      .sql(s"SELECT COALESCE(SUM(record_count), 0) FROM $table.data_files")
+      .collect()(0)
+      .getLong(0)
+
+  /**
+   * Asserts the given rows are sorted ascending by id when read in file order. Used by the sort
+   * rewrite test to confirm the action actually produced sorted output, not just that a CometSort
+   * appeared in the plan.
+   */
+  private def assertSortedById(rows: Seq[org.apache.spark.sql.Row]): Unit = {
+    val ids = rows.map(_.getInt(0))
+    assert(
+      ids == ids.sorted,
+      s"Expected rows to be sorted by id after sort rewrite, got first 20 ids: 
${ids.take(20)}")
+  }
+
+  // -- Iceberg / Spark fixtures ---------------------------------------------
+
+  private def withIcebergComet(warehouseDir: File)(body: => Unit): Unit =
+    withSQLConf(
+      s"spark.sql.catalog.$catalog" -> "org.apache.iceberg.spark.SparkCatalog",
+      s"spark.sql.catalog.$catalog.type" -> "hadoop",
+      s"spark.sql.catalog.$catalog.warehouse" -> warehouseDir.getAbsolutePath,
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true")(body)
+
+  /** Creates an Iceberg table with `numFiles` separate appends, each producing one data file. */
+  private def createMultiFileTable(table: String, numFiles: Int): Unit = {
+    spark.sql(s"CREATE TABLE $table (id INT, value DOUBLE) USING iceberg")
+    (0 until numFiles).foreach { i =>
+      spark
+        .range(i * 100L, (i + 1) * 100L)
+        .selectExpr("CAST(id AS INT) as id", "CAST(id * 1.5 AS DOUBLE) as 
value")
+        .coalesce(1)
+        .write
+        .format("iceberg")
+        .mode("append")
+        .saveAsTable(table)
+    }
+  }
+
+  // -- Iceberg reflection (the API isn't compile-time visible here) ----------
+
+  private def loadIcebergTable(name: String): AnyRef =
+    IcebergReflection
+      .loadClass("org.apache.iceberg.spark.Spark3Util")
+      .getMethod("loadIcebergTable", classOf[SparkSession], classOf[String])
+      .invoke(null, spark, name)
+
+  /** Sets a single ASC sort order on the table — required for `action.sort()` (no-args). */
+  private def setSortOrderAsc(table: AnyRef, column: String): Unit = {
+    val replace = invoke(table, "replaceSortOrder")
+    val withAsc = invoke(replace, "asc", classOf[String] -> column)
+    invoke(withAsc, "commit")
+  }
+
+  /**
+   * Runs the `rewriteDataFiles` action on `table` through `runActionChain` and returns the
+   * `rewrittenDataFilesCount` reported by the action's result.
+   *
+   * @param table
+   *   identifier of the table to rewrite
+   * @param configureMode
+   *   applied to the freshly created action before any option is set (e.g. to pick
+   *   `binPack`/`sort`/`zOrder`); must return the action to continue with
+   * @param options
+   *   action options applied one by one via `option(String, String)`; defaults to
+   *   `min-input-files = 2`
+   * @return
+   *   the value of `rewrittenDataFilesCount` on the result, unboxed to `Int`
+   */
+  private def runRewriteDataFiles(
+      table: String,
+      configureMode: AnyRef => AnyRef,
+      options: Map[String, String] = Map("min-input-files" -> "2")): Int = {
+    val result = runActionChain(table, "rewriteDataFiles", configureMode, options)
+    invoke(result, "rewrittenDataFilesCount").asInstanceOf[Int]
+  }
+
+  /**
+   * Invokes `SparkActions.get().<actionMethodName>(table)`, threads it through `configure`
+   * (e.g. `binPack`/`sort`/`zOrder`), applies any extra options, and returns the action's
+   * `.execute()` result. The result type depends on the action; callers cast / project as needed.
+   *
+   * @param table
+   *   identifier of the table, resolved with `loadIcebergTable`
+   * @param actionMethodName
+   *   name of the `SparkActions` method taking a single `org.apache.iceberg.Table` argument
+   * @param configure
+   *   applied to the new action before options are set; must return the action to continue with
+   * @param options
+   *   key/value pairs applied in iteration order via `option(String, String)`
+   * @return
+   *   whatever the action's `execute()` returns
+   */
+  private def runActionChain(
+      table: String,
+      actionMethodName: String,
+      configure: AnyRef => AnyRef,
+      options: Map[String, String]): AnyRef = {
+    val tableObj = loadIcebergTable(table)
+    // The action factory methods are declared against the Table interface, not the concrete
+    // class of `tableObj`, so the interface is what must be used for the method lookup.
+    val tableIface = IcebergReflection.loadClass("org.apache.iceberg.Table")
+    val actions =
+      invoke(IcebergReflection.loadClass("org.apache.iceberg.spark.actions.SparkActions"), "get")
+    val action = invoke(actions, actionMethodName, tableIface -> tableObj)
+    val configured = configure(action)
+    // Each `option` call returns the action to continue the chain with.
+    val withOptions = options.foldLeft(configured) { case (acc, (k, v)) =>
+      invoke(acc, "option", classOf[String] -> k, classOf[String] -> v)
+    }
+    invoke(withOptions, "execute")
+  }
+
+  /**
+   * Writes a single-column equality delete file via the Iceberg Java API and commits it to
+   * `table`. Spark's Iceberg DSv2 connector never emits equality deletes from DML, so we have to
+   * write them by hand to exercise the full delete surface during a rewrite read.
+   *
+   * Modelled on `writeEqDeleteRecord` in Iceberg's own
+   * spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+   * — same shape, but using the public `GenericAppenderFactory` API rather than the
+   * package-private `GenericFileWriterFactory.Builder`, and expressed via reflection because
+   * iceberg-data classes aren't compile-time visible from this suite.
+   *
+   * @param table
+   *   Iceberg table object; `schema`, `spec` and `newRowDelta` are invoked on it reflectively
+   * @param columnName
+   *   name of the column the equality delete matches on
+   * @param deleteValue
+   *   value of `columnName` to delete; boxed before being placed in the delete record
+   */
+  private def writeEqualityDeleteFile(
+      table: AnyRef,
+      columnName: String,
+      deleteValue: Any): Unit = {
+    // Classes needed as declared parameter types for the reflective lookups below.
+    val schemaCls = IcebergReflection.loadClass("org.apache.iceberg.Schema")
+    val tableIface = IcebergReflection.loadClass("org.apache.iceberg.Table")
+    val partitionSpecCls = IcebergReflection.loadClass("org.apache.iceberg.PartitionSpec")
+    val structLikeCls = IcebergReflection.loadClass("org.apache.iceberg.StructLike")
+    val fileFormatCls = IcebergReflection.loadClass("org.apache.iceberg.FileFormat")
+    val encryptedOutputFileCls =
+      IcebergReflection.loadClass("org.apache.iceberg.encryption.EncryptedOutputFile")
+    val deleteFileCls = IcebergReflection.loadClass("org.apache.iceberg.DeleteFile")
+    val parquet = fileFormatCls.getField("PARQUET").get(null)
+
+    // Project the table schema down to the one delete column and remember its field id.
+    val schema = invoke(table, "schema")
+    val spec = invoke(table, "spec")
+    val field = invoke(schema, "findField", classOf[String] -> columnName)
+    val fieldId = invoke(field, "fieldId").asInstanceOf[Int]
+    val eqDeleteRowSchema =
+      invoke(schema, "select", classOf[Array[String]] -> Array(columnName))
+
+    // One record in the delete schema carrying the value to delete.
+    val genericRecord = invoke(
+      IcebergReflection.loadClass("org.apache.iceberg.data.GenericRecord"),
+      "create",
+      schemaCls -> eqDeleteRowSchema)
+    val record = invoke(
+      genericRecord,
+      "copy",
+      classOf[java.util.Map[_, _]] ->
+        java.util.Collections.singletonMap(columnName, deleteValue.asInstanceOf[AnyRef]))
+
+    // builderFor(Table, int, long) is static. NOTE(review): the literals 1 and 1L are presumably
+    // the partition id and task id — confirm against OutputFileFactory.
+    val outputFileFactoryCls =
+      IcebergReflection.loadClass("org.apache.iceberg.io.OutputFileFactory")
+    val builderForMethod = outputFileFactoryCls.getMethod(
+      "builderFor",
+      tableIface,
+      java.lang.Integer.TYPE,
+      java.lang.Long.TYPE)
+    builderForMethod.setAccessible(true)
+    val outputFileFactoryBuilder = builderForMethod.invoke(null, table, Int.box(1), Long.box(1L))
+    val outputFileFactoryWithFormat =
+      invoke(outputFileFactoryBuilder, "format", fileFormatCls -> parquet)
+    val outputFileFactory = invoke(outputFileFactoryWithFormat, "build")
+    val encryptedOutputFile = invoke(outputFileFactory, "newOutputFile")
+
+    // GenericAppenderFactory(schema, spec, equalityFieldIds, eqDeleteRowSchema, posDeleteRowSchema)
+    // is public in 1.5+; we use it instead of the package-private GenericFileWriterFactory.Builder.
+    val factoryCls = IcebergReflection.loadClass("org.apache.iceberg.data.GenericAppenderFactory")
+    val factoryCtor = factoryCls.getConstructor(
+      schemaCls,
+      partitionSpecCls,
+      classOf[Array[Int]],
+      schemaCls,
+      schemaCls)
+    factoryCtor.setAccessible(true)
+    // No position-delete row schema is supplied (null): only an equality delete is written.
+    val factory = factoryCtor
+      .newInstance(schema, spec, Array(fieldId), eqDeleteRowSchema, null)
+      .asInstanceOf[AnyRef]
+
+    val newEqDeleteWriter = factory.getClass.getMethod(
+      "newEqDeleteWriter",
+      encryptedOutputFileCls,
+      fileFormatCls,
+      structLikeCls)
+    newEqDeleteWriter.setAccessible(true)
+    // NOTE(review): null is passed for the StructLike argument, presumably the partition —
+    // confirm this is only used with unpartitioned tables.
+    val writer = newEqDeleteWriter.invoke(factory, encryptedOutputFile, parquet, null)
+
+    // EqualityDeleteWriter.write(T) erases to write(Object) at runtime.
+    val writeMethod = writer.getClass.getMethod("write", classOf[Object])
+    writeMethod.setAccessible(true)
+    try {
+      writeMethod.invoke(writer, record)
+    } finally {
+      val closeMethod = writer.getClass.getMethod("close")
+      closeMethod.setAccessible(true)
+      closeMethod.invoke(writer)
+    }
+    // The delete file metadata is read only after the writer has been closed.
+    val deleteFile = invoke(writer, "toDeleteFile")
+
+    // Commit the new delete file to the table as a row delta.
+    val rowDelta = invoke(table, "newRowDelta")
+    val rowDeltaWithDelete = invoke(rowDelta, "addDeletes", deleteFileCls -> deleteFile)
+    invoke(rowDeltaWithDelete, "commit")
+  }
+
+  /**
+   * Reflective invocation. Pass an instance for instance methods, or a `Class[_]` (e.g. via
+   * `IcebergReflection.loadClass`) for static methods. Each `(paramType, value)` pair specifies
+   * one argument's declared type and value. setAccessible is forced because some Iceberg chain
+   * methods (notably in pre-1.6 BaseSparkAction) live on package-private classes, which trigger
+   * IllegalAccessException on JDK 11+ even for public methods unless reflective access is forced.
+   *
+   * The method is looked up with `getMethod`, so each `paramType` must be exactly the declared
+   * parameter type (e.g. an interface rather than the argument's concrete class).
+   *
+   * @param classOrTarget
+   *   receiver instance, or the declaring `Class[_]` when the method is static
+   * @param methodName
+   *   name of the method to call
+   * @param args
+   *   `(declared parameter type, argument value)` pairs, in parameter order
+   * @return
+   *   the value returned by `Method.invoke`
+   */
+  private def invoke(
+      classOrTarget: AnyRef,
+      methodName: String,
+      args: (Class[_], Any)*): AnyRef = {
+    // A Class argument means "static call": look the method up on it and use a null receiver.
+    val (clazz, target) = classOrTarget match {
+      case c: Class[_] => (c, null)
+      case t => (t.getClass, t)
+    }
+    val (paramTypes, values) = args.unzip
+    val method = clazz.getMethod(methodName, paramTypes: _*)
+    method.setAccessible(true)
+    // Casting to AnyRef boxes primitive argument values for the reflective call.
+    method.invoke(target, values.map(_.asInstanceOf[AnyRef]): _*)
+  }
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergTestBase.scala 
b/spark/src/test/scala/org/apache/comet/CometIcebergTestBase.scala
new file mode 100644
index 000000000..eac78442b
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometIcebergTestBase.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.comet.iceberg.IcebergReflection
+
+/**
+ * Shared fixtures for Iceberg-backed test suites: classpath probe and per-test temp directory.
+ */
+trait CometIcebergTestBase {
+
+  /**
+   * Whether `org.apache.iceberg.catalog.Catalog` can be loaded through `IcebergReflection`.
+   * Evaluated on every call; a `ClassNotFoundException` is reported as `false`.
+   */
+  protected def icebergAvailable: Boolean =
+    try {
+      IcebergReflection.loadClass("org.apache.iceberg.catalog.Catalog")
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+
+  /**
+   * Creates a fresh temporary directory, passes it to `f`, and deletes the directory tree
+   * afterwards, whether or not `f` throws.
+   */
+  protected def withTempIcebergDir(f: File => Unit): Unit = {
+    val dir = Files.createTempDirectory("comet-iceberg-test").toFile
+    try f(dir)
+    finally deleteRecursively(dir)
+  }
+
+  /** Best-effort depth-first delete of `file` and everything beneath it. */
+  private def deleteRecursively(file: File): Unit = {
+    // File.listFiles returns null (not an empty array) when the directory cannot be listed.
+    // Guarding against that keeps cleanup from throwing an NPE out of the `finally` above and
+    // masking the test's own failure.
+    if (file.isDirectory) Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+    file.delete()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to