This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new e4731e9d3b44 [SPARK-45974][SQL] Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering e4731e9d3b44 is described below commit e4731e9d3b4443f79a23e7d4bf5b749b54f2e1bb Author: wforget <643348...@qq.com> AuthorDate: Sun Nov 26 23:28:52 2023 +0800 [SPARK-45974][SQL] Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering ### What changes were proposed in this pull request? Add scan.filterAttributes non-empty judgment for RowLevelOperationRuntimeGroupFiltering. ### Why are the changes needed? When scan.filterAttributes is empty, an invalid dynamic pruning condition will be generated in RowLevelOperationRuntimeGroupFiltering. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added test case ### Was this patch authored or co-authored using generative AI tooling? No Closes #43869 from wForget/SPARK-45974. Authored-by: wforget <643348...@qq.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit ade861d19910df724d9233df98c059ff9d57f795) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../RowLevelOperationRuntimeGroupFiltering.scala | 4 ++- .../sql/connector/MergeIntoTableSuiteBase.scala | 32 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala index 7360349284ec..479e9065c071 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala @@ -50,7 +50,8 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: 
Rule[LogicalPla // apply special dynamic filtering only for group-based row-level operations case GroupBasedRowLevelOperation(replaceData, _, Some(cond), DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) - if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral => + if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral + && scan.filterAttributes().nonEmpty => // use reference equality on scan to find required scan relations val newQuery = replaceData.query transformUp { @@ -115,6 +116,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla matchingRowsPlan: LogicalPlan, buildKeys: Seq[Attribute], pruningKeys: Seq[Attribute]): Expression = { + assert(buildKeys.nonEmpty && pruningKeys.nonEmpty) val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan) DynamicPruningExpression( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index e7555c23fa4f..5668e5981910 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -32,6 +32,38 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase { import testImplicits._ + test("SPARK-45974: merge into non filter attributes table") { + val tableName: String = "cat.ns1.non_partitioned_table" + withTable(tableName) { + withTempView("source") { + val sourceRows = Seq( + (1, 100, "hr"), + (2, 200, "finance"), + (3, 300, "hr")) + sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source") + + sql(s"CREATE TABLE $tableName (pk INT NOT NULL, salary INT, dep STRING)".stripMargin) + + val df = sql( + s"""MERGE INTO $tableName t + |USING (select * from source) s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET t.salary = s.salary + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + + checkAnswer( + sql(s"SELECT * FROM $tableName"), + Seq( + Row(1, 100, "hr"), // insert + Row(2, 200, "finance"), // insert + Row(3, 300, "hr"))) // insert + } + } + } + test("merge into empty table with NOT MATCHED clause") { withTempView("source") { createTable("pk INT NOT NULL, salary INT, dep STRING") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org