This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a26b46fde [spark] support subquery for spark update/delete (#3162)
a26b46fde is described below

commit a26b46fde3e411e53426fa0b2c349c234661e862
Author: Yann Byron <[email protected]>
AuthorDate: Sun Apr 7 14:19:22 2024 +0800

    [spark] support subquery for spark update/delete (#3162)
---
 .../catalyst/analysis/PaimonDeleteTable.scala      |  5 --
 .../catalyst/analysis/PaimonDeleteTable.scala      |  5 --
 .../catalyst/analysis/PaimonUpdateTable.scala      |  3 -
 .../commands/DeleteFromPaimonTableCommand.scala    |  3 +-
 .../paimon/spark/sql/DeleteFromTableTest.scala     | 72 ++++++++++++++++++++--
 .../apache/paimon/spark/sql/UpdateTableTest.scala  | 54 ++++++++++++++--
 6 files changed, 119 insertions(+), 23 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 7da432d56..1d6aa1a8a 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -38,11 +38,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
         table.getTable match {
           case paimonTable: FileStoreTable =>
-            val primaryKeys = paimonTable.primaryKeys().asScala
-            if (primaryKeys.isEmpty) {
-              condition.foreach(checkSubquery)
-            }
-
             val relation = PaimonRelation.getPaimonRelation(d.table)
             DeleteFromPaimonTableCommand(relation, paimonTable, 
condition.getOrElse(TrueLiteral))
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 7b0f0cb64..578b4e4dc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -37,11 +37,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
         table.getTable match {
           case paimonTable: FileStoreTable =>
-            val primaryKeys = paimonTable.primaryKeys().asScala
-            if (primaryKeys.isEmpty) {
-              checkSubquery(condition)
-            }
-
             val relation = PaimonRelation.getPaimonRelation(d.table)
             DeleteFromPaimonTableCommand(relation, paimonTable, condition)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index e369c46e2..123c67a2f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -42,9 +42,6 @@ object PaimonUpdateTable
         table.getTable match {
           case paimonTable: FileStoreTable =>
             val primaryKeys = paimonTable.primaryKeys().asScala
-            if (primaryKeys.isEmpty) {
-              condition.foreach(checkSubquery)
-            }
             if (!validUpdateAssignment(u.table.outputSet, primaryKeys, 
assignments)) {
               throw new RuntimeException("Can't update the primary key 
column.")
             }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index bd73c8fc5..457467da1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -33,13 +33,12 @@ import org.apache.spark.sql.Utils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
 import java.util.{Collections, UUID}
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
 case class DeleteFromPaimonTableCommand(
     relation: DataSourceV2Relation,
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 40eb9b9ad..81f0d7753 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -87,7 +87,49 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
     )
   }
 
-  test("Paimon Delete: append-only table, condition contains subquery") {
+  test("Paimon Delete: append-only table, condition contains IN/NOT IN 
subquery") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'),
+                | (3, 'c', '2025'), (4, 'd', '2025'),
+                | (5, 'e', '2026'), (6, 'f', '2026')
+                |""".stripMargin)
+
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id >= (SELECT MAX(key) FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025"), (5, "e", "2026"))
+        .toDF()
+    )
+
+    // IN
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (3, "c", "2025"), (5, "e", "2026")).toDF()
+    )
+
+    // NOT IN: (4, 5, 6)
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id NOT IN (SELECT key + key % 3 FROM 
source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((5, "e", "2026")).toDF()
+    )
+  }
+
+  test("Paimon Delete: append-only table, condition contains EXISTS/NOT EXISTS 
subquery") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)
                  |""".stripMargin)
@@ -97,9 +139,31 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
                 |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
                 |""".stripMargin)
 
-    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
-    assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id IN (SELECT * 
FROM updated_ids)"))
-      .hasMessageContaining("Subqueries are not supported")
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    // EXISTS
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE EXiSTS (SELECT * FROM source WHERE key > 
7)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF())
+
+    // NOT EXISTS
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE NOT EXiSTS (SELECT * FROM source WHERE key > 
5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE NOT EXiSTS (SELECT * FROM source WHERE key > 
7)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      spark.emptyDataFrame
+    )
   }
 
   CoreOptions.MergeEngine.values().foreach {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index cc95e7a90..c438c03de 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -102,10 +102,56 @@ class UpdateTableTest extends PaimonSparkTestBase {
                 |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
                 |""".stripMargin)
 
-    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
-    assertThatThrownBy(
-      () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM 
updated_ids)"))
-      .hasMessageContaining("Subqueries are not supported")
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '2')
+                |WHERE id < (SELECT MIN(key) FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a2", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    // EXISTS
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '3')
+                |WHERE EXiSTS (SELECT * FROM source WHERE key > 
5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3", 
"2025")).toDF()
+    )
+
+    // NOT EXISTS
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '4')
+                |WHERE NOT EXiSTS (SELECT * FROM source WHERE key > 
5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3", 
"2025")).toDF()
+    )
+
+    // IN
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '5')
+                |WHERE id IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5", 
"2025")).toDF()
+    )
+
+    // NOT IN
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '6')
+                |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5", 
"2025")).toDF()
+    )
   }
 
   CoreOptions.MergeEngine.values().foreach {

Reply via email to