This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 101dd6b  [SPARK-37125][SQL] Support AnsiInterval radix sort
101dd6b is described below

commit 101dd6bbff2491a608e1ab51541a120a1f08e942
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Wed Oct 27 15:31:26 2021 +0800

    [SPARK-37125][SQL] Support AnsiInterval radix sort
    
    ### What changes were proposed in this pull request?
    
    - Make the `AnsiInterval` data types support radix sort in SQL (see the sketch below).
    - Enhance `SortSuite` to also run with radix sort disabled.
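
    As a user-level sketch (the data and column names below are illustrative, not part
    of this patch), an `ORDER BY` on an ANSI interval column is the kind of query that
    can now take the radix-sort path in `SortExec`:

    ```scala
    // Run in spark-shell. Radix sort is used when spark.sql.sort.enableRadixSort
    // (default: true) is on and the sort key can be fully ordered by its prefix.
    val df = spark.range(0, 1000 * 1000)
      .selectExpr(
        "make_ym_interval(CAST(id % 100 AS INT)) AS ym",  // YearMonthIntervalType
        "make_dt_interval(CAST(id % 24 AS INT)) AS dt")   // DayTimeIntervalType

    df.orderBy("ym").collect()  // sort key backed by an int (months)
    df.orderBy("dt").collect()  // sort key backed by a long (microseconds)
    ```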
    
    ### Why are the changes needed?
    
    Radix sort is faster than TimSort; see the benchmark results in `SortBenchmark`.
    
    The `AnsiInterval` data types are ordered by their underlying integral values:
    
    - `YearMonthIntervalType` -> int ordering
    - `DayTimeIntervalType` -> long ordering
    
    And radix sort is already supported when the ordering column's data type is int or long.
    
    So `AnsiInterval` radix sort can be supported.
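
    As a minimal sketch of the idea (illustrative, not the exact `SortPrefix` code):
    both interval types are physically stored as integral values, so the 8-byte sort
    prefix is simply the stored value widened to a long, the same treatment that other
    integral-backed types already get.

    ```scala
    // Sketch of the prefix mapping, assuming Spark's physical representations:
    //   YearMonthIntervalType -> Int  (total months)
    //   DayTimeIntervalType   -> Long (total microseconds)
    def ansiIntervalPrefix(raw: Any): Long = raw match {
      case months: Int  => months.toLong  // year-month interval
      case micros: Long => micros         // day-time interval
    }
    ```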
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    - Data correctness is covered by the existing `SortSuite`, which now also runs with radix sort disabled (a manual spot check is sketched below).
    - Added a new benchmark, `AnsiIntervalSortBenchmark`.
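
    For reference, a manual spot check in the spirit of the updated `SortSuite` could
    look like the sketch below (illustrative; it assumes a `spark` session and that
    `SQLConf.RADIX_SORT_ENABLED` is keyed by `spark.sql.sort.enableRadixSort`):

    ```scala
    import org.apache.spark.sql.Row

    // Sort the same ANSI interval data with radix sort enabled and disabled;
    // both runs should produce identical orderings.
    val data = spark.range(0, 100 * 1000)
      .selectExpr("make_dt_interval(CAST(id % 24 AS INT)) AS c1", "id AS c2")

    def sortedRows(enableRadix: Boolean): Array[Row] = {
      spark.conf.set("spark.sql.sort.enableRadixSort", enableRadix.toString)
      data.sortWithinPartitions("c1", "c2").collect()
    }

    assert(sortedRows(enableRadix = true).sameElements(sortedRows(enableRadix = false)))
    ```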
    
    Closes #34398 from ulysses-you/ansi-interval-sort.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/expressions/SortOrder.scala |  6 +-
 .../AnsiIntervalSortBenchmark-jdk11-results.txt    | 28 +++++++++
 .../AnsiIntervalSortBenchmark-results.txt          | 28 +++++++++
 .../spark/sql/execution/SortPrefixUtils.scala      |  5 +-
 .../org/apache/spark/sql/execution/SortSuite.scala | 17 +++--
 .../benchmark/AnsiIntervalSortBenchmark.scala      | 73 ++++++++++++++++++++++
 6 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 9aef25c..8e6f076 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -132,7 +132,7 @@ object SortOrder {
 case class SortPrefix(child: SortOrder) extends UnaryExpression {
 
   val nullValue = child.child.dataType match {
-    case BooleanType | DateType | TimestampType | _: IntegralType =>
+    case BooleanType | DateType | TimestampType | _: IntegralType | _: AnsiIntervalType =>
       if (nullAsSmallest) Long.MinValue else Long.MaxValue
     case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
       if (nullAsSmallest) Long.MinValue else Long.MaxValue
@@ -154,7 +154,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
   private lazy val calcPrefix: Any => Long = child.child.dataType match {
     case BooleanType => (raw) =>
       if (raw.asInstanceOf[Boolean]) 1 else 0
-    case DateType | TimestampType | _: IntegralType => (raw) =>
+    case DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => (raw) =>
       raw.asInstanceOf[java.lang.Number].longValue()
     case FloatType | DoubleType => (raw) => {
       val dVal = raw.asInstanceOf[java.lang.Number].doubleValue()
@@ -198,7 +198,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression {
         s"$input ? 1L : 0L"
       case _: IntegralType =>
         s"(long) $input"
-      case DateType | TimestampType =>
+      case DateType | TimestampType | _: AnsiIntervalType =>
         s"(long) $input"
       case FloatType | DoubleType =>
         s"$DoublePrefixCmp.computePrefix((double)$input)"
diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt
new file mode 100644
index 0000000..004d9d8
--- /dev/null
+++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt
@@ -0,0 +1,28 @@
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+year month interval one column:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------
+year month interval one column enable radix           40092          40744         668          2.5         400.9       1.0X
+year month interval one column disable radix          55178          55871         609          1.8         551.8       0.7X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+year month interval two columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+year month interval two columns enable radix           56855          57911        1497          1.8         568.5       1.0X
+year month interval two columns disable radix          58694          59525         774          1.7         586.9       1.0X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+day time interval one columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+day time interval one columns enable radix           52460          52564         115          1.9         524.6       1.0X
+day time interval one columns disable radix          56617          56967         505          1.8         566.2       0.9X
+
+OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+day time interval two columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+day time interval two columns enable radix           57437          60961         770          1.7         574.4       1.0X
+day time interval two columns disable radix          59075          60393        1153          1.7         590.7       1.0X
+
diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt b/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt
new file mode 100644
index 0000000..f2d8714
--- /dev/null
+++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt
@@ -0,0 +1,28 @@
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+year month interval one column:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------
+year month interval one column enable radix           32543          33408         895          3.1         325.4       1.0X
+year month interval one column disable radix          43452          44715        1124          2.3         434.5       0.7X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+year month interval two columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+-----------------------------------------------------------------------------------------------------------------------------
+year month interval two columns enable radix           47072          47193         168          2.1         470.7       1.0X
+year month interval two columns disable radix          47212          47230          21          2.1         472.1       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+day time interval one columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+day time interval one columns enable radix           34811          36177        1207          2.9         348.1       1.0X
+day time interval one columns disable radix          47870          50210         NaN          2.1         478.7       0.7X
+
+OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+day time interval two columns:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------
+day time interval two columns enable radix           50871          52438         NaN          2.0         508.7       1.0X
+day time interval two columns disable radix          50320          51250         897          2.0         503.2       1.0X
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 2bd5cad..a1b093f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -42,7 +42,8 @@ object SortPrefixUtils {
     sortOrder.dataType match {
       case StringType => stringPrefixComparator(sortOrder)
       case BinaryType => binaryPrefixComparator(sortOrder)
-      case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType =>
+      case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |
+          _: AnsiIntervalType =>
         longPrefixComparator(sortOrder)
       case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
         longPrefixComparator(sortOrder)
@@ -122,7 +123,7 @@ object SortPrefixUtils {
   def canSortFullyWithPrefix(sortOrder: SortOrder): Boolean = {
     sortOrder.dataType match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType |
-           TimestampType | FloatType | DoubleType =>
+           TimestampType | FloatType | DoubleType | _: AnsiIntervalType =>
         true
       case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
         true
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 6a4f3f6..812fdba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.AccumulatorSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -124,12 +125,16 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
         sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
         StructType(StructField("a", dataType, nullable = true) :: Nil)
       )
-      checkThatPlansAgree(
-        inputDf,
-        p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
-        ReferenceSort(sortOrder, global = true, _: SparkPlan),
-        sortAnswers = false
-      )
+      Seq(true, false).foreach { enableRadix =>
+        withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> enableRadix.toString) {
+          checkThatPlansAgree(
+            inputDf,
+            p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
+            ReferenceSort(sortOrder, global = true, _: SparkPlan),
+            sortAnswers = false
+          )
+        }
+      }
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala
new file mode 100644
index 0000000..0537527
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Benchmark to measure performance for interval sort.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/AnsiIntervalSortBenchmark-results.txt".
+ * }}}
+ */
+object AnsiIntervalSortBenchmark extends SqlBasedBenchmark {
+  private val numRows = 100 * 1000 * 1000
+
+  private def radixBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality, output = output)
+    benchmark.addCase(s"$name enable radix", 3) { _ =>
+      withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> "true") {
+        f
+      }
+    }
+
+    benchmark.addCase(s"$name disable radix", 3) { _ =>
+      withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> "false") {
+        f
+      }
+    }
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val ym = spark.range(numRows).selectExpr("make_ym_interval(id % 2000) as c1", "id as c2")
+    radixBenchmark("year month interval one column", numRows) {
+      ym.sortWithinPartitions("c1").select("c2").noop()
+    }
+
+    radixBenchmark("year month interval two columns", numRows) {
+      ym.sortWithinPartitions("c1", "c2").select("c2").noop()
+    }
+
+    val dt = spark.range(numRows).selectExpr("make_dt_interval(id % 24) as c1", "id as c2")
+    radixBenchmark("day time interval one columns", numRows) {
+      dt.sortWithinPartitions("c1").select("c2").noop()
+    }
+
+    radixBenchmark("day time interval two columns", numRows) {
+      dt.sortWithinPartitions("c1", "c2").select("c2").noop()
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
