Shekharrajak commented on code in PR #3519:
URL: https://github.com/apache/datafusion-comet/pull/3519#discussion_r2807300733


##########
spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala:
##########
@@ -154,39 +154,59 @@ object IcebergReflection extends Logging {
   /**
    * Gets the tasks from a SparkScan.
    *
-   * The tasks() method is protected in SparkScan, requiring reflection to access.
+   * SparkBatchQueryScan (via SparkPartitioningAwareScan) has tasks() method. SparkStagedScan (via
+   * SparkScan) has taskGroups() - we extract tasks from groups.
    */
   def getTasks(scan: Any): Option[java.util.List[_]] = {
-    try {
-      val tasksMethod = scan.getClass.getSuperclass
-        .getDeclaredMethod("tasks")
-      tasksMethod.setAccessible(true)
-      Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
-    } catch {
-      case e: Exception =>
-        logError(
-          s"Iceberg reflection failure: Failed to get tasks from SparkScan: 
${e.getMessage}")
-        None
+    // Try tasks() first (SparkPartitioningAwareScan hierarchy)
+    findMethodInHierarchy(scan.getClass, "tasks").flatMap { tasksMethod =>
+      try {
+        return Some(tasksMethod.invoke(scan).asInstanceOf[java.util.List[_]])
+      } catch {
+        case _: Exception => None
+      }
+    }
+
+    // Fall back to taskGroups() (SparkScan hierarchy - used by SparkStagedScan)
+    findMethodInHierarchy(scan.getClass, "taskGroups").flatMap { taskGroupsMethod =>
+      try {
+        val taskGroups = taskGroupsMethod.invoke(scan).asInstanceOf[java.util.List[_]]
+        // Extract individual tasks from each ScanTaskGroup
+        val allTasks = new java.util.ArrayList[Any]()
+        val iter = taskGroups.iterator()
+        while (iter.hasNext) {
+          val group = iter.next()
+          val tasksMethod = group.getClass.getMethod("tasks")
+          val groupTasks = tasksMethod.invoke(group).asInstanceOf[java.lang.Iterable[_]]
+          groupTasks.forEach(task => allTasks.add(task))
+        }
+        Some(allTasks.asInstanceOf[java.util.List[_]])
+      } catch {
+        case e: Exception =>
+          logError(
+            s"Iceberg reflection failure: Failed to get tasks from SparkScan: 
${e.getMessage}")
+          None
+      }
     }
   }
 
   /**
    * Gets the filter expressions from a SparkScan.
    *
-   * The filterExpressions() method is protected in SparkScan.
+   * The filterExpressions() method is protected in SparkScan. Uses findMethodInHierarchy to
+   * support both SparkBatchQueryScan and SparkStagedScan.
    */
   def getFilterExpressions(scan: Any): Option[java.util.List[_]] = {
-    try {
-      val filterExpressionsMethod = scan.getClass.getSuperclass.getSuperclass
-        .getDeclaredMethod("filterExpressions")
-      filterExpressionsMethod.setAccessible(true)
-      Some(filterExpressionsMethod.invoke(scan).asInstanceOf[java.util.List[_]])
-    } catch {
-      case e: Exception =>
-        logError(
-          "Iceberg reflection failure: Failed to get filter expressions from 
SparkScan: " +
-            s"${e.getMessage}")
-        None
+    findMethodInHierarchy(scan.getClass, "filterExpressions").flatMap { filterExpressionsMethod =>

Review Comment:
   Previously we assumed a fixed Iceberg class hierarchy; this findMethodInHierarchy walks up the class tree instead, which is a better approach.
   
   For compaction to work, we need to extract FileScanTask objects from the scan, and different Iceberg scan types expose tasks differently (see the sketch after the list):
   
   SparkBatchQueryScan -> tasks() method
   SparkStagedScan -> taskGroups() method (returns task groups, so the tasks need to be extracted from each group)
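   
   For reference, a minimal sketch of what a hierarchy-walking lookup could look like (the name findMethodInHierarchy and the Option-returning shape come from the diff above; the body below is an illustrative assumption, not the PR's actual implementation):
   
   ```scala
   import java.lang.reflect.Method
   
   object ReflectionSketch {
   
     // Walk up the class hierarchy starting at `clazz` and return the first
     // zero-argument method named `name`, made accessible. Returns None if
     // no class in the chain declares such a method.
     def findMethodInHierarchy(clazz: Class[_], name: String): Option[Method] = {
       var current: Class[_] = clazz
       while (current != null) {
         current.getDeclaredMethods
           .find(m => m.getName == name && m.getParameterCount == 0) match {
           case Some(m) =>
             m.setAccessible(true)
             return Some(m)
           case None =>
             current = current.getSuperclass
         }
       }
       None
     }
   }
   
   // Hypothetical usage mirroring the diff: tasks() resolves for SparkBatchQueryScan
   // (declared in SparkPartitioningAwareScan), taskGroups() for SparkStagedScan.
   // ReflectionSketch.findMethodInHierarchy(scan.getClass, "tasks")
   // ReflectionSketch.findMethodInHierarchy(scan.getClass, "taskGroups")
   ```
   
   Scanning getDeclaredMethods instead of catching NoSuchMethodException from getDeclaredMethod keeps the loop free of exception-driven control flow; either shape would work here.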



