This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 375ffb859 [spark] Make MergePaimonScalarSubqueries only applied for paimon (#3783)
375ffb859 is described below

commit 375ffb8593090d2a43fccb9fe95fc3c37ad4faf0
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jul 19 10:23:10 2024 +0800

    [spark] Make MergePaimonScalarSubqueries only applied for paimon (#3783)
---
 ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} |  2 +-
 ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} |  2 +-
 ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} |  2 +-
 .../catalyst/optimizer/EvalSubqueriesForDeleteTable.scala     |  4 ++--
 ...larSubqueriers.scala => MergePaimonScalarSubqueries.scala} |  2 +-
 ...eriersBase.scala => MergePaimonScalarSubqueriesBase.scala} | 11 +++++++++--
 .../spark/extensions/PaimonSparkSessionExtensions.scala       |  4 ++--
 .../apache/paimon/spark/sql/PaimonOptimizationTestBase.scala  |  4 ++--
 8 files changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from 
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to 
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 8f6ad4671..77574a629 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
 
   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
diff --git 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from 
paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to 
paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 8a948ab8b..c7cd70bb7 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
 
   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
diff --git 
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 97%
rename from 
paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to 
paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 895a4b373..2144f77f3 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeMap, Attri
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
-object MergePaimonScalarSubqueriers extends MergePaimonScalarSubqueriersBase {
+object MergePaimonScalarSubqueries extends MergePaimonScalarSubqueriesBase {
 
   override def tryMergeDataSourceV2ScanRelation(
       newV2ScanRelation: DataSourceV2ScanRelation,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
index aaaed10f1..5d264370a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
@@ -37,8 +37,8 @@ import scala.collection.JavaConverters._
  * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can 
directly call
  * dropPartitions to achieve fast deletion.
  *
- * Note: this rule must be placed before [[MergePaimonScalarSubqueriers]], 
because
- * [[MergePaimonScalarSubqueriers]] will merge subqueries.
+ * Note: this rule must be placed before [[MergePaimonScalarSubqueries]], 
because
+ * [[MergePaimonScalarSubqueries]] will merge subqueries.
  */
 object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with 
ExpressionHelper with Logging {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
index 0c0a0f9ee..3f6801fe3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueries.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
-object MergePaimonScalarSubqueriers extends Rule[LogicalPlan] {
+object MergePaimonScalarSubqueries extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
similarity index 97%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
index 45a086d09..eca8c9cdf 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriesBase.scala
@@ -40,11 +40,11 @@ import scala.collection.mutable.ArrayBuffer
  * reused. So we extend the [[tryMergePlans]] method to check and merge
  * [[DataSourceV2ScanRelation]]s, thus we can merge scalar subqueries for 
paimon.
  */
-trait MergePaimonScalarSubqueriersBase extends Rule[LogicalPlan] with 
PredicateHelper {
+trait MergePaimonScalarSubqueriesBase extends Rule[LogicalPlan] with 
PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
       // Subquery reuse needs to be enabled for this optimization.
-      case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) => plan
+      case _ if !conf.getConf(SQLConf.SUBQUERY_REUSE_ENABLED) && 
!existsPaimonScan(plan) => plan
 
       // This rule does a whole plan traversal, no need to run on subqueries.
       case _: Subquery => plan
@@ -56,6 +56,13 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
     }
   }
 
+  /** True when some node of the plan is a DataSourceV2ScanRelation backed by a PaimonScan. */
+  private def existsPaimonScan(plan: LogicalPlan): Boolean =
+    plan.collectFirst {
+      case relation: DataSourceV2ScanRelation if relation.scan.isInstanceOf[PaimonScan] =>
+        relation
+    }.nonEmpty
+
   /**
    * An item in the cache of merged scalar subqueries.
    *
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f770ac506..3cd278322 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.extensions
 
 import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, 
PaimonDeleteTable, PaimonIncompatiblePHRRules, 
PaimonIncompatibleResolutionRules, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
-import 
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, 
MergePaimonScalarSubqueriers}
+import 
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, 
MergePaimonScalarSubqueries}
 import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
 
@@ -54,7 +54,7 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
 
     // optimization rules
     extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
-    extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueriers)
+    extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)
 
     // planner extensions
     extensions.injectPlannerStrategy(spark => PaimonStrategy(spark))
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 70cb4b0c4..78e8905fa 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueriers
+import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
CreateNamedStruct, Literal, NamedExpression}
@@ -39,7 +39,7 @@ abstract class PaimonOptimizationTestBase extends 
PaimonSparkTestBase {
 
   private object Optimize extends RuleExecutor[LogicalPlan] {
     val batches: immutable.Seq[Batch] =
-      Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueriers) 
:: Nil
+      Batch("MergePaimonScalarSubqueries", Once, MergePaimonScalarSubqueries) 
:: Nil
   }
 
   test("Paimon Optimization: merge scalar subqueries") {

Reply via email to