This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9756bbe8026c [SPARK-46289][SQL] Support ordering UDTs in interpreted mode
9756bbe8026c is described below

commit 9756bbe8026c2f9863046f01949a8d6e6d647f2f
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Dec 18 11:55:24 2023 -0800

    [SPARK-46289][SQL] Support ordering UDTs in interpreted mode
    
    ### What changes were proposed in this pull request?
    
    When comparing two UDT values in interpreted mode, treat each value as an instance of the UDT's underlying type.
    
    ### Why are the changes needed?
    
    Consider the following code:
    ```
    import org.apache.spark.ml.linalg.{DenseVector, Vector}
    
    val df = Seq.tabulate(30) { x =>
      (x, x + 1, x + 2, new DenseVector(Array((x/100.0).toDouble, ((x + 1)/100.0).toDouble, ((x + 3)/100.0).toDouble)))
    }.toDF("id", "c1", "c2", "c3")
    
    df.createOrReplaceTempView("df")
    
    // this works
    sql("select * from df order by c3").collect
    
    sql("set spark.sql.codegen.wholeStage=false")
    sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")
    
    // this gets an error
    sql("select * from df order by c3").collect
    ```
    The first collect action works. However, the second collect action, which runs in interpreted mode, gets the following exception:
    ```
    org.apache.spark.SparkIllegalArgumentException: Type UninitializedPhysicalType does not support ordered operations.
            at org.apache.spark.sql.errors.QueryExecutionErrors$.orderedOperationUnsupportedByDataTypeError(QueryExecutionErrors.scala:348)
            at org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:332)
            at org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:329)
            at org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:60)
            at org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:39)
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:254)
    ```
    The code generator creates code that compares UDTs based on their underlying type. See [here](https://github.com/apache/spark/blob/c045a425bf0c472f164e3ef75a8a2c68d72d61d3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L721).
    
    On the other hand, the interpreted mode code tries to compare the values as UDTs, not as their underlying types. This PR brings interpreted mode code in line with the generated code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44361 from bersprockets/udt_order_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/expressions/ordering.scala     |  8 +++++++-
 .../org/apache/spark/sql/UserDefinedTypeSuite.scala   | 19 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 41c68d439a28..47de7a26affc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -37,7 +37,13 @@ class BaseOrdering extends Ordering[InternalRow] {
  * An interpreted row ordering comparator.
  */
 class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
-  private lazy val physicalDataTypes = ordering.map(order => PhysicalDataType(order.dataType))
+  private lazy val physicalDataTypes = ordering.map { order =>
+    val dt = order.dataType match {
+      case udt: UserDefinedType[_] => udt.sqlType
+      case _ => order.dataType
+    }
+    PhysicalDataType(dt)
+  }
 
   def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
     this(bindReferences(ordering, inputSchema))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 65aa5ae6a055..24175ea8ed94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -22,9 +22,10 @@ import java.util.Arrays
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Cast, CodegenObjectFactoryMode, ExpressionEvalHelper, Literal}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -282,4 +283,20 @@ class UserDefinedTypeSuite extends QueryTest with SharedSparkSession with Parque
     java.util.Arrays.equals(unwrappedFeaturesArrays(0), Array(0.1, 1.0))
     java.util.Arrays.equals(unwrappedFeaturesArrays(1), Array(0.2, 2.0))
   }
+
+  test("SPARK-46289: UDT ordering") {
+    val settings = Seq(
+      ("true", CodegenObjectFactoryMode.CODEGEN_ONLY.toString),
+      ("false", CodegenObjectFactoryMode.NO_CODEGEN.toString))
+    withTempView("v1") {
+      pointsRDD.createOrReplaceTempView("v1")
+      for ((wsSetting, cgSetting) <- settings) {
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wsSetting,
+          SQLConf.CODEGEN_FACTORY_MODE.key -> cgSetting) {
+          val df = sql("select label from v1 order by features")
+          checkAnswer(df, Row(1.0) :: Row(0.0) :: Nil)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to