This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 75aed566018 [SPARK-45652][SQL][3.4] SPJ: Handle empty input partitions after dynamic filtering
75aed566018 is described below

commit 75aed566018046c23200e3bb7188d29ece321b26
Author: Chao Sun <[email protected]>
AuthorDate: Thu Oct 26 13:54:24 2023 -0700

    [SPARK-45652][SQL][3.4] SPJ: Handle empty input partitions after dynamic filtering
    
    This is a cherry-pick of https://github.com/apache/spark/pull/43531 to branch-3.4, with a few modifications.
    
    ### What changes were proposed in this pull request?
    
    Handle the case where input partitions become empty after V2 dynamic filtering, when SPJ is enabled.
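
    For illustration, the join shape below (mirroring the new test case in this patch) hits the issue: both tables are partitioned on the join key so SPJ applies, and the `i.price > 50.0` predicate matches no rows, so dynamic filtering prunes every input partition of the scan. A sketch reusing the suite's `items`/`purchases` test tables:

    ```scala
    // Both sides are partitioned by the join key, so SPJ plans a
    // storage-partitioned join; the runtime filter then leaves the
    // scan with zero input partitions.
    val df = sql(s"SELECT p.price FROM testcat.ns.$items i, testcat.ns.$purchases p " +
        "WHERE i.id = p.item_id AND i.price > 50.0")
    checkAnswer(df, Seq.empty)  // previously failed with None.get
    ```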
    
    ### Why are the changes needed?
    
    Currently, when all input partitions are filtered out via dynamic filtering, SPJ doesn't work; instead the query fails with:
    ```
    java.util.NoSuchElementException: None.get
            at scala.None$.get(Option.scala:529)
            at scala.None$.get(Option.scala:527)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
            at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
    ```
    
    This is because the `groupPartitions` method returns `None` in this scenario. We should handle this case.
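
    As a minimal standalone sketch of the failure mode and the fix pattern (the simplified `groupPartitions` signature here is hypothetical; the real method operates on Spark input partitions):

    ```scala
    import scala.util.Try

    // Hypothetical stand-in: returns None when there is nothing to group,
    // mirroring how BatchScanExec's groupPartitions behaves in this scenario.
    def groupPartitions(parts: Seq[Int]): Option[Seq[(Int, Seq[Int])]] =
      if (parts.isEmpty) None else Some(parts.groupBy(identity).toSeq)

    val unsafe = Try(groupPartitions(Seq.empty).get)             // Failure(java.util.NoSuchElementException: None.get)
    val fixed  = groupPartitions(Seq.empty).getOrElse(Seq.empty) // Seq(): the pattern this patch applies
    ```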
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a test case for this.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43539 from sunchao/SPARK-45652-branch-3.4.
    
    Authored-by: Chao Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../execution/datasources/v2/BatchScanExec.scala   |  5 +--
 .../connector/KeyGroupedPartitioningSuite.scala    | 42 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 02821d10d50..55f4a712410 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -105,7 +105,7 @@ case class BatchScanExec(
                 "partition values that are not present in the original partitioning.")
           }
 
-          groupPartitions(newPartitions).get.map(_._2)
+          groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)
 
         case _ =>
           // no validation is needed as the data source did not report any specific partitioning
@@ -148,7 +148,8 @@ case class BatchScanExec(
                  s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
                   "is enabled")
 
-            val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
+            val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
+                .getOrElse(Seq.empty)
 
             // This means the input partitions are not grouped by partition values. We'll need to
             // check `groupByPartitionValues` and decide whether to group and replicate splits
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 0c3a1930dc9..cf76f6ca32c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") {
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
+      val items_partitions = Array(identity("id"))
+      createTable(items, items_schema, items_partitions)
+      sql(s"INSERT INTO testcat.ns.$items VALUES " +
+          s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+          s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+          s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+      val purchases_partitions = Array(identity("item_id"))
+      createTable(purchases, purchases_schema, purchases_partitions)
+      sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+          s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+          s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+          s"(3, 19.5, cast('2020-02-01' as timestamp))")
+
+      Seq(true, false).foreach { pushDownValues =>
+        Seq(true, false).foreach { partiallyClustered => {
+          withSQLConf(
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+                partiallyClustered.toString,
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+            // The dynamic filtering effectively filtered out all the partitions
+            val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " +
+                "WHERE i.id = p.item_id AND i.price > 50.0")
+            checkAnswer(df, Seq.empty)
+          }
+        }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
