This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2475b35a2ac7 [SPARK-50665][SQL] Substitute LocalRelation with
ComparableLocalRelation in NormalizePlan
2475b35a2ac7 is described below
commit 2475b35a2ac795a3f31a3ed0098111e3b55d286e
Author: Vladimir Golubev <[email protected]>
AuthorDate: Thu Dec 26 10:51:59 2024 +0800
[SPARK-50665][SQL] Substitute LocalRelation with ComparableLocalRelation in
NormalizePlan
### What changes were proposed in this pull request?
Substitute `LocalRelation` with `ComparableLocalRelation` in
`NormalizePlan`.
`ComparableLocalRelation` has `Seq[Seq[Expression]]` instead of
`Seq[InternalRow]`. The conversion happens through `Literal`s.
### Why are the changes needed?
`LocalRelation`'s data field is incomparable if it contains maps, because
`ArrayBasedMapData` doesn't define `equals`:
https://github.com/apache/spark/pull/13847
### Does this PR introduce _any_ user-facing change?
No. This is to compare logical plans in the single-pass Analyzer.
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
copilot.nvim.
Closes #49287 from vladimirg-db/vladimirg-db/normalize-local-relation.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/plans/NormalizePlan.scala | 33 ++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 1cc876588550..ee68e433fbea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
object NormalizePlan extends PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan =
@@ -104,6 +105,8 @@ object NormalizePlan extends PredicateHelper {
case Project(projectList, child) =>
Project(normalizeProjectList(projectList), child)
case c: KeepAnalyzedQuery => c.storeAnalyzedQuery()
+ case localRelation: LocalRelation =>
+ ComparableLocalRelation.fromLocalRelation(localRelation)
}
}
@@ -134,3 +137,33 @@ object NormalizePlan extends PredicateHelper {
case _ => condition // Don't reorder.
}
}
+
+/**
+ * A substitute for the [[LocalRelation]] that has comparable `data` field. [[LocalRelation]]'s
+ * `data` is incomparable for maps, because [[ArrayBasedMapData]] doesn't define [[equals]].
+ */
+case class ComparableLocalRelation(
+ override val output: Seq[Attribute],
+ data: Seq[Seq[Expression]],
+ override val isStreaming: Boolean,
+ stream: Option[SparkDataStream]) extends LeafNode
+
+object ComparableLocalRelation {
+ def fromLocalRelation(localRelation: LocalRelation): ComparableLocalRelation = {
+ val dataTypes = localRelation.output.map(_.dataType)
+ ComparableLocalRelation(
+ output = localRelation.output,
+ data = localRelation.data.map { row =>
+ if (row != null) {
+ row.toSeq(dataTypes).zip(dataTypes).map {
+ case (value, dataType) => Literal(value, dataType)
+ }
+ } else {
+ Seq.empty
+ }
+ },
+ isStreaming = localRelation.isStreaming,
+ stream = localRelation.stream
+ )
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]