This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f162439 [spark] Add EvalSubqueriesForDeleteTable for quick delete with subqueries (#3464)
24f162439 is described below

commit 24f1624396ead6e51fc07070c9a699341f38ffef
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jun 4 11:46:26 2024 +0800

    [spark] Add EvalSubqueriesForDeleteTable for quick delete with subqueries (#3464)
    
    * 1
    
    * for comment
    
    * for failed test
    
    * add more test
---
 .../analysis/expressions/ExpressionHelper.scala    |   1 +
 .../optimizer/EvalSubqueriesForDeleteTable.scala   | 117 +++++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   9 +-
 .../spark/sql/PaimonOptimizationTestBase.scala     |  90 ++++++++++++++++
 4 files changed, 213 insertions(+), 4 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index e80186767..333508125 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -128,6 +128,7 @@ trait ExpressionHelper extends PredicateHelper {
       partitionColumns: Seq[String],
       resolver: Resolver
   ): Boolean = {
+    condition.references.nonEmpty &&
     condition.references.forall(r => partitionColumns.exists(resolver(r.name, 
_)))
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
new file mode 100644
index 000000000..aaaed10f1
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.optimizer
+
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{execution, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, 
Literal, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.types.BooleanType
+
+import scala.collection.JavaConverters._
+
+/**
+ * For those delete conditions with subqueries that only contain partition 
columns, we can eval them
+ * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can 
directly call
+ * dropPartitions to achieve fast deletion.
+ *
+ * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], 
because
+ * [[MergePaimonScalarSubqueriers]] will merge subqueries.
+ */
+object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with 
ExpressionHelper with Logging {
+
+  lazy val spark: SparkSession = SparkSession.active
+  lazy val resolver: Resolver = spark.sessionState.conf.resolver
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDown {
+      case d @ DeleteFromPaimonTableCommand(_, table, condition)
+          if SubqueryExpression.hasSubquery(condition) &&
+            isPredicatePartitionColumnsOnly(condition, 
table.partitionKeys().asScala, resolver) =>
+        try {
+          d.copy(condition = evalSubquery(condition))
+        } catch {
+          case e: Throwable =>
+            logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed for: 
${e.getMessage}")
+            d
+        }
+    }
+  }
+
+  private def evalSubquery(condition: Expression): Expression = {
+    condition.transformDown {
+      case InSubquery(values, listQuery) =>
+        val expr = if (values.length == 1) {
+          values.head
+        } else {
+          throw new RuntimeException("InSubquery with multi-values are not 
supported")
+        }
+        if (listQuery.isCorrelated) {
+          throw new RuntimeException("Correlated InSubquery is not supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, 
listQuery.plan)
+        val physicalSubquery = execution.InSubqueryExec(
+          expr,
+          execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", 
executedPlan),
+          listQuery.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        physicalSubquery.values() match {
+          case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, 
expr.dataType)))
+          case _ => Literal(false, BooleanType)
+        }
+
+      case s: ScalarSubquery =>
+        if (s.isCorrelated) {
+          throw new RuntimeException("Correlated ScalarSubquery is not 
supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
+        val physicalSubquery = execution.ScalarSubquery(
+          execution.SubqueryExec
+            .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", 
executedPlan),
+          s.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        Literal(physicalSubquery.eval(), s.dataType)
+
+      case _: SubqueryExpression =>
+        throw new RuntimeException("Only support InSubquery and 
ScalarSubquery")
+    }
+  }
+
+  // Evaluate physicalSubquery in a bottom-up way.
+  private def evalPhysicalSubquery(subquery: ExecSubqueryExpression): Unit = {
+    subquery.plan.foreachUp {
+      plan =>
+        plan.expressions.foreach(_.foreachUp {
+          case s: ExecSubqueryExpression => evalPhysicalSubquery(s)
+          case _ =>
+        })
+    }
+    subquery.updateResult()
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 3fc151621..f770ac506 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.extensions
 
 import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, 
PaimonDeleteTable, PaimonIncompatiblePHRRules, 
PaimonIncompatibleResolutionRules, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import 
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, 
MergePaimonScalarSubqueriers}
 import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
 
@@ -52,10 +52,11 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
           PaimonTableValuedFunctions.getTableValueFunctionInjection(fnName))
     }
 
-    // planner extensions
-    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
-
     // optimization rules
+    extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
     extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+
+    // planner extensions
+    extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 1f4370045..9ab5551dd 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, 
LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -112,6 +114,94 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test(s"Paimon Optimization: eval subqueries for delete table with 
ScalarSubquery") {
+    withPk.foreach(
+      hasPk => {
+        val tblProps = if (hasPk) {
+          s"TBLPROPERTIES ('primary-key'='id, pt')"
+        } else {
+          ""
+        }
+        withTable("t1", "t2") {
+          spark.sql(s"""
+                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
+                       |$tblProps
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          spark.sql(
+            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 
'd', 3), (5, 'e', 4)")
+
+          spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
+          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+          spark.sql(s"""DELETE FROM t1 WHERE
+                       |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
+                       |AND
+                       |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 
3)""".stripMargin)
+          // For partition-only predicates, drop partition is called 
internally.
+          Assertions.assertEquals(
+            CommitKind.OVERWRITE,
+            
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+
+          // subquery eval nothing
+          spark.sql(s"""DELETE FROM t1 WHERE
+                       |pt >= (SELECT min(id) FROM t2 WHERE n > 
10)""".stripMargin)
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+        }
+      })
+  }
+
+  test(s"Paimon Optimization: eval subqueries for delete table with 
InSubquery") {
+    withPk.foreach(
+      hasPk => {
+        val tblProps = if (hasPk) {
+          s"TBLPROPERTIES ('primary-key'='id, pt')"
+        } else {
+          ""
+        }
+        withTable("t1", "t2") {
+          spark.sql(s"""
+                       |CREATE TABLE t1 (id INT, name STRING, pt INT)
+                       |$tblProps
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+          spark.sql(
+            "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4, 
'd', 3), (5, 'e', 4)")
+
+          spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
+          spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+          spark.sql(s"""DELETE FROM t1 WHERE
+                       |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
+                       |OR
+                       |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 
3)""".stripMargin)
+          // For partition-only predicates, drop partition is called 
internally.
+          Assertions.assertEquals(
+            CommitKind.OVERWRITE,
+            
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+
+          // subquery eval nothing
+          spark.sql(s"""DELETE FROM t1 WHERE
+                       |pt in (SELECT id FROM t2 WHERE n > 10)""".stripMargin)
+
+          checkAnswer(
+            spark.sql("SELECT * FROM t1 ORDER BY id"),
+            Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+        }
+      })
+  }
+
   private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
     CTERelationDef(plan, cteIndex, underSubquery = true)
   }

Reply via email to