This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dfecc1463ff [SPARK-49434][SPARK-49435][CONNECT][SQL] Move aggregators to sql/api
8dfecc1463ff is described below

commit 8dfecc1463ff0c2a3a18e7a4409736344c2dc3b8
Author: Herman van Hovell <[email protected]>
AuthorDate: Sat Sep 28 16:30:15 2024 -0700

    [SPARK-49434][SPARK-49435][CONNECT][SQL] Move aggregators to sql/api
    
    ### What changes were proposed in this pull request?
    This PR moves all user-facing Aggregators from sql/core to sql/api.
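    
    For context, a minimal sketch of how these user-facing aggregators are
    typically used from Scala (the Event case class and the `spark` session
    below are illustrative assumptions, not part of this change):
    
        import org.apache.spark.sql.expressions.scalalang.typed
        import spark.implicits._
    
        case class Event(user: String, value: Double)
    
        val ds = Seq(Event("a", 1.0), Event("a", 2.0), Event("b", 3.0)).toDS()
        // groupByKey plus the typed aggregators yields strongly typed columns.
        val perUser = ds.groupByKey(_.user)
          .agg(typed.sum[Event](_.value), typed.count[Event](_.user))
        perUser.show()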
    
    ### Why are the changes needed?
    We are creating a unified Scala SQL interface. This is part of that effort.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48267 from hvanhovell/SPARK-49434.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 project/MimaExcludes.scala                         |  5 ++++
 .../spark/sql/expressions/javalang/typed.java      | 10 ++++----
 .../spark/sql/expressions/ReduceAggregator.scala   | 16 +++++--------
 .../spark/sql/expressions/scalalang/typed.scala    |  4 ++--
 .../spark/sql/internal}/typedaggregators.scala     | 27 ++++++++++------------
 .../ColumnNodeToExpressionConverterSuite.scala     |  2 +-
 6 files changed, 31 insertions(+), 33 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 41f547a43b69..2b3d76eb0c2c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -184,6 +184,11 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.avro.functions$"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions$"),
+
+    // SPARK-49434: Move aggregators to sql/api
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.javalang.typed"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed$"),
   ) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
     loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
     loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
similarity index 88%
rename from sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
rename to sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index e1e4ba4c8e0d..91a1231ec030 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -19,13 +19,13 @@ package org.apache.spark.sql.expressions.javalang;
 
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.TypedColumn;
-import org.apache.spark.sql.execution.aggregate.TypedAverage;
-import org.apache.spark.sql.execution.aggregate.TypedCount;
-import org.apache.spark.sql.execution.aggregate.TypedSumDouble;
-import org.apache.spark.sql.execution.aggregate.TypedSumLong;
+import org.apache.spark.sql.internal.TypedAverage;
+import org.apache.spark.sql.internal.TypedCount;
+import org.apache.spark.sql.internal.TypedSumDouble;
+import org.apache.spark.sql.internal.TypedSumLong;
 
 /**
- * Type-safe functions available for {@link org.apache.spark.sql.Dataset} operations in Java.
+ * Type-safe functions available for {@link org.apache.spark.sql.api.Dataset} operations in Java.
  *
  * Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}.
  *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
similarity index 82%
rename from sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
index 192b5bf65c4c..9d98d1a98b00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
@@ -18,19 +18,17 @@
 package org.apache.spark.sql.expressions
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder, ProductEncoder}
+import org.apache.spark.sql.{Encoder, Encoders}
 
 /**
  * An aggregator that uses a single associative and commutative reduce function. This reduce
- * function can be used to go through all input values and reduces them to a single value.
- * If there is no input, a null value is returned.
+ * function can be used to go through all input values and reduces them to a single value. If
+ * there is no input, a null value is returned.
  *
  * This class currently assumes there is at least one input row.
  */
 private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)
-  extends Aggregator[T, (Boolean, T), T] {
+    extends Aggregator[T, (Boolean, T), T] {
 
   @transient private val encoder = implicitly[Encoder[T]]
 
@@ -47,10 +45,8 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)
 
   override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T])
 
-  override def bufferEncoder: Encoder[(Boolean, T)] = {
-    ProductEncoder.tuple(Seq(PrimitiveBooleanEncoder, encoder.asInstanceOf[AgnosticEncoder[T]]))
-      .asInstanceOf[Encoder[(Boolean, T)]]
-  }
+  override def bufferEncoder: Encoder[(Boolean, T)] =
+    Encoders.tuple(Encoders.scalaBoolean, encoder)
 
   override def outputEncoder: Encoder[T] = encoder
 
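ReduceAggregator itself stays private[sql]; it is the aggregator behind KeyValueGroupedDataset.reduceGroups, so the change above only swaps how its (Boolean, T) buffer encoder is constructed. A minimal sketch of the public path that exercises it, assuming a spark-shell style session with an existing `spark` SparkSession:

    import spark.implicits._

    val nums = Seq(1, 2, 3, 4).toDS()
    // reduceGroups applies one associative, commutative function per key; the
    // (Boolean, T) buffer tracks whether a group has seen any input yet.
    val byParity = nums.groupByKey(_ % 2).reduceGroups(_ + _)
    byParity.show()  // expected rows: (0, 6) and (1, 4)
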
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
index 8d17edd42442..9ea3ab8cd4e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.expressions.scalalang
 
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.aggregate._
+import org.apache.spark.sql.TypedColumn
+import org.apache.spark.sql.internal.{TypedAverage, TypedCount, TypedSumDouble, TypedSumLong}
 
 /**
  * Type-safe functions available for `Dataset` operations in Scala.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
similarity index 81%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
index b6550bf3e4aa..aabb3a6f00fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
@@ -15,26 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.aggregate
+package org.apache.spark.sql.internal
 
 import org.apache.spark.api.java.function.MapFunction
-import org.apache.spark.sql.{Encoder, TypedColumn}
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
 import org.apache.spark.sql.expressions.Aggregator
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines internal implementations for aggregators.
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
-
 class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
   override def zero: Double = 0.0
   override def reduce(b: Double, a: IN): Double = b + f(a)
   override def merge(b1: Double, b2: Double): Double = b1 + b2
   override def finish(reduction: Double): Double = reduction
 
-  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
-  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+  override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
+  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
 
   // Java api support
   def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
@@ -44,15 +42,14 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou
   }
 }
 
-
 class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0L
   override def reduce(b: Long, a: IN): Long = b + f(a)
   override def merge(b1: Long, b2: Long): Long = b1 + b2
   override def finish(reduction: Long): Long = reduction
 
-  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
-  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
 
   // Java api support
   def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
@@ -62,7 +59,6 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
   }
 }
 
-
 class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0
   override def reduce(b: Long, a: IN): Long = {
@@ -71,8 +67,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
   override def merge(b1: Long, b2: Long): Long = b1 + b2
   override def finish(reduction: Long): Long = reduction
 
-  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
-  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
 
   // Java api support
   def this(f: MapFunction[IN, Object]) = this((x: IN) => f.call(x).asInstanceOf[Any])
@@ -81,7 +77,6 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
   }
 }
 
-
 class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
   override def zero: (Double, Long) = (0.0, 0L)
   override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
@@ -90,8 +85,10 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
     (b1._1 + b2._1, b1._2 + b2._2)
   }
 
-  override def bufferEncoder: Encoder[(Double, Long)] = ExpressionEncoder[(Double, Long)]()
-  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+  override def bufferEncoder: Encoder[(Double, Long)] =
+    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
+
+  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
 
   // Java api support
   def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
index c993aa8e5203..76fcdfc38095 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
@@ -324,7 +324,7 @@ class ColumnNodeToExpressionConverterSuite extends SparkFunSuite {
     a.asInstanceOf[AgnosticEncoder[Any]]
 
   test("udf") {
-    val int2LongSum = new aggregate.TypedSumLong[Int]((i: Int) => i.toLong)
+    val int2LongSum = new TypedSumLong[Int]((i: Int) => i.toLong)
     val bufferEncoder = encoderFor(int2LongSum.bufferEncoder)
     val outputEncoder = encoderFor(int2LongSum.outputEncoder)
     val bufferAttrs = bufferEncoder.namedExpressions.map {

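Across the patch, the built-in typed aggregators now build their buffer and output encoders from the public Encoders factory instead of the internal ExpressionEncoder and ProductEncoder machinery. The same approach works for user-defined aggregators; a minimal sketch, assuming only the public API (the MyAvg object is illustrative and not part of this commit):

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    // Average over Doubles; both encoders come from the public Encoders API.
    object MyAvg extends Aggregator[Double, (Double, Long), Double] {
      override def zero: (Double, Long) = (0.0, 0L)
      override def reduce(b: (Double, Long), a: Double): (Double, Long) =
        (b._1 + a, b._2 + 1)
      override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) =
        (b1._1 + b2._1, b1._2 + b2._2)
      override def finish(r: (Double, Long)): Double =
        if (r._2 == 0) Double.NaN else r._1 / r._2
      override def bufferEncoder: Encoder[(Double, Long)] =
        Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }

    // Usable as a typed column, e.g. ds.select(MyAvg.toColumn) on a Dataset[Double].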

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
