This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ade861d19910 [SPARK-45974][SQL] Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering
ade861d19910 is described below

commit ade861d19910df724d9233df98c059ff9d57f795
Author: wforget <643348...@qq.com>
AuthorDate: Sun Nov 26 23:28:52 2023 +0800

    [SPARK-45974][SQL] Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering
    
    ### What changes were proposed in this pull request?
    
    Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering.
    
    ### Why are the changes needed?
    
    When scan.filterAttributes is empty, an invalid dynamic pruning condition will be generated in RowLevelOperationRuntimeGroupFiltering.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    added test case
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43869 from wForget/SPARK-45974.
    
    Authored-by: wforget <643348...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../RowLevelOperationRuntimeGroupFiltering.scala   |  4 ++-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 32 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index b8288c636c38..7c28f91ee1cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -51,7 +51,8 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
     // apply special dynamic filtering only for group-based row-level operations
     case GroupBasedRowLevelOperation(replaceData, _, Some(cond),
         DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
-        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>
+        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral
+          && scan.filterAttributes().nonEmpty =>
 
       // use reference equality on scan to find required scan relations
       val newQuery = replaceData.query transformUp {
@@ -116,6 +117,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
       matchingRowsPlan: LogicalPlan,
       buildKeys: Seq[Attribute],
       pruningKeys: Seq[Attribute]): Expression = {
+    assert(buildKeys.nonEmpty && pruningKeys.nonEmpty)
 
     val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
     DynamicPruningExpression(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index e7555c23fa4f..5668e5981910 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -32,6 +32,38 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase {
 
   import testImplicits._
 
+  test("SPARK-45974: merge into non filter attributes table") {
+    val tableName: String = "cat.ns1.non_partitioned_table"
+    withTable(tableName) {
+      withTempView("source") {
+        val sourceRows = Seq(
+          (1, 100, "hr"),
+          (2, 200, "finance"),
+          (3, 300, "hr"))
+        sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+        sql(s"CREATE TABLE $tableName (pk INT NOT NULL, salary INT, dep STRING)".stripMargin)
+
+        val df = sql(
+          s"""MERGE INTO $tableName t
+             |USING (select * from source) s
+             |ON t.pk = s.pk
+             |WHEN MATCHED THEN
+             | UPDATE SET t.salary = s.salary
+             |WHEN NOT MATCHED THEN
+             | INSERT *
+             |""".stripMargin)
+
+        checkAnswer(
+          sql(s"SELECT * FROM $tableName"),
+          Seq(
+            Row(1, 100, "hr"), // insert
+            Row(2, 200, "finance"), // insert
+            Row(3, 300, "hr"))) // insert
+      }
+    }
+  }
+
   test("merge into empty table with NOT MATCHED clause") {
     withTempView("source") {
       createTable("pk INT NOT NULL, salary INT, dep STRING")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to