This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b8884f819 [spark] use cachedRelation.output to build OutputMap in MergePaimonScalarSubqueriers (#3148)
b8884f819 is described below
commit b8884f81957d6e22c8c4506756d45f48922884ac
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Apr 2 19:59:36 2024 +0800
[spark] use cachedRelation.output to build OutputMap in MergePaimonScalarSubqueriers (#3148)
---
.../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala | 4 ++--
.../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala | 4 ++--
.../spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala | 4 ++--
.../catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala | 8 ++++----
4 files changed, 10 insertions(+), 10 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index 05c0ed1b6..8f6ad4671 100644
---
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -36,7 +36,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
DataSourceV2ScanRelation(
cachedRelation,
cachedScan: PaimonScan,
- cachedOutput,
+ _,
cachedPartitioning)) =>
checkIdenticalPlans(newRelation, cachedRelation).flatMap {
outputMap =>
@@ -46,7 +46,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
val mergedAttributes = mergedScan
.readSchema()
.map(f => AttributeReference(f.name, f.dataType,
f.nullable, f.metadata)())
- val cachedOutputNameMap = cachedOutput.map(a => a.name ->
a).toMap
+ val cachedOutputNameMap = cachedRelation.output.map(a =>
a.name -> a).toMap
val mergedOutput =
mergedAttributes.map(a =>
cachedOutputNameMap.getOrElse(a.name, a))
val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index dbe7d7ef0..8a948ab8b 100644
---
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -41,7 +41,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
DataSourceV2ScanRelation(
cachedRelation,
cachedScan: PaimonScan,
- cachedOutput,
+ _,
cachedPartitioning,
cacheOrdering)) =>
checkIdenticalPlans(newRelation, cachedRelation).flatMap {
@@ -57,7 +57,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
val mergedAttributes = mergedScan
.readSchema()
.map(f => AttributeReference(f.name, f.dataType,
f.nullable, f.metadata)())
- val cachedOutputNameMap = cachedOutput.map(a => a.name ->
a).toMap
+ val cachedOutputNameMap = cachedRelation.output.map(a =>
a.name -> a).toMap
val mergedOutput =
mergedAttributes.map(a =>
cachedOutputNameMap.getOrElse(a.name, a))
val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
index f1b6c071f..895a4b373 100644
---
a/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
+++
b/paimon-spark/paimon-spark-3.5/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriers.scala
@@ -41,7 +41,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
DataSourceV2ScanRelation(
cachedRelation,
cachedScan: PaimonScan,
- cachedOutput,
+ _,
cachedPartitioning,
cacheOrdering)) =>
checkIdenticalPlans(newRelation, cachedRelation).flatMap {
@@ -57,7 +57,7 @@ object MergePaimonScalarSubqueriers extends
MergePaimonScalarSubqueriersBase {
val mergedAttributes = mergedScan
.readSchema()
.map(f => AttributeReference(f.name, f.dataType,
f.nullable, f.metadata)())
- val cachedOutputNameMap = cachedOutput.map(a => a.name ->
a).toMap
+ val cachedOutputNameMap = cachedRelation.output.map(a =>
a.name -> a).toMap
val mergedOutput =
mergedAttributes.map(a =>
cachedOutputNameMap.getOrElse(a.name, a))
val newV2ScanRelation = DataSourceV2ScanRelation(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
index 49cd5e907..69ac3e3d8 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/MergePaimonScalarSubqueriersBase.scala
@@ -88,7 +88,7 @@ trait MergePaimonScalarSubqueriersBase extends
Rule[LogicalPlan] with PredicateH
val newPlan = removeReferences(planWithReferences, cache)
val subqueryCTEs =
cache.filter(_.merged).map(_.plan.asInstanceOf[CTERelationDef])
if (subqueryCTEs.nonEmpty) {
- WithCTE(newPlan, subqueryCTEs.toSeq)
+ WithCTE(newPlan, subqueryCTEs)
} else {
newPlan
}
@@ -130,12 +130,12 @@ trait MergePaimonScalarSubqueriersBase extends
Rule[LogicalPlan] with PredicateH
} else {
header.attributes
}
- cache(subqueryIndex) = Header(newHeaderAttributes, mergedPlan,
true)
+ cache(subqueryIndex) = Header(newHeaderAttributes, mergedPlan,
merged = true)
subqueryIndex -> headerIndex
})
})
.getOrElse {
- cache += Header(Seq(output), plan, false)
+ cache += Header(Seq(output), plan, merged = false)
cache.length - 1 -> 0
}
}
@@ -292,7 +292,7 @@ trait MergePaimonScalarSubqueriersBase extends
Rule[LogicalPlan] with PredicateH
plan)
}
- protected def mapAttributes[T <: Expression](expr: T, outputMap:
AttributeMap[Attribute]) = {
+ protected def mapAttributes[T <: Expression](expr: T, outputMap:
AttributeMap[Attribute]): T = {
expr.transform { case a: Attribute => outputMap.getOrElse(a, a)
}.asInstanceOf[T]
}