This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a26b46fde [spark] support subquery for spark update/delete (#3162)
a26b46fde is described below
commit a26b46fde3e411e53426fa0b2c349c234661e862
Author: Yann Byron <[email protected]>
AuthorDate: Sun Apr 7 14:19:22 2024 +0800
[spark] support subquery for spark update/delete (#3162)
---
.../catalyst/analysis/PaimonDeleteTable.scala | 5 --
.../catalyst/analysis/PaimonDeleteTable.scala | 5 --
.../catalyst/analysis/PaimonUpdateTable.scala | 3 -
.../commands/DeleteFromPaimonTableCommand.scala | 3 +-
.../paimon/spark/sql/DeleteFromTableTest.scala | 72 ++++++++++++++++++++--
.../apache/paimon/spark/sql/UpdateTableTest.scala | 54 ++++++++++++++--
6 files changed, 119 insertions(+), 23 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 7da432d56..1d6aa1a8a 100644
---
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -38,11 +38,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with
RowLevelHelper {
table.getTable match {
case paimonTable: FileStoreTable =>
- val primaryKeys = paimonTable.primaryKeys().asScala
- if (primaryKeys.isEmpty) {
- condition.foreach(checkSubquery)
- }
-
val relation = PaimonRelation.getPaimonRelation(d.table)
DeleteFromPaimonTableCommand(relation, paimonTable,
condition.getOrElse(TrueLiteral))
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 7b0f0cb64..578b4e4dc 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -37,11 +37,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with
RowLevelHelper {
table.getTable match {
case paimonTable: FileStoreTable =>
- val primaryKeys = paimonTable.primaryKeys().asScala
- if (primaryKeys.isEmpty) {
- checkSubquery(condition)
- }
-
val relation = PaimonRelation.getPaimonRelation(d.table)
DeleteFromPaimonTableCommand(relation, paimonTable, condition)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index e369c46e2..123c67a2f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -42,9 +42,6 @@ object PaimonUpdateTable
table.getTable match {
case paimonTable: FileStoreTable =>
val primaryKeys = paimonTable.primaryKeys().asScala
- if (primaryKeys.isEmpty) {
- condition.foreach(checkSubquery)
- }
if (!validUpdateAssignment(u.table.outputSet, primaryKeys,
assignments)) {
throw new RuntimeException("Can't update the primary key
column.")
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index bd73c8fc5..457467da1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -33,13 +33,12 @@ import org.apache.spark.sql.Utils.createDataset
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit
import java.util.{Collections, UUID}
import scala.collection.JavaConverters._
-import scala.util.Try
case class DeleteFromPaimonTableCommand(
relation: DataSourceV2Relation,
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 40eb9b9ad..81f0d7753 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -87,7 +87,49 @@ abstract class DeleteFromTableTestBase extends
PaimonSparkTestBase {
)
}
- test("Paimon Delete: append-only table, condition contains subquery") {
+ test("Paimon Delete: append-only table, condition contains IN/NOT IN
subquery") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED
BY (dt)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'),
+ | (3, 'c', '2025'), (4, 'd', '2025'),
+ | (5, 'e', '2026'), (6, 'f', '2026')
+ |""".stripMargin)
+
+ Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE id >= (SELECT MAX(key) FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025"), (5, "e", "2026"))
+ .toDF()
+ )
+
+ // IN
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE id IN (SELECT key FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (3, "c", "2025"), (5, "e", "2026")).toDF()
+ )
+
+ // NOT IN: (4, 5, 6)
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE id NOT IN (SELECT key + key % 3 FROM
source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((5, "e", "2026")).toDF()
+ )
+ }
+
+ test("Paimon Delete: append-only table, condition contains EXISTS/NOT EXISTS
subquery") {
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED
BY (dt)
|""".stripMargin)
@@ -97,9 +139,31 @@ abstract class DeleteFromTableTestBase extends
PaimonSparkTestBase {
|VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'),
(4, 'd', '2025')
|""".stripMargin)
- Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
- assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id IN (SELECT *
FROM updated_ids)"))
- .hasMessageContaining("Subqueries are not supported")
+ Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+ // EXISTS
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE EXiSTS (SELECT * FROM source WHERE key >
7)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025")).toDF())
+
+ // NOT EXISTS
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE NOT EXiSTS (SELECT * FROM source WHERE key >
5)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025")).toDF()
+ )
+ spark.sql("""
+ |DELETE FROM T
+ |WHERE NOT EXiSTS (SELECT * FROM source WHERE key >
7)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ spark.emptyDataFrame
+ )
}
CoreOptions.MergeEngine.values().foreach {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index cc95e7a90..c438c03de 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -102,10 +102,56 @@ class UpdateTableTest extends PaimonSparkTestBase {
|VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'),
(4, 'd', '2025')
|""".stripMargin)
- Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
- assertThatThrownBy(
- () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM
updated_ids)"))
- .hasMessageContaining("Subqueries are not supported")
+ Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '2')
+ |WHERE id < (SELECT MIN(key) FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a2", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025")).toDF()
+ )
+
+ // EXISTS
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '3')
+ |WHERE EXiSTS (SELECT * FROM source WHERE key >
5)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3",
"2025")).toDF()
+ )
+
+ // NOT EXISTS
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '4')
+ |WHERE NOT EXiSTS (SELECT * FROM source WHERE key >
5)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3",
"2025")).toDF()
+ )
+
+ // IN
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '5')
+ |WHERE id IN (SELECT key FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5",
"2025")).toDF()
+ )
+
+ // NOT IN
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '6')
+ |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5",
"2025")).toDF()
+ )
}
CoreOptions.MergeEngine.values().foreach {