This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9756bbe8026c [SPARK-46289][SQL] Support ordering UDTs in interpreted
mode
9756bbe8026c is described below
commit 9756bbe8026c2f9863046f01949a8d6e6d647f2f
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Dec 18 11:55:24 2023 -0800
[SPARK-46289][SQL] Support ordering UDTs in interpreted mode
### What changes were proposed in this pull request?
When comparing two UDT values in interpreted mode, treat each value as an
instance of the UDT's underlying type.
### Why are the changes needed?
Consider the following code:
```
import org.apache.spark.ml.linalg.{DenseVector, Vector}
val df = Seq.tabulate(30) { x =>
(x, x + 1, x + 2, new DenseVector(Array((x/100.0).toDouble, ((x +
1)/100.0).toDouble, ((x + 3)/100.0).toDouble)))
}.toDF("id", "c1", "c2", "c3")
df.createOrReplaceTempView("df")
// this works
sql("select * from df order by c3").collect
sql("set spark.sql.codegen.wholeStage=false")
sql("set spark.sql.codegen.factoryMode=NO_CODEGEN")
// this gets an error
sql("select * from df order by c3").collect
```
The first collect action works. However, the second collect action, which
runs in interpreted mode, gets the following exception:
```
org.apache.spark.SparkIllegalArgumentException: Type
UninitializedPhysicalType does not support ordered operations.
at
org.apache.spark.sql.errors.QueryExecutionErrors$.orderedOperationUnsupportedByDataTypeError(QueryExecutionErrors.scala:348)
at
org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:332)
at
org.apache.spark.sql.catalyst.types.UninitializedPhysicalType$.ordering(PhysicalDataType.scala:329)
at
org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:60)
at
org.apache.spark.sql.catalyst.expressions.InterpretedOrdering.compare(ordering.scala:39)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter$RowComparator.compare(UnsafeExternalRowSorter.java:254)
```
The code generator creates code that compares UDTs based on their
underlying type. See
[here](https://github.com/apache/spark/blob/c045a425bf0c472f164e3ef75a8a2c68d72d61d3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L721).
On the other hand, the interpreted mode code tries to compare the values as
UDTs, not as their underlying types. This PR brings interpreted mode code in
line with the generated code.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44361 from bersprockets/udt_order_issue.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/expressions/ordering.scala | 8 +++++++-
.../org/apache/spark/sql/UserDefinedTypeSuite.scala | 19 ++++++++++++++++++-
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 41c68d439a28..47de7a26affc 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -37,7 +37,13 @@ class BaseOrdering extends Ordering[InternalRow] {
* An interpreted row ordering comparator.
*/
class InterpretedOrdering(ordering: Seq[SortOrder]) extends BaseOrdering {
- private lazy val physicalDataTypes = ordering.map(order =>
PhysicalDataType(order.dataType))
+ private lazy val physicalDataTypes = ordering.map { order =>
+ val dt = order.dataType match {
+ case udt: UserDefinedType[_] => udt.sqlType
+ case _ => order.dataType
+ }
+ PhysicalDataType(dt)
+ }
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(bindReferences(ordering, inputSchema))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 65aa5ae6a055..24175ea8ed94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -22,9 +22,10 @@ import java.util.Arrays
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper,
Literal}
+import org.apache.spark.sql.catalyst.expressions.{Cast,
CodegenObjectFactoryMode, ExpressionEvalHelper, Literal}
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -282,4 +283,20 @@ class UserDefinedTypeSuite extends QueryTest with
SharedSparkSession with Parque
java.util.Arrays.equals(unwrappedFeaturesArrays(0), Array(0.1, 1.0))
java.util.Arrays.equals(unwrappedFeaturesArrays(1), Array(0.2, 2.0))
}
+
+ test("SPARK-46289: UDT ordering") {
+ val settings = Seq(
+ ("true", CodegenObjectFactoryMode.CODEGEN_ONLY.toString),
+ ("false", CodegenObjectFactoryMode.NO_CODEGEN.toString))
+ withTempView("v1") {
+ pointsRDD.createOrReplaceTempView("v1")
+ for ((wsSetting, cgSetting) <- settings) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wsSetting,
+ SQLConf.CODEGEN_FACTORY_MODE.key -> cgSetting) {
+ val df = sql("select label from v1 order by features")
+ checkAnswer(df, Row(1.0) :: Row(0.0) :: Nil)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]