This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 375ffb859 [spark] Make MergePaimonScalarSubqueries only applied for
paimon (#3783)
375ffb859 is described below
commit 375ffb8593090d2a43fccb9fe95fc3c37ad4faf0
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jul 19 10:23:10 2024 +0800
[spark] Make MergePaimonScalarSubqueries only applied for paimon (#3783)
---
...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +-
...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +-
...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +-
.../catalyst/optimizer/EvalSubqueriesForDeleteTable.scala | 4 ++--
...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} | 2 +-
...eriersBase.scala => MergePaimonScalarSubqueriesBase.scala} | 11 +++++++++--
.../spark/extensions/PaimonSparkSessionExtensions.scala | 4 ++--
.../apache/paimon/spark/sql/PaimonOptimizationTestBase.scala | 4 ++--
8 files changed, 19 insertions(+), 12 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 8f6ad4671..77574a629 100644
---
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap, Attri
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
override def tryMergeDataSourceV2ScanRelation(
newV2ScanRelation: DataSourceV2ScanRelation,
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from
paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to
paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 8a948ab8b..c7cd70bb7 100644
---
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap, Attri
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
override def tryMergeDataSourceV2ScanRelation(
newV2ScanRelation: DataSourceV2ScanRelation,
diff --git
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from
paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to
paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 895a4b373..2144f77f3 100644
---
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeMap, Attri
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
override def tryMergeDataSourceV2ScanRelation(
newV2ScanRelation: DataSourceV2ScanRelation,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
index aaaed10f1..5d264370a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -37,8 +37,8 @@ import scala.collection.JavaConverters._
* in advance. So that when running [[DeleteFromPaimonTableCommand]], we can
directly call
* dropPartitions to achieve fast deletion.
*
- * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]],
because
- * [[MergePaimonScalarSubqueriers]] will merge subqueries.
+ * Note: this rule must be placed before [[MergePaimonScalarSubqueries]],
because
+ * [[MergePaimonScalarSubqueries]] will merge subqueries.
*/
object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with
ExpressionHelper with Logging {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 94%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 0c0a0f9ee..3f6801fe3 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-object MergePaimonScalarSubqueriers extends Rule[LogicalPlan] {
+object MergePaimonScalarSubqueries extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
similarity index 97%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
index 45a086d09..eca8c9cdf 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
@@ -40,11 +40,11 @@ import scala.collection.mutable.ArrayBuffer
* reused. So we extend the [[tryMergePlans]] method to check and merge
* [[DataSourceV2ScanRelation]]s, thus we can merge scalar subqueries for
paimon.
*/
-trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with
PredicateHelper {
+trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with
PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Subquery reuse needs to be enabled for this optimization.
- case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) => plan
+ case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) ||
!existsPaimonScan(plan) => plan
// This rule does a whole plan traversal, no need to run on subqueries.
case _: Subquery => plan
@@ -56,6 +56,13 @@ trait MergePaimonScalarSubqueriersBase extends
Rule[LogicalPlan] with PredicateH
}
}
+ private def existsPaimonScan(plan: LogicalPlan): Boolean = {
+ plan.find {
+ case r: DataSourceV2ScanRelation => r.scan.isInstanceOf[PaimonScan]
+ case _ => false
+ }.isDefined
+ }
+
/**
* An item in the cache of merged scalar subqueries.
*
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f770ac506..3cd278322 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.extensions
import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis,
PaimonDeleteTable, PaimonIncompatiblePHRRules,
PaimonIncompatibleResolutionRules, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable,
MergePaimonScalarSubqueriers}
+import
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable,
MergePaimonScalarSubqueries}
import
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy
@@ -54,7 +54,7 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
// optimization rules
extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
- extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+ extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)
// planner extensions
extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 70cb4b0c4..78e8905fa 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute,
CreateNamedStruct, Literal, NamedExpression}
@@ -39,7 +39,7 @@ abstract class PaimonOptimizationTestBase extends
PaimonSparkTestBase {
private object Optimize extends RuleExecutor[LogicalPlan] {
val batches: immutable.Seq[Batch] =
- Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueriers)
:: Nil
+ Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueries)
:: Nil
}
test("Paimon Optimization: merge scalar subqueries") {