This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8dfecc1463ff [SPARK-49434][SPARK-49435][CONNECT][SQL] Move aggregators to sql/api
8dfecc1463ff is described below
commit 8dfecc1463ff0c2a3a18e7a4409736344c2dc3b8
Author: Herman van Hovell <[email protected]>
AuthorDate: Sat Sep 28 16:30:15 2024 -0700
[SPARK-49434][SPARK-49435][CONNECT][SQL] Move aggregators to sql/api
### What changes were proposed in this pull request?
This PR moves all user-facing Aggregators from sql/core to sql/api.
### Why are the changes needed?
We are creating a unified Scala SQL interface. This PR is part of that effort.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48267 from hvanhovell/SPARK-49434.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
project/MimaExcludes.scala | 5 ++++
.../spark/sql/expressions/javalang/typed.java | 10 ++++----
.../spark/sql/expressions/ReduceAggregator.scala | 16 +++++--------
.../spark/sql/expressions/scalalang/typed.scala | 4 ++--
.../spark/sql/internal}/typedaggregators.scala | 27 ++++++++++------------
.../ColumnNodeToExpressionConverterSuite.scala | 2 +-
6 files changed, 31 insertions(+), 33 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 41f547a43b69..2b3d76eb0c2c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -184,6 +184,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.avro.functions$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.protobuf.functions$"),
+
+ // SPARK-49434: Move aggregators to sql/api
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.javalang.typed"),
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed"),
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.typed$"),
) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
similarity index 88%
rename from
sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
rename to
sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index e1e4ba4c8e0d..91a1231ec030 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -19,13 +19,13 @@ package org.apache.spark.sql.expressions.javalang;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.TypedColumn;
-import org.apache.spark.sql.execution.aggregate.TypedAverage;
-import org.apache.spark.sql.execution.aggregate.TypedCount;
-import org.apache.spark.sql.execution.aggregate.TypedSumDouble;
-import org.apache.spark.sql.execution.aggregate.TypedSumLong;
+import org.apache.spark.sql.internal.TypedAverage;
+import org.apache.spark.sql.internal.TypedCount;
+import org.apache.spark.sql.internal.TypedSumDouble;
+import org.apache.spark.sql.internal.TypedSumLong;
/**
- * Type-safe functions available for {@link org.apache.spark.sql.Dataset}
operations in Java.
+ * Type-safe functions available for {@link org.apache.spark.sql.api.Dataset}
operations in Java.
*
* Scala users should use {@link
org.apache.spark.sql.expressions.scalalang.typed}.
*
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
similarity index 82%
rename from
sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
rename to
sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
index 192b5bf65c4c..9d98d1a98b00 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
@@ -18,19 +18,17 @@
package org.apache.spark.sql.expressions
import org.apache.spark.SparkException
-import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveBooleanEncoder,
ProductEncoder}
+import org.apache.spark.sql.{Encoder, Encoders}
/**
* An aggregator that uses a single associative and commutative reduce
function. This reduce
- * function can be used to go through all input values and reduces them to a
single value.
- * If there is no input, a null value is returned.
+ * function can be used to go through all input values and reduces them to a
single value. If
+ * there is no input, a null value is returned.
*
* This class currently assumes there is at least one input row.
*/
private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)
- extends Aggregator[T, (Boolean, T), T] {
+ extends Aggregator[T, (Boolean, T), T] {
@transient private val encoder = implicitly[Encoder[T]]
@@ -47,10 +45,8 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T)
=> T)
override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T])
- override def bufferEncoder: Encoder[(Boolean, T)] = {
- ProductEncoder.tuple(Seq(PrimitiveBooleanEncoder,
encoder.asInstanceOf[AgnosticEncoder[T]]))
- .asInstanceOf[Encoder[(Boolean, T)]]
- }
+ override def bufferEncoder: Encoder[(Boolean, T)] =
+ Encoders.tuple(Encoders.scalaBoolean, encoder)
override def outputEncoder: Encoder[T] = encoder
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
similarity index 94%
rename from
sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
rename to
sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
index 8d17edd42442..9ea3ab8cd4e1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.expressions.scalalang
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.aggregate._
+import org.apache.spark.sql.TypedColumn
+import org.apache.spark.sql.internal.{TypedAverage, TypedCount,
TypedSumDouble, TypedSumLong}
/**
* Type-safe functions available for `Dataset` operations in Scala.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
similarity index 81%
rename from
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
rename to
sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
index b6550bf3e4aa..aabb3a6f00fd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/internal/typedaggregators.scala
@@ -15,26 +15,24 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.aggregate
+package org.apache.spark.sql.internal
import org.apache.spark.api.java.function.MapFunction
-import org.apache.spark.sql.{Encoder, TypedColumn}
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines internal implementations for aggregators.
////////////////////////////////////////////////////////////////////////////////////////////////////
-
class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double,
Double] {
override def zero: Double = 0.0
override def reduce(b: Double, a: IN): Double = b + f(a)
override def merge(b1: Double, b2: Double): Double = b1 + b2
override def finish(reduction: Double): Double = reduction
- override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
- override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+ override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
+ override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
// Java api support
def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) =>
f.call(x).asInstanceOf[Double])
@@ -44,15 +42,14 @@ class TypedSumDouble[IN](val f: IN => Double) extends
Aggregator[IN, Double, Dou
}
}
-
class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
override def zero: Long = 0L
override def reduce(b: Long, a: IN): Long = b + f(a)
override def merge(b1: Long, b2: Long): Long = b1 + b2
override def finish(reduction: Long): Long = reduction
- override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
- override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+ override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+ override def outputEncoder: Encoder[Long] = Encoders.scalaLong
// Java api support
def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) =>
f.call(x).asInstanceOf[Long])
@@ -62,7 +59,6 @@ class TypedSumLong[IN](val f: IN => Long) extends
Aggregator[IN, Long, Long] {
}
}
-
class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
override def zero: Long = 0
override def reduce(b: Long, a: IN): Long = {
@@ -71,8 +67,8 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN,
Long, Long] {
override def merge(b1: Long, b2: Long): Long = b1 + b2
override def finish(reduction: Long): Long = reduction
- override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
- override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+ override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+ override def outputEncoder: Encoder[Long] = Encoders.scalaLong
// Java api support
def this(f: MapFunction[IN, Object]) = this((x: IN) =>
f.call(x).asInstanceOf[Any])
@@ -81,7 +77,6 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN,
Long, Long] {
}
}
-
class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double,
Long), Double] {
override def zero: (Double, Long) = (0.0, 0L)
override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) +
b._1, 1 + b._2)
@@ -90,8 +85,10 @@ class TypedAverage[IN](val f: IN => Double) extends
Aggregator[IN, (Double, Long
(b1._1 + b2._1, b1._2 + b2._2)
}
- override def bufferEncoder: Encoder[(Double, Long)] =
ExpressionEncoder[(Double, Long)]()
- override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+ override def bufferEncoder: Encoder[(Double, Long)] =
+ Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
+
+ override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
// Java api support
def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) =>
f.call(x).asInstanceOf[Double])
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
index c993aa8e5203..76fcdfc38095 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ColumnNodeToExpressionConverterSuite.scala
@@ -324,7 +324,7 @@ class ColumnNodeToExpressionConverterSuite extends
SparkFunSuite {
a.asInstanceOf[AgnosticEncoder[Any]]
test("udf") {
- val int2LongSum = new aggregate.TypedSumLong[Int]((i: Int) => i.toLong)
+ val int2LongSum = new TypedSumLong[Int]((i: Int) => i.toLong)
val bufferEncoder = encoderFor(int2LongSum.bufferEncoder)
val outputEncoder = encoderFor(int2LongSum.outputEncoder)
val bufferAttrs = bufferEncoder.namedExpressions.map {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]