This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4cab60d8d fix: Expose bucketing information from CometNativeScanExec 
(#3319) (#3437)
4cab60d8d is described below

commit 4cab60d8daf59993685a63061d9472d31bc923b3
Author: Andy Grove <[email protected]>
AuthorDate: Fri Feb 6 19:12:16 2026 -0700

    fix: Expose bucketing information from CometNativeScanExec (#3319) (#3437)
---
 dev/diffs/3.4.3.diff |  32 +++++++++-------
 dev/diffs/3.5.8.diff | 103 ++++++++++-----------------------------------------
 dev/diffs/4.0.1.diff |  26 ++++++++-----
 3 files changed, 55 insertions(+), 106 deletions(-)

diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 4fd1d3ca4..020588843 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1150,7 +1150,7 @@ index 02990a7a40d..bddf5e1ccc2 100644
                }
              }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index cfc8b2cc845..c6fcfd7bd08 100644
+index cfc8b2cc845..c4be7eb3731 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -2385,7 +2385,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..e58a2f49eb9 100644
+index 266bb343526..f8ad838e2b2 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,15 +19,18 @@ package org.apache.spark.sql.sources
@@ -2409,7 +2409,7 @@ index 266bb343526..e58a2f49eb9 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -101,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -101,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
      }
    }
  
@@ -2419,6 +2419,7 @@ index 266bb343526..e58a2f49eb9 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -2427,12 +2428,13 @@ index 266bb343526..e58a2f49eb9 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) 
match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two 
conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
    //   2) Verify the final result is the same as the expected one
-@@ -155,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -155,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
            val planWithoutBucketedScan = 
bucketedDataFrame.filter(filterCondition)
              .queryExecution.executedPlan
            val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2442,7 +2444,7 @@ index 266bb343526..e58a2f49eb9 100644
  
            val bucketColumnType = 
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -451,28 +463,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -451,28 +465,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
          val joinOperator = if 
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
            val executedPlan =
              
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2505,7 +2507,7 @@ index 266bb343526..e58a2f49eb9 100644
            s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -835,11 +873,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -835,11 +875,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
        df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2519,7 +2521,7 @@ index 266bb343526..e58a2f49eb9 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -894,7 +932,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -894,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") 
{
@@ -2530,7 +2532,7 @@ index 266bb343526..e58a2f49eb9 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -913,7 +954,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -913,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than 
bucket number") {
@@ -2541,7 +2543,7 @@ index 266bb343526..e58a2f49eb9 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -943,7 +987,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -943,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2552,7 +2554,7 @@ index 266bb343526..e58a2f49eb9 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        // The side with bucketedTableTestSpec1 will be coalesced to have 4 
output partitions.
-@@ -1026,15 +1073,23 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -1026,15 +1075,26 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
              expectedNumShuffles: Int,
              expectedCoalescedNumBuckets: Option[Int]): Unit = {
            val plan = sql(query).queryExecution.executedPlan
@@ -2565,6 +2567,7 @@ index 266bb343526..e58a2f49eb9 100644
            val scans = plan.collect {
              case f: FileSourceScanExec if 
f.optionalNumCoalescedBuckets.isDefined => f
 +            case b: CometScanExec if b.optionalNumCoalescedBuckets.isDefined 
=> b
++            case b: CometNativeScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
            }
            if (expectedCoalescedNumBuckets.isDefined) {
              assert(scans.length == 1)
@@ -2574,6 +2577,8 @@ index 266bb343526..e58a2f49eb9 100644
 +                assert(f.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +              case b: CometScanExec =>
 +                assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++              case b: CometNativeScanExec =>
++                assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +            }
            } else {
              assert(scans.isEmpty)
@@ -2604,18 +2609,18 @@ index b5f6d2f9f68..277784a92af 100644
  
    protected override lazy val sql = spark.sql _
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index 1f55742cd67..42377f7cf26 100644
+index 1f55742cd67..f20129d9dd8 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 @@ -20,6 +20,7 @@ package org.apache.spark.sql.sources
  import org.apache.spark.sql.QueryTest
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-@@ -71,7 +72,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -71,7 +72,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
  
      def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): 
Unit = {
        val plan = sql(query).queryExecution.executedPlan
@@ -2623,6 +2628,7 @@ index 1f55742cd67..42377f7cf26 100644
 +      val bucketedScan = collect(plan) {
 +        case s: FileSourceScanExec if s.bucketedScan => s
 +        case s: CometScanExec if s.bucketedScan => s
++        case s: CometNativeScanExec if s.bucketedScan => s
 +      }
        assert(bucketedScan.length == expectedNumBucketedScan)
      }
diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index 72c41e4f8..beef44549 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -2587,7 +2587,7 @@ index d083cac48ff..3c11bcde807 100644
    import testImplicits._
  
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 746f289c393..5b9e31c1fa6 100644
+index 746f289c393..7a6a88a9fce 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -19,16 +19,19 @@ package org.apache.spark.sql.sources
@@ -2612,7 +2612,7 @@ index 746f289c393..5b9e31c1fa6 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -102,12 +105,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -102,12 +105,22 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
      }
    }
  
@@ -2622,6 +2622,7 @@ index 746f289c393..5b9e31c1fa6 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -2630,12 +2631,13 @@ index 746f289c393..5b9e31c1fa6 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) 
match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two 
conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
    //   2) Verify the final result is the same as the expected one
-@@ -156,7 +167,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -156,7 +169,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
            val planWithoutBucketedScan = 
bucketedDataFrame.filter(filterCondition)
              .queryExecution.executedPlan
            val fileScan = getFileScan(planWithoutBucketedScan)
@@ -2645,7 +2647,7 @@ index 746f289c393..5b9e31c1fa6 100644
  
            val bucketColumnType = 
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -452,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -452,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
          val joinOperator = if 
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
            val executedPlan =
              
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -2708,14 +2710,7 @@ index 746f289c393..5b9e31c1fa6 100644
            s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -831,16 +869,17 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
-     }
-   }
- 
--  test("disable bucketing when the output doesn't contain all bucketing 
columns") {
-+  test("disable bucketing when the output doesn't contain all bucketing 
columns",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("bucketed_table") {
+@@ -836,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
        df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -2729,7 +2724,7 @@ index 746f289c393..5b9e31c1fa6 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -895,7 +934,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -895,7 +935,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-29655 Read bucketed tables obeys spark.sql.shuffle.partitions") 
{
@@ -2740,7 +2735,7 @@ index 746f289c393..5b9e31c1fa6 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7")  {
        val bucketSpec = Some(BucketSpec(6, Seq("i", "j"), Nil))
-@@ -914,7 +956,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -914,7 +957,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than 
bucket number") {
@@ -2751,7 +2746,7 @@ index 746f289c393..5b9e31c1fa6 100644
        SQLConf.SHUFFLE_PARTITIONS.key -> "9",
        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
  
-@@ -944,7 +989,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -944,7 +990,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
    }
  
    test("bucket coalescing eliminates shuffle") {
@@ -2762,17 +2757,7 @@ index 746f289c393..5b9e31c1fa6 100644
        SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        // The side with bucketedTableTestSpec1 will be coalesced to have 4 
output partitions.
-@@ -1013,7 +1061,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
-     }
-   }
- 
--  test("bucket coalescing is applied when join expressions match with 
partitioning expressions") {
-+  test("bucket coalescing is applied when join expressions match with 
partitioning expressions",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1", "t2", "t3") {
-       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
-       df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
-@@ -1029,15 +1078,21 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -1029,15 +1078,24 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> 
aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
@@ -2783,6 +2768,7 @@ index 746f289c393..5b9e31c1fa6 100644
                val scans = collect(plan) {
                  case f: FileSourceScanExec if 
f.optionalNumCoalescedBuckets.isDefined => f
 +                case b: CometScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
++                case b: CometNativeScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
                }
                if (expectedCoalescedNumBuckets.isDefined) {
                  assert(scans.length == 1)
@@ -2792,6 +2778,8 @@ index 746f289c393..5b9e31c1fa6 100644
 +                    assert(f.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                  case b: CometScanExec =>
 +                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++                  case b: CometNativeScanExec =>
++                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                }
                } else {
                  assert(scans.isEmpty)
@@ -2821,20 +2809,18 @@ index 6f897a9c0b7..b0723634f68 100644
  
    protected override lazy val sql = spark.sql _
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index d675503a8ba..c386a8cb686 100644
+index d675503a8ba..f220892396e 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-@@ -17,7 +17,8 @@
- 
+@@ -18,6 +18,7 @@
  package org.apache.spark.sql.sources
  
--import org.apache.spark.sql.QueryTest
-+import org.apache.spark.sql.{IgnoreCometNativeDataFusion, QueryTest}
-+import org.apache.spark.sql.comet.CometScanExec
+ import org.apache.spark.sql.QueryTest
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
  
      def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): 
Unit = {
        val plan = sql(query).queryExecution.executedPlan
@@ -2842,60 +2828,11 @@ index d675503a8ba..c386a8cb686 100644
 +      val bucketedScan = collect(plan) {
 +        case s: FileSourceScanExec if s.bucketedScan => s
 +        case s: CometScanExec if s.bucketedScan => s
++        case s: CometNativeScanExec if s.bucketedScan => s
 +      }
        assert(bucketedScan.length == expectedNumBucketedScan)
      }
  
-@@ -83,7 +87,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
-     }
-   }
- 
--  test("SPARK-32859: disable unnecessary bucketed table scan - basic test") {
-+  test("SPARK-32859: disable unnecessary bucketed table scan - basic test",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1", "t2", "t3") {
-       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
-       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -124,7 +129,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
-     }
-   }
- 
--  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test") {
-+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple joins 
test",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1", "t2", "t3") {
-       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
-       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -167,7 +173,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
-     }
-   }
- 
--  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test") {
-+  test("SPARK-32859: disable unnecessary bucketed table scan - multiple 
bucketed columns test",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1", "t2", "t3") {
-       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
-       df2.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t2")
-@@ -198,7 +205,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
-     }
-   }
- 
--  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test") {
-+  test("SPARK-32859: disable unnecessary bucketed table scan - other 
operators test",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1", "t2", "t3") {
-       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1")
-       df2.write.format("parquet").bucketBy(8, "i").saveAsTable("t2")
-@@ -239,7 +247,8 @@ abstract class DisableUnnecessaryBucketedScanSuite
-     }
-   }
- 
--  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows") {
-+  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows",
-+    
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3319"))
 {
-     withTable("t1") {
-       withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
-         sql(
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 index 7f6fa2a123e..c778b4e2c48 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index 7faed2825..d6694e827 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1436,7 +1436,7 @@ index 3eeed2e4175..9f21d547c1c 100644
                }
              }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
-index 2a0ab21ddb0..e8a5a891105 100644
+index 2a0ab21ddb0..6030e7c2b9b 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
 @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
@@ -3080,7 +3080,7 @@ index 7838e62013d..8fa09652921 100644
    import testImplicits._
  
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index c4b09c4b289..dd5763e8405 100644
+index c4b09c4b289..75c3437788e 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 @@ -26,10 +26,11 @@ import org.apache.spark.sql.catalyst.expressions
@@ -3097,7 +3097,7 @@ index c4b09c4b289..dd5763e8405 100644
  import org.apache.spark.sql.execution.joins.SortMergeJoinExec
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.internal.SQLConf
-@@ -103,12 +104,20 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -103,12 +104,22 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
      }
    }
  
@@ -3107,6 +3107,7 @@ index c4b09c4b289..dd5763e8405 100644
 +    val fileScan = collect(plan) {
 +      case f: FileSourceScanExec => f
 +      case f: CometScanExec => f
++      case f: CometNativeScanExec => f
 +    }
      assert(fileScan.nonEmpty, plan)
      fileScan.head
@@ -3115,12 +3116,13 @@ index c4b09c4b289..dd5763e8405 100644
 +  private def getBucketScan(plan: SparkPlan): Boolean = getFileScan(plan) 
match {
 +    case fs: FileSourceScanExec => fs.bucketedScan
 +    case bs: CometScanExec => bs.bucketedScan
++    case ns: CometNativeScanExec => ns.bucketedScan
 +  }
 +
    // To verify if the bucket pruning works, this function checks two 
conditions:
    //   1) Check if the pruned buckets (before filtering) are empty.
    //   2) Verify the final result is the same as the expected one
-@@ -157,7 +166,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -157,7 +168,8 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
            val planWithoutBucketedScan = 
bucketedDataFrame.filter(filterCondition)
              .queryExecution.executedPlan
            val fileScan = getFileScan(planWithoutBucketedScan)
@@ -3130,7 +3132,7 @@ index c4b09c4b289..dd5763e8405 100644
  
            val bucketColumnType = 
bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
            val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -454,28 +464,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -454,28 +466,54 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
          val joinOperator = if 
(joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
            val executedPlan =
              
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -3193,7 +3195,7 @@ index c4b09c4b289..dd5763e8405 100644
            s"expected sort in the right child to be $sortRight but 
found\n${joinOperator.right}")
  
          // check the output partitioning
-@@ -838,11 +874,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
+@@ -838,11 +876,11 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
        df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
  
        val scanDF = spark.table("bucketed_table").select("j")
@@ -3207,7 +3209,7 @@ index c4b09c4b289..dd5763e8405 100644
        checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
      }
    }
-@@ -1031,15 +1067,21 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
+@@ -1031,15 +1069,24 @@ abstract class BucketedReadSuite extends QueryTest 
with SQLTestUtils with Adapti
            Seq(true, false).foreach { aqeEnabled =>
              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> 
aqeEnabled.toString) {
                val plan = sql(query).queryExecution.executedPlan
@@ -3218,6 +3220,7 @@ index c4b09c4b289..dd5763e8405 100644
                val scans = collect(plan) {
                  case f: FileSourceScanExec if 
f.optionalNumCoalescedBuckets.isDefined => f
 +                case b: CometScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
++                case b: CometNativeScanExec if 
b.optionalNumCoalescedBuckets.isDefined => b
                }
                if (expectedCoalescedNumBuckets.isDefined) {
                  assert(scans.length == 1)
@@ -3227,6 +3230,8 @@ index c4b09c4b289..dd5763e8405 100644
 +                    assert(f.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                  case b: CometScanExec =>
 +                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
++                  case b: CometNativeScanExec =>
++                    assert(b.optionalNumCoalescedBuckets == 
expectedCoalescedNumBuckets)
 +                }
                } else {
                  assert(scans.isEmpty)
@@ -3256,18 +3261,18 @@ index 95c2fcbd7b5..e2d4a20c5d9 100644
  
    protected override lazy val sql = spark.sql _
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
-index c5c56f081d8..197cd241f48 100644
+index c5c56f081d8..6cc51f93b4f 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 @@ -18,6 +18,7 @@
  package org.apache.spark.sql.sources
  
  import org.apache.spark.sql.QueryTest
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.FileSourceScanExec
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
  import org.apache.spark.sql.internal.SQLConf
-@@ -68,7 +69,10 @@ abstract class DisableUnnecessaryBucketedScanSuite
+@@ -68,7 +69,11 @@ abstract class DisableUnnecessaryBucketedScanSuite
  
      def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): 
Unit = {
        val plan = sql(query).queryExecution.executedPlan
@@ -3275,6 +3280,7 @@ index c5c56f081d8..197cd241f48 100644
 +      val bucketedScan = collect(plan) {
 +        case s: FileSourceScanExec if s.bucketedScan => s
 +        case s: CometScanExec if s.bucketedScan => s
++        case s: CometNativeScanExec if s.bucketedScan => s
 +      }
        assert(bucketedScan.length == expectedNumBucketedScan)
      }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to