This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ade861d19910 [SPARK-45974][SQL] Add scan.filterAttributes non-empty
judgment for RowLevelOperationRuntimeGroupFiltering
ade861d19910 is described below
commit ade861d19910df724d9233df98c059ff9d57f795
Author: wforget <[email protected]>
AuthorDate: Sun Nov 26 23:28:52 2023 +0800
[SPARK-45974][SQL] Add scan.filterAttributes non-empty judgment for
RowLevelOperationRuntimeGroupFiltering
### What changes were proposed in this pull request?
Add a check that scan.filterAttributes is non-empty to the match guard of
RowLevelOperationRuntimeGroupFiltering, so the rule is skipped otherwise.
### Why are the changes needed?
When scan.filterAttributes is empty, RowLevelOperationRuntimeGroupFiltering
generates an invalid dynamic pruning condition.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a test case ("SPARK-45974: merge into non filter attributes table") to MergeIntoTableSuiteBase.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43869 from wForget/SPARK-45974.
Authored-by: wforget <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../RowLevelOperationRuntimeGroupFiltering.scala | 4 ++-
.../sql/connector/MergeIntoTableSuiteBase.scala | 32 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index b8288c636c38..7c28f91ee1cc 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -51,7 +51,8 @@ class
RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
// apply special dynamic filtering only for group-based row-level
operations
case GroupBasedRowLevelOperation(replaceData, _, Some(cond),
DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
- if conf.runtimeRowLevelOperationGroupFilterEnabled && cond !=
TrueLiteral =>
+ if conf.runtimeRowLevelOperationGroupFilterEnabled && cond !=
TrueLiteral
+ && scan.filterAttributes().nonEmpty =>
// use reference equality on scan to find required scan relations
val newQuery = replaceData.query transformUp {
@@ -116,6 +117,7 @@ class
RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
matchingRowsPlan: LogicalPlan,
buildKeys: Seq[Attribute],
pruningKeys: Seq[Attribute]): Expression = {
+ assert(buildKeys.nonEmpty && pruningKeys.nonEmpty)
val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
DynamicPruningExpression(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index e7555c23fa4f..5668e5981910 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -32,6 +32,38 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase {
import testImplicits._
+ test("SPARK-45974: merge into non filter attributes table") {
+ val tableName: String = "cat.ns1.non_partitioned_table"
+ withTable(tableName) {
+ withTempView("source") {
+ val sourceRows = Seq(
+ (1, 100, "hr"),
+ (2, 200, "finance"),
+ (3, 300, "hr"))
+ sourceRows.toDF("pk", "salary",
"dep").createOrReplaceTempView("source")
+
+ sql(s"CREATE TABLE $tableName (pk INT NOT NULL, salary INT, dep
STRING)".stripMargin)
+
+ val df = sql(
+ s"""MERGE INTO $tableName t
+ |USING (select * from source) s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET t.salary = s.salary
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableName"),
+ Seq(
+ Row(1, 100, "hr"), // insert
+ Row(2, 200, "finance"), // insert
+ Row(3, 300, "hr"))) // insert
+ }
+ }
+ }
+
test("merge into empty table with NOT MATCHED clause") {
withTempView("source") {
createTable("pk INT NOT NULL, salary INT, dep STRING")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]