This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ad46db4ef671 [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy
ad46db4ef671 is described below

commit ad46db4ef671d8829dfffba2780ba0f6b4f4e43d
Author: Takuya Ueshin <[email protected]>
AuthorDate: Wed Nov 20 13:22:51 2024 -0800

    [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy
    
    ### What changes were proposed in this pull request?
    
    Makes Encoder generation lazy.
    
    ### Why are the changes needed?
    
    An encoder built from an empty schema for a lazily analyzed plan could cause unexpected behavior.
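
    As an illustration only (not part of the patch), a minimal sketch of the pattern this change adopts: the encoder is supplied as a thunk and materialized by a `lazy val`, so it is built, and can fail, only on first access. `LazyEncoderSketch`, `Frame`, and `buildEncoder` are hypothetical names, not Spark API.

    ```scala
    // Minimal sketch (not Spark code): lazy, memoized initialization via a
    // thunk plus `lazy val`. All names here are hypothetical stand-ins.
    object LazyEncoderSketch extends App {
      class Frame[T](encoderGenerator: () => T) {
        // Not evaluated at construction; computed once, on first access.
        lazy val encoder: T = encoderGenerator()
      }

      def buildEncoder(): String = {
        println("building encoder (where analysis errors would surface)")
        "encoder"
      }

      val frame = new Frame(() => buildEncoder()) // nothing built yet
      println(frame.encoder) // first access triggers the build
      println(frame.encoder) // memoized; no second build
    }
    ```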
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48829 from ueshin/issues/SPARK-50130/lazy_encoder.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 35 ++++++++--------------
 .../apache/spark/sql/DataFrameSubquerySuite.scala  | 15 +++++++---
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  2 +-
 3 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 500a4c7c4d9b..4766a74308a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -95,13 +95,8 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
     sparkSession.withActive {
       val qe = sparkSession.sessionState.executePlan(logicalPlan)
-      val encoder = if (qe.isLazyAnalysis) {
-        RowEncoder.encoderFor(new StructType())
-      } else {
-        qe.assertAnalyzed()
-        RowEncoder.encoderFor(qe.analyzed.schema)
-      }
-      new Dataset[Row](qe, encoder)
+      if (!qe.isLazyAnalysis) qe.assertAnalyzed()
+      new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
     }
 
   def ofRows(
@@ -111,13 +106,8 @@ private[sql] object Dataset {
     sparkSession.withActive {
       val qe = new QueryExecution(
         sparkSession, logicalPlan, shuffleCleanupMode = shuffleCleanupMode)
-      val encoder = if (qe.isLazyAnalysis) {
-        RowEncoder.encoderFor(new StructType())
-      } else {
-        qe.assertAnalyzed()
-        RowEncoder.encoderFor(qe.analyzed.schema)
-      }
-      new Dataset[Row](qe, encoder)
+      if (!qe.isLazyAnalysis) qe.assertAnalyzed()
+      new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
     }
 
   /** A variant of ofRows that allows passing in a tracker so we can track query parsing time. */
@@ -129,13 +119,8 @@ private[sql] object Dataset {
     : DataFrame = sparkSession.withActive {
     val qe = new QueryExecution(
      sparkSession, logicalPlan, tracker, shuffleCleanupMode = shuffleCleanupMode)
-    val encoder = if (qe.isLazyAnalysis) {
-      RowEncoder.encoderFor(new StructType())
-    } else {
-      qe.assertAnalyzed()
-      RowEncoder.encoderFor(qe.analyzed.schema)
-    }
-    new Dataset[Row](qe, encoder)
+    if (!qe.isLazyAnalysis) qe.assertAnalyzed()
+    new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
   }
 }
 
@@ -229,7 +214,7 @@ private[sql] object Dataset {
 @Stable
 class Dataset[T] private[sql](
     @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
-    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
+    @transient encoderGenerator: () => Encoder[T])
   extends api.Dataset[T] {
   type DS[U] = Dataset[U]
 
@@ -252,6 +237,10 @@ class Dataset[T] private[sql](
  // Note for Spark contributors: if adding or updating any action in `Dataset`, please make sure
  // you wrap it with `withNewExecutionId` if this actions doesn't call other action.
 
+  private[sql] def this(queryExecution: QueryExecution, encoder: Encoder[T]) = {
+    this(queryExecution, () => encoder)
+  }
+
   def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
     this(sparkSession.sessionState.executePlan(logicalPlan), encoder)
   }
@@ -274,6 +263,8 @@ class Dataset[T] private[sql](
     }
   }
 
+  @DeveloperApi @Unstable @transient lazy val encoder: Encoder[T] = encoderGenerator()
+
   /**
   * Expose the encoder as implicit so it can be used to construct new Dataset objects that have
    * the same external type.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSubquerySuite.scala
index 5a065d7e73b1..d656c36ce842 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSubquerySuite.scala
@@ -54,11 +54,18 @@ class DataFrameSubquerySuite extends QueryTest with SharedSparkSession {
   }
 
   test("unanalyzable expression") {
-    val exception = intercept[AnalysisException] {
-      spark.range(1).select($"id" === $"id".outer()).schema
-    }
+    val sub = spark.range(1).select($"id" === $"id".outer())
+
+    checkError(
+      intercept[AnalysisException](sub.schema),
+      condition = "UNANALYZABLE_EXPRESSION",
+      parameters = Map("expr" -> "\"outer(id)\""),
+      queryContext =
+        Array(ExpectedContext(fragment = "outer", callSitePattern = getCurrentClassCallSitePattern))
+    )
+
     checkError(
-      exception,
+      intercept[AnalysisException](sub.encoder),
       condition = "UNANALYZABLE_EXPRESSION",
       parameters = Map("expr" -> "\"outer(id)\""),
       queryContext =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index d550d0f94f23..18af2fcb0ee7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -1205,7 +1205,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
         dt
       )
       checkError(
-        intercept[AnalysisException](spark.range(1).select(f())),
+        intercept[AnalysisException](spark.range(1).select(f()).encoder),
         condition = "UNSUPPORTED_DATA_TYPE_FOR_ENCODER",
         sqlState = "0A000",
         parameters = Map("dataType" -> s"\"${dt.sql}\"")

