This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24f162439 [spark] Add EvalSubqueriesForDeleteTable for quick delete
with subqueries (#3464)
24f162439 is described below
commit 24f1624396ead6e51fc07070c9a699341f38ffef
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jun 4 11:46:26 2024 +0800
[spark] Add EvalSubqueriesForDeleteTable for quick delete with subqueries
(#3464)
* 1
* for comment
* for failed test
* add more test
---
.../analysis/expressions/ExpressionHelper.scala | 1 +
.../optimizer/EvalSubqueriesForDeleteTable.scala | 117 +++++++++++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 9 +-
.../spark/sql/PaimonOptimizationTestBase.scala | 90 ++++++++++++++++
4 files changed, 213 insertions(+), 4 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index e80186767..333508125 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -128,6 +128,7 @@ trait ExpressionHelper extends PredicateHelper {
partitionColumns: Seq[String],
resolver: Resolver
): Boolean = {
+ condition.references.nonEmpty &&
condition.references.forall(r => partitionColumns.exists(resolver(r.name,
_)))
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
new file mode 100644
index 000000000..aaaed10f1
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.optimizer
+
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{execution, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery,
Literal, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.types.BooleanType
+
+import scala.collection.JavaConverters._
+
+/**
+ * Optimizer rule that pre-evaluates the subqueries found in the condition of a
+ * [[DeleteFromPaimonTableCommand]] when that condition references partition columns only, so
+ * that when running [[DeleteFromPaimonTableCommand]] we can directly call dropPartitions to
+ * achieve fast deletion.
+ *
+ * Only uncorrelated, single-value [[InSubquery]] and uncorrelated [[ScalarSubquery]] expressions
+ * are evaluated. For any other subquery shape, or on any non-fatal failure while evaluating, the
+ * failure is logged and the original command is returned unchanged.
+ *
+ * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], because
+ * [[MergePaimonScalarSubqueriers]] will merge subqueries.
+ */
+object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {
+
+  // NOTE(review): both values are resolved once from SparkSession.active and then cached for the
+  // lifetime of this singleton object - confirm this is intended if sessions can be re-created.
+  lazy val spark: SparkSession = SparkSession.active
+  lazy val resolver: Resolver = spark.sessionState.conf.resolver
+
+  /**
+   * Rewrites every matching [[DeleteFromPaimonTableCommand]] in `plan` so that its condition no
+   * longer contains subqueries.
+   *
+   * @param plan the logical plan being optimized
+   * @return the plan with eligible delete conditions replaced by their evaluated form
+   */
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDown {
+      case d @ DeleteFromPaimonTableCommand(_, table, condition)
+          if SubqueryExpression.hasSubquery(condition) &&
+            isPredicatePartitionColumnsOnly(condition, table.partitionKeys().asScala, resolver) =>
+        try {
+          d.copy(condition = evalSubquery(condition))
+        } catch {
+          // Best effort: fall back to the untouched command on ordinary failures, but let fatal
+          // errors (OutOfMemoryError, InterruptedException, LinkageError, ...) propagate instead
+          // of swallowing them as `case e: Throwable` would.
+          case scala.util.control.NonFatal(e) =>
+            logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed for: ${e.getMessage}")
+            d
+        }
+    }
+  }
+
+  /**
+   * Replaces each subquery expression inside `condition` with the literal result of running it.
+   *
+   * Throws a RuntimeException for an [[InSubquery]] with more than one value, for a correlated
+   * [[InSubquery]] or [[ScalarSubquery]], and for any other kind of [[SubqueryExpression]].
+   */
+  private def evalSubquery(condition: Expression): Expression = {
+    condition.transformDown {
+      case InSubquery(values, listQuery) =>
+        val expr = if (values.length == 1) {
+          values.head
+        } else {
+          throw new RuntimeException("InSubquery with multi-values are not supported")
+        }
+        if (listQuery.isCorrelated) {
+          throw new RuntimeException("Correlated InSubquery is not supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, listQuery.plan)
+        val physicalSubquery = execution.InSubqueryExec(
+          expr,
+          execution.SubqueryExec(s"subquery#${listQuery.exprId.id}", executedPlan),
+          listQuery.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        // A non-empty result becomes an IN list of literals; an empty or absent result can
+        // never match, so the whole predicate collapses to FALSE.
+        physicalSubquery.values() match {
+          case Some(l) if l.length > 0 => In(expr, l.map(Literal(_, expr.dataType)))
+          case _ => Literal(false, BooleanType)
+        }
+
+      case s: ScalarSubquery =>
+        if (s.isCorrelated) {
+          throw new RuntimeException("Correlated ScalarSubquery is not supported")
+        }
+
+        val executedPlan = QueryExecution.prepareExecutedPlan(spark, s.plan)
+        val physicalSubquery = execution.ScalarSubquery(
+          execution.SubqueryExec
+            .createForScalarSubquery(s"scalar-subquery#${s.exprId.id}", executedPlan),
+          s.exprId)
+        evalPhysicalSubquery(physicalSubquery)
+
+        Literal(physicalSubquery.eval(), s.dataType)
+
+      case _: SubqueryExpression =>
+        throw new RuntimeException("Only support InSubquery and ScalarSubquery")
+    }
+  }
+
+  // Evaluate physicalSubquery in a bottom-up way: subqueries nested in the expressions of its
+  // plan are evaluated first, then updateResult() is called on the subquery itself.
+  private def evalPhysicalSubquery(subquery: ExecSubqueryExpression): Unit = {
+    subquery.plan.foreachUp {
+      plan =>
+        plan.expressions.foreach(_.foreachUp {
+          case s: ExecSubqueryExpression => evalPhysicalSubquery(s)
+          case _ =>
+        })
+    }
+    subquery.updateResult()
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 3fc151621..f770ac506 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.extensions
import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis,
PaimonDeleteTable, PaimonIncompatiblePHRRules,
PaimonIncompatibleResolutionRules, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable,
MergePaimonScalarSubqueriers}
import
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy
@@ -52,10 +52,11 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
PaimonTableValuedFunctions.getTableValueFunctionInjection(fnName))
}
- // planner extensions
- extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
-
// optimization rules
+ extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+
+ // planner extensions
+ extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 1f4370045..9ab5551dd 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -18,9 +18,11 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef,
LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -112,6 +114,94 @@ abstract class PaimonOptimizationTestBase extends
PaimonSparkTestBase {
}
}
+ // Checks DELETE on a table partitioned by `pt` when the condition compares `pt` with scalar
+ // subqueries. The scenario runs once per value of `withPk` (defined outside this hunk;
+ // presumably with and without a primary key - confirm against the test base).
+ test(s"Paimon Optimization: eval subqueries for delete table with
ScalarSubquery") {
+ withPk.foreach(
+ hasPk => {
+ val tblProps = if (hasPk) {
+ s"TBLPROPERTIES ('primary-key'='id, pt')"
+ } else {
+ ""
+ }
+ withTable("t1", "t2") {
+ spark.sql(s"""
+ |CREATE TABLE t1 (id INT, name STRING, pt INT)
+ |$tblProps
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ spark.sql(
+ "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4,
'd', 3), (5, 'e', 4)")
+
+ spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
+ spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+ // With the t2 rows inserted above, n BETWEEN 2 AND 3 selects ids 2 and 3, so the
+ // condition amounts to pt >= 2 AND pt <= 3.
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
+ |AND
+ |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND
3)""".stripMargin)
+ // For partition-only predicates, drop partition is called
internally.
+ Assertions.assertEquals(
+ CommitKind.OVERWRITE,
+
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+ // Only the rows in partitions pt = 1 and pt = 4 remain.
+ checkAnswer(
+ spark.sql("SELECT * FROM t1 ORDER BY id"),
+ Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+
+ // subquery eval nothing
+ // No t2 row has n > 10; the check below expects t1 to be left unchanged.
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt >= (SELECT min(id) FROM t2 WHERE n >
10)""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM t1 ORDER BY id"),
+ Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+ }
+ })
+ }
+
+ // Checks DELETE on a table partitioned by `pt` when the condition uses `pt IN (subquery)`.
+ // The scenario runs once per value of `withPk` (defined outside this hunk; presumably with
+ // and without a primary key - confirm against the test base).
+ test(s"Paimon Optimization: eval subqueries for delete table with
InSubquery") {
+ withPk.foreach(
+ hasPk => {
+ val tblProps = if (hasPk) {
+ s"TBLPROPERTIES ('primary-key'='id, pt')"
+ } else {
+ ""
+ }
+ withTable("t1", "t2") {
+ spark.sql(s"""
+ |CREATE TABLE t1 (id INT, name STRING, pt INT)
+ |$tblProps
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ spark.sql(
+ "INSERT INTO t1 VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 2), (4,
'd', 3), (5, 'e', 4)")
+
+ spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
+ spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
+
+ // With the t2 rows inserted above, the first subquery yields ids 2 and 3 and the
+ // second yields 3, so the condition targets partitions pt = 2 and pt = 3.
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
+ |OR
+ |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND
3)""".stripMargin)
+ // For partition-only predicates, drop partition is called
internally.
+ Assertions.assertEquals(
+ CommitKind.OVERWRITE,
+
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+
+ // Only the rows in partitions pt = 1 and pt = 4 remain.
+ checkAnswer(
+ spark.sql("SELECT * FROM t1 ORDER BY id"),
+ Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+
+ // subquery eval nothing
+ // No t2 row has n > 10; the check below expects t1 to be left unchanged.
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt in (SELECT id FROM t2 WHERE n > 10)""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM t1 ORDER BY id"),
+ Row(1, "a", 1) :: Row(5, "e", 4) :: Nil)
+ }
+ })
+ }
+
private def definitionNode(plan: LogicalPlan, cteIndex: Int) = {
CTERelationDef(plan, cteIndex, underSubquery = true)
}