This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2475b35a2ac7 [SPARK-50665][SQL] Substitute LocalRelation with ComparableLocalRelation in NormalizePlan
2475b35a2ac7 is described below

commit 2475b35a2ac795a3f31a3ed0098111e3b55d286e
Author: Vladimir Golubev <[email protected]>
AuthorDate: Thu Dec 26 10:51:59 2024 +0800

    [SPARK-50665][SQL] Substitute LocalRelation with ComparableLocalRelation in NormalizePlan
    
    ### What changes were proposed in this pull request?
    
    Substitute `LocalRelation` with `ComparableLocalRelation` in `NormalizePlan`.
    
    `ComparableLocalRelation` has `Seq[Seq[Expression]]` instead of `Seq[InternalRow]`. The conversion happens through `Literal`s.
    
    ### Why are the changes needed?
    
    `LocalRelation`'s data field is incomparable if it contains maps, because `ArrayBasedMapData` doesn't define `equals`: https://github.com/apache/spark/pull/13847
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is to compare logical plans in the single-pass Analyzer.
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    copilot.nvim.
    
    Closes #49287 from vladimirg-db/vladimirg-db/normalize-local-relation.
    
    Authored-by: Vladimir Golubev <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/plans/NormalizePlan.scala   | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
index 1cc876588550..ee68e433fbea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala
@@ -21,6 +21,7 @@ import 
org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 
 object NormalizePlan extends PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan =
@@ -104,6 +105,8 @@ object NormalizePlan extends PredicateHelper {
       case Project(projectList, child) =>
         Project(normalizeProjectList(projectList), child)
       case c: KeepAnalyzedQuery => c.storeAnalyzedQuery()
+      case localRelation: LocalRelation =>
+        ComparableLocalRelation.fromLocalRelation(localRelation)
     }
   }
 
@@ -134,3 +137,33 @@ object NormalizePlan extends PredicateHelper {
     case _ => condition // Don't reorder.
   }
 }
+
+/**
+ * A substitute for the [[LocalRelation]] that has comparable `data` field. 
[[LocalRelation]]'s
+ * `data` is incomparable for maps, because [[ArrayBasedMapData]] doesn't 
define [[equals]].
+ */
+case class ComparableLocalRelation(
+    override val output: Seq[Attribute],
+    data: Seq[Seq[Expression]],
+    override val isStreaming: Boolean,
+    stream: Option[SparkDataStream]) extends LeafNode
+
+object ComparableLocalRelation {
+  def fromLocalRelation(localRelation: LocalRelation): ComparableLocalRelation 
= {
+    val dataTypes = localRelation.output.map(_.dataType)
+    ComparableLocalRelation(
+      output = localRelation.output,
+      data = localRelation.data.map { row =>
+        if (row != null) {
+          row.toSeq(dataTypes).zip(dataTypes).map {
+            case (value, dataType) => Literal(value, dataType)
+          }
+        } else {
+          Seq.empty
+        }
+      },
+      isStreaming = localRelation.isStreaming,
+      stream = localRelation.stream
+    )
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to