This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8884f819 [spark] use cachedRelation.output to build OutputMap in 
MergePaimonScalarSubqueriers (#3148)
b8884f819 is described below

commit b8884f81957d6e22c8c4506756d45f48922884ac
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Apr 2 19:59:36 2024 +0800

    [spark] use cachedRelation.output to build OutputMap in 
MergePaimonScalarSubqueriers (#3148)
---
 .../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala   | 4 ++--
 .../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala   | 4 ++--
 .../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala   | 4 ++--
 .../catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala     | 8 ++++----
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index 05c0ed1b6..8f6ad4671 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -36,7 +36,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
             DataSourceV2ScanRelation(
               cachedRelation,
               cachedScan: PaimonScan,
-              cachedOutput,
+              _,
               cachedPartitioning)) =>
         checkIdenticalPlans(newRelation, cachedRelation).flatMap {
           outputMap =>
@@ -46,7 +46,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
                   val mergedAttributes = mergedScan
                     .readSchema()
                     .map(f => AttributeReference(f.name, f.dataType, 
f.nullable, f.metadata)())
-                  val cachedOutputNameMap = cachedOutput.map(a => a.name -> 
a).toMap
+                  val cachedOutputNameMap = cachedRelation.output.map(a => 
a.name -> a).toMap
                   val mergedOutput =
                     mergedAttributes.map(a => 
cachedOutputNameMap.getOrElse(a.name, a))
                   val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index dbe7d7ef0..8a948ab8b 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -41,7 +41,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
             DataSourceV2ScanRelation(
               cachedRelation,
               cachedScan: PaimonScan,
-              cachedOutput,
+              _,
               cachedPartitioning,
               cacheOrdering)) =>
         checkIdenticalPlans(newRelation, cachedRelation).flatMap {
@@ -57,7 +57,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
                   val mergedAttributes = mergedScan
                     .readSchema()
                     .map(f => AttributeReference(f.name, f.dataType, 
f.nullable, f.metadata)())
-                  val cachedOutputNameMap = cachedOutput.map(a => a.name -> 
a).toMap
+                  val cachedOutputNameMap = cachedRelation.output.map(a => 
a.name -> a).toMap
                   val mergedOutput =
                     mergedAttributes.map(a => 
cachedOutputNameMap.getOrElse(a.name, a))
                   val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git 
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
 
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index f1b6c071f..895a4b373 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -41,7 +41,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
             DataSourceV2ScanRelation(
               cachedRelation,
               cachedScan: PaimonScan,
-              cachedOutput,
+              _,
               cachedPartitioning,
               cacheOrdering)) =>
         checkIdenticalPlans(newRelation, cachedRelation).flatMap {
@@ -57,7 +57,7 @@ object MergePaimonScalarSubqueriers extends 
MergePaimonScalarSubqueriersBase {
                   val mergedAttributes = mergedScan
                     .readSchema()
                     .map(f => AttributeReference(f.name, f.dataType, 
f.nullable, f.metadata)())
-                  val cachedOutputNameMap = cachedOutput.map(a => a.name -> 
a).toMap
+                  val cachedOutputNameMap = cachedRelation.output.map(a => 
a.name -> a).toMap
                   val mergedOutput =
                     mergedAttributes.map(a => 
cachedOutputNameMap.getOrElse(a.name, a))
                   val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 49cd5e907..69ac3e3d8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -88,7 +88,7 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
     val newPlan = removeReferences(planWithReferences, cache)
     val subqueryCTEs = 
cache.filter(_.merged).map(_.plan.asInstanceOf[CTERelationDef])
     if (subqueryCTEs.nonEmpty) {
-      WithCTE(newPlan, subqueryCTEs.toSeq)
+      WithCTE(newPlan, subqueryCTEs)
     } else {
       newPlan
     }
@@ -130,12 +130,12 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
                 } else {
                   header.attributes
                 }
-                cache(subqueryIndex) = Header(newHeaderAttributes, mergedPlan, 
true)
+                cache(subqueryIndex) = Header(newHeaderAttributes, mergedPlan, 
merged = true)
                 subqueryIndex -> headerIndex
             })
       })
       .getOrElse {
-        cache += Header(Seq(output), plan, false)
+        cache += Header(Seq(output), plan, merged = false)
         cache.length - 1 -> 0
       }
   }
@@ -292,7 +292,7 @@ trait MergePaimonScalarSubqueriersBase extends 
Rule[LogicalPlan] with PredicateH
       plan)
   }
 
-  protected def mapAttributes[T <: Expression](expr: T, outputMap: 
AttributeMap[Attribute]) = {
+  protected def mapAttributes[T <: Expression](expr: T, outputMap: 
AttributeMap[Attribute]): T = {
     expr.transform { case a: Attribute => outputMap.getOrElse(a, a) 
}.asInstanceOf[T]
   }
 

Reply via email to