This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 03180ece177d [SPARK-49421][CONNECT][SQL] Create a shared RelationalGroupedDataset interface
03180ece177d is described below
commit 03180ece177d3ca9ea9ee6aa7a17979696e386ad
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Aug 29 22:59:55 2024 -0400
[SPARK-49421][CONNECT][SQL] Create a shared RelationalGroupedDataset interface
### What changes were proposed in this pull request?
This PR introduces a shared RelationalGroupedDataset interface.
### Why are the changes needed?
We want to unify the Classic and Connect Scala DataFrame APIs.
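The shared interface lives in the sql/api module and abstracts over the concrete Dataset type; each backend binds the abstract `RGD` type member to its own `RelationalGroupedDataset`. A minimal sketch of the pattern (simplified from the diff below, not the literal source):

    import org.apache.spark.sql.{Column, Row}

    abstract class Dataset[T, DS[U] <: Dataset[U, DS]] {
      type RGD <: RelationalGroupedDataset[DS]
      def groupBy(cols: Column*): RGD // implemented by Classic and Connect
    }

    abstract class RelationalGroupedDataset[DS[U] <: Dataset[U, DS]] {
      type RGD <: RelationalGroupedDataset[DS]
      protected def toDF(aggCols: Seq[Column]): DS[Row] // backend-specific plan building
      def agg(expr: Column, exprs: Column*): DS[Row] = toDF(expr +: exprs) // shared logic
    }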
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47918 from hvanhovell/SPARK-49421.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 254 ++------------
.../spark/sql/RelationalGroupedDataset.scala | 378 ++++-----------------
.../scala/org/apache/spark/sql/api/Dataset.scala | 218 ++++++++++++
.../spark/sql/api/RelationalGroupedDataset.scala | 338 ++++++++++++++++++
.../resources/query-tests/queries/cube_string.json | 6 +-
.../query-tests/queries/cube_string.proto.bin | Bin 92 -> 96 bytes
.../query-tests/queries/groupby_agg_string.json | 6 +-
.../queries/groupby_agg_string.proto.bin | Bin 103 -> 107 bytes
.../resources/query-tests/queries/groupby_avg.json | 6 +-
.../query-tests/queries/groupby_avg.proto.bin | Bin 90 -> 94 bytes
.../resources/query-tests/queries/groupby_max.json | 6 +-
.../query-tests/queries/groupby_max.proto.bin | Bin 90 -> 94 bytes
.../query-tests/queries/groupby_mean.json | 6 +-
.../query-tests/queries/groupby_mean.proto.bin | Bin 90 -> 94 bytes
.../resources/query-tests/queries/groupby_min.json | 6 +-
.../query-tests/queries/groupby_min.proto.bin | Bin 90 -> 94 bytes
.../resources/query-tests/queries/groupby_sum.json | 6 +-
.../query-tests/queries/groupby_sum.proto.bin | Bin 90 -> 94 bytes
.../queries/grouping_and_grouping_id.json | 6 +-
.../queries/grouping_and_grouping_id.proto.bin | Bin 138 -> 142 bytes
.../test/resources/query-tests/queries/pivot.json | 3 +-
.../resources/query-tests/queries/pivot.proto.bin | Bin 97 -> 99 bytes
.../queries/pivot_without_column_values.json | 3 +-
.../queries/pivot_without_column_values.proto.bin | Bin 85 -> 87 bytes
.../query-tests/queries/rollup_string.json | 6 +-
.../query-tests/queries/rollup_string.proto.bin | Bin 92 -> 96 bytes
.../main/scala/org/apache/spark/sql/Dataset.scala | 229 ++-----------
.../spark/sql/RelationalGroupedDataset.scala | 364 +++-----------------
28 files changed, 775 insertions(+), 1066 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 37a182675b6c..d05834c4fc6c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -135,6 +135,7 @@ class Dataset[T] private[sql] (
@DeveloperApi val plan: proto.Plan,
val encoder: Encoder[T])
extends api.Dataset[T, Dataset] {
+ type RGD = RelationalGroupedDataset
import sparkSession.RichColumn
@@ -506,58 +507,12 @@ class Dataset[T] private[sql] (
}
}
- /**
- * Groups the Dataset using the specified columns, so we can run aggregation on them. See
- * [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns grouped by department.
- * ds.groupBy($"department").avg()
- *
- * // Compute the max age and average salary, grouped by department and gender.
- * ds.groupBy($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def groupBy(cols: Column*): RelationalGroupedDataset = {
new RelationalGroupedDataset(toDF(), cols, proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
}
- /**
- * Groups the Dataset using the specified columns, so that we can run aggregation on them. See
- * [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * This is a variant of groupBy that can only group by existing columns using column names (i.e.
- * cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns grouped by department.
- * ds.groupBy("department").avg()
- *
- * // Compute the max age and average salary, grouped by department and gender.
- * ds.groupBy($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group untypedrel
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- new RelationalGroupedDataset(
- toDF(),
- colNames.map(colName => Column(colName)),
- proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
- }
-
/** @inheritdoc */
def reduce(func: (T, T) => T): T = {
val udf = SparkUserDefinedFunction(
@@ -599,134 +554,19 @@ class Dataset[T] private[sql] (
def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
groupByKey(UdfUtils.mapFunctionToScalaFunc(func))(encoder)
- /**
- * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we
- * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate
- * functions.
- *
- * {{{
- * // Compute the average for all numeric columns rolled up by department and group.
- * ds.rollup($"department", $"group").avg()
- *
- * // Compute the max age and average salary, rolled up by department and gender.
- * ds.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def rollup(cols: Column*): RelationalGroupedDataset = {
new RelationalGroupedDataset(toDF(), cols, proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
}
- /**
- * Create a multi-dimensional rollup for the current Dataset using the specified columns, so we
- * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate
- * functions.
- *
- * This is a variant of rollup that can only group by existing columns using column names (i.e.
- * cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns rolled up by department and group.
- * ds.rollup("department", "group").avg()
- *
- * // Compute the max age and average salary, rolled up by department and gender.
- * ds.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- new RelationalGroupedDataset(
- toDF(),
- colNames.map(colName => Column(colName)),
- proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
- }
-
- /**
- * Create a multi-dimensional cube for the current Dataset using the specified columns, so we
- * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate
- * functions.
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * ds.cube($"department", $"group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * ds.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def cube(cols: Column*): RelationalGroupedDataset = {
new RelationalGroupedDataset(toDF(), cols, proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
}
- /**
- * Create a multi-dimensional cube for the current Dataset using the specified columns, so we
- * can run aggregation on them. See [[RelationalGroupedDataset]] for all the available aggregate
- * functions.
- *
- * This is a variant of cube that can only group by existing columns using column names (i.e.
- * cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * ds.cube("department", "group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * ds.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group untypedrel
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def cube(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- new RelationalGroupedDataset(
- toDF(),
- colNames.map(colName => Column(colName)),
- proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
- }
-
- /**
- * Create multi-dimensional aggregation for the current Dataset using the specified grouping
- * sets, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the
- * available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns group by specific grouping sets.
- * ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), $"department", $"group").avg()
- *
- * // Compute the max age and average salary, group by specific grouping sets.
- * ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", $"group").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 4.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RelationalGroupedDataset = {
val groupingSetMsgs = groupingSets.map { groupingSet =>
@@ -743,61 +583,6 @@ class Dataset[T] private[sql] (
groupingSets = Some(groupingSetMsgs))
}
- /**
- * (Scala-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg("age" -> "max", "salary" -> "avg")
- * ds.groupBy().agg("age" -> "max", "salary" -> "avg")
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
- def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
- groupBy().agg(aggExpr, aggExprs: _*)
- }
-
- /**
- * (Scala-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(Map("age" -> "max", "salary" -> "avg"))
- * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
- def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
-
- /**
- * (Java-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(Map("age" -> "max", "salary" -> "avg"))
- * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
- def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
-
- /**
- * Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(max($"age"), avg($"salary"))
- * ds.groupBy().agg(max($"age"), avg($"salary"))
- * }}}
- *
- * @group untypedrel
- * @since 3.4.0
- */
- @scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs: _*)
-
/** @inheritdoc */
def unpivot(
ids: Array[Column],
@@ -1745,4 +1530,33 @@ class Dataset[T] private[sql] (
/** @inheritdoc */
override def distinct(): Dataset[T] = super.distinct()
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def groupBy(col1: String, cols: String*): RelationalGroupedDataset =
+ super.groupBy(col1, cols: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def rollup(col1: String, cols: String*): RelationalGroupedDataset =
+ super.rollup(col1, cols: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def cube(col1: String, cols: String*): RelationalGroupedDataset =
+ super.cube(col1, cols: _*)
+
+ /** @inheritdoc */
+ override def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame =
+ super.agg(aggExpr, aggExprs: _*)
+
+ /** @inheritdoc */
+ override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
+
+ /** @inheritdoc */
+ override def agg(exprs: java.util.Map[String, String]): DataFrame = super.agg(exprs)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
}
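The string-based variants of groupBy/rollup/cube and the no-group agg overloads moved into the shared api.Dataset; the overrides above only forward to super, presumably so each concrete class still generates its own varargs bridge methods. User code is unchanged on either backend, e.g. (an illustrative snippet, not part of the patch):

    // Resolves through the shared api.Dataset signatures on Classic and Connect alike.
    ds.groupBy("department", "gender").agg(Map("salary" -> "avg", "age" -> "max"))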
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 0c8657e12d8d..c9b011ca4535 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import java.util.Locale
-
import scala.jdk.CollectionConverters._
import org.apache.spark.connect.proto
@@ -36,42 +34,52 @@ import org.apache.spark.connect.proto
* @since 3.4.0
*/
class RelationalGroupedDataset private[sql] (
- private[sql] val df: DataFrame,
+ protected val df: DataFrame,
private[sql] val groupingExprs: Seq[Column],
groupType: proto.Aggregate.GroupType,
pivot: Option[proto.Aggregate.Pivot] = None,
- groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None) {
+ groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None)
+ extends api.RelationalGroupedDataset[Dataset] {
+ type RGD = RelationalGroupedDataset
import df.sparkSession.RichColumn
- private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
+ protected def toDF(aggExprs: Seq[Column]): DataFrame = {
df.sparkSession.newDataFrame { builder =>
- builder.getAggregateBuilder
+ val aggBuilder = builder.getAggregateBuilder
.setInput(df.plan.getRoot)
- .addAllGroupingExpressions(groupingExprs.map(_.expr).asJava)
- .addAllAggregateExpressions(aggExprs.map(e => e.typedExpr(df.encoder)).asJava)
+ groupingExprs.foreach(c => aggBuilder.addGroupingExpressions(c.expr))
+ aggExprs.foreach(c => aggBuilder.addAggregateExpressions(c.typedExpr(df.encoder)))
groupType match {
case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
- builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
+ aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
- builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
+ aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
- builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+ aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
assert(pivot.isDefined)
- builder.getAggregateBuilder
+ aggBuilder
.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
.setPivot(pivot.get)
case proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS =>
assert(groupingSets.isDefined)
- val aggBuilder = builder.getAggregateBuilder
- .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS)
+ aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS)
groupingSets.get.foreach(aggBuilder.addGroupingSets)
case g => throw new UnsupportedOperationException(g.toString)
}
}
}
+ protected def selectNumericColumns(colNames: Seq[String]): Seq[Column] = {
+ // This behaves differently from the classic implementation. The classic implementation validates
+ // that a column is actually a number, and if it is not it throws an error immediately. In connect
+ // it depends on the input type (casting) rules for the method invoked. If the input violates
+ // these rules, a different error will be thrown. However, it is also possible to get a result for
+ // a non-numeric column in connect, for example when you use min/max.
+ colNames.map(df.col)
+ }
+
/**
* Returns a `KeyValueGroupedDataset` where the data is grouped by the grouping expressions of
* current `RelationalGroupedDataset`.
@@ -82,295 +90,71 @@ class RelationalGroupedDataset private[sql] (
KeyValueGroupedDatasetImpl[K, T](df, encoderFor[K], encoderFor[T], groupingExprs)
}
- /**
- * (Scala-specific) Compute aggregates by specifying the column names and aggregate methods. The
- * resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * df.groupBy("department").agg(
- * "age" -> "max",
- * "expense" -> "sum"
- * )
- * }}}
- *
- * @since 3.4.0
- */
- def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
- toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
- strToColumn(expr, df(colName))
- })
- }
-
- /**
- * (Scala-specific) Compute aggregates by specifying a map from column name to aggregate
- * methods. The resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * df.groupBy("department").agg(Map(
- * "age" -> "max",
- * "expense" -> "sum"
- * ))
- * }}}
- *
- * @since 3.4.0
- */
- def agg(exprs: Map[String, String]): DataFrame = {
- toDF(exprs.map { case (colName, expr) =>
- strToColumn(expr, df(colName))
- }.toSeq)
- }
+ /** @inheritdoc */
+ override def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame =
+ super.agg(aggExpr, aggExprs: _*)
- /**
- * (Java-specific) Compute aggregates by specifying a map from column name to aggregate methods.
- * The resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * import com.google.common.collect.ImmutableMap;
- * df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense",
"sum"));
- * }}}
- *
- * @since 3.4.0
- */
- def agg(exprs: java.util.Map[String, String]): DataFrame = {
- agg(exprs.asScala.toMap)
- }
+ /** @inheritdoc */
+ override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
- private[this] def strToColumn(expr: String, inputExpr: Column): Column = {
- expr.toLowerCase(Locale.ROOT) match {
- case "avg" | "average" | "mean" => functions.avg(inputExpr)
- case "stddev" | "std" => functions.stddev(inputExpr)
- case "count" | "size" => functions.count(inputExpr)
- case name => Column.fn(name, inputExpr)
- }
- }
+ /** @inheritdoc */
+ override def agg(exprs: java.util.Map[String, String]): DataFrame = super.agg(exprs)
- /**
- * Compute aggregates by specifying a series of aggregate columns. Note that this function by
- * default retains the grouping columns in its output. To not retain grouping columns, set
- * `spark.sql.retainGroupColumns` to false.
- *
- * The available aggregate methods are defined in [[org.apache.spark.sql.functions]].
- *
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- *
- * // Scala:
- * import org.apache.spark.sql.functions._
- * df.groupBy("department").agg(max("age"), sum("expense"))
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.groupBy("department").agg(max("age"), sum("expense"));
- * }}}
- *
- * Note that before Spark 1.4, the default behavior is to NOT retain grouping columns. To change
- * to that behavior, set config variable `spark.sql.retainGroupColumns` to `false`.
- * {{{
- * // Scala, 1.3.x:
- * df.groupBy("department").agg($"department", max("age"), sum("expense"))
- *
- * // Java, 1.3.x:
- * df.groupBy("department").agg(col("department"), max("age"),
sum("expense"));
- * }}}
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame = {
- toDF(expr +: exprs)
- }
+ override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
- /**
- * Count the number of rows for each group. The resulting `DataFrame` will also contain the
- * grouping columns.
- *
- * @since 3.4.0
- */
- def count(): DataFrame = toDF(Seq(functions.count(functions.lit(1)).alias("count")))
+ /** @inheritdoc */
+ override def count(): DataFrame = super.count()
- /**
- * Compute the average value for each numeric columns for each group. This is an alias for
- * `avg`. The resulting `DataFrame` will also contain the grouping columns. When specified
- * columns are given, only compute the average values for them.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def mean(colNames: String*): DataFrame = {
- toDF(colNames.map(colName => functions.mean(colName)))
- }
+ override def mean(colNames: String*): DataFrame = super.mean(colNames: _*)
- /**
- * Compute the max value for each numeric columns for each group. The resulting `DataFrame` will
- * also contain the grouping columns. When specified columns are given, only compute the max
- * values for them.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def max(colNames: String*): DataFrame = {
- toDF(colNames.map(colName => functions.max(colName)))
- }
+ override def max(colNames: String*): DataFrame = super.max(colNames: _*)
- /**
- * Compute the mean value for each numeric columns for each group. The resulting `DataFrame`
- * will also contain the grouping columns. When specified columns are given, only compute the
- * mean values for them.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def avg(colNames: String*): DataFrame = {
- toDF(colNames.map(colName => functions.avg(colName)))
- }
+ override def avg(colNames: String*): DataFrame = super.avg(colNames: _*)
- /**
- * Compute the min value for each numeric column for each group. The resulting `DataFrame` will
- * also contain the grouping columns. When specified columns are given, only compute the min
- * values for them.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def min(colNames: String*): DataFrame = {
- toDF(colNames.map(colName => functions.min(colName)))
- }
+ override def min(colNames: String*): DataFrame = super.min(colNames: _*)
- /**
- * Compute the sum for each numeric columns for each group. The resulting `DataFrame` will also
- * contain the grouping columns. When specified columns are given, only compute the sum for
- * them.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def sum(colNames: String*): DataFrame = {
- toDF(colNames.map(colName => functions.sum(colName)))
- }
+ override def sum(colNames: String*): DataFrame = super.sum(colNames: _*)
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- *
- * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine the
- * resulting schema of the transformation. To avoid any eager computations, provide an explicit
- * list of values via `pivot(pivotColumn: String, values: Seq[Any])`.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course").sum("earnings")
- * }}}
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * Name of the column to pivot.
- * @since 3.4.0
- */
- def pivot(pivotColumn: String): RelationalGroupedDataset = pivot(Column(pivotColumn))
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String): RelationalGroupedDataset = super.pivot(pivotColumn)
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are
- * two versions of pivot function: one that requires the caller to specify the list of distinct
- * values to pivot on, and one that does not. The latter is more concise but less efficient,
- * because Spark needs to first compute the list of distinct values internally.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
- *
- * // Or without specifying column values (less efficient)
- * df.groupBy("year").pivot("course").sum("earnings")
- * }}}
- *
- * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
- * multiple columns, use the `struct` function to combine the columns and values:
- *
- * {{{
- * df.groupBy("year")
- * .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
- * .agg(sum($"earnings"))
- * }}}
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * Name of the column to pivot.
- * @param values
- * List of values that will be translated to columns in the output DataFrame.
- * @since 3.4.0
- */
- def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
- pivot(Column(pivotColumn), values)
- }
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset =
+ super.pivot(pivotColumn, values)
- /**
- * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
- * aggregation.
- *
- * There are two versions of pivot function: one that requires the caller to specify the list of
- * distinct values to pivot on, and one that does not. The latter is more concise but less
- * efficient, because Spark needs to first compute the list of distinct values internally.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
- *
- * // Or without specifying column values (less efficient)
- * df.groupBy("year").pivot("course").sum("earnings");
- * }}}
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * Name of the column to pivot.
- * @param values
- * List of values that will be translated to columns in the output DataFrame.
- * @since 3.4.0
- */
- def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
- pivot(Column(pivotColumn), values)
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset =
+ super.pivot(pivotColumn, values)
+
+ /** @inheritdoc */
+ override def pivot(
+ pivotColumn: Column,
+ values: java.util.List[Any]): RelationalGroupedDataset = {
+ super.pivot(pivotColumn, values)
}
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an
- * overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
- * }}}
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * the column to pivot.
- * @param values
- * List of values that will be translated to columns in the output DataFrame.
- * @since 3.4.0
- */
+ /** @inheritdoc */
def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
groupType match {
case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
- val valueExprs = values.map(_ match {
+ val valueExprs = values.map {
case c: Column if c.expr.hasLiteral => c.expr.getLiteral
case c: Column if !c.expr.hasLiteral =>
throw new IllegalArgumentException("values only accept literal Column")
case v => functions.lit(v).expr.getLiteral
- })
+ }
new RelationalGroupedDataset(
df,
groupingExprs,
@@ -386,46 +170,8 @@ class RelationalGroupedDataset private[sql] (
}
}
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- *
- * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine the
- * resulting schema of the transformation. To avoid any eager computations, provide an explicit
- * list of values via `pivot(pivotColumn: Column, values: Seq[Any])`.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy($"year").pivot($"course").sum($"earnings");
- * }}}
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * he column to pivot.
- * @since 3.4.0
- */
+ /** @inheritdoc */
def pivot(pivotColumn: Column): RelationalGroupedDataset = {
- pivot(pivotColumn, Seq())
- }
-
- /**
- * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
- * aggregation. This is an overloaded version of the `pivot` method with `pivotColumn` of the
- * `String` type.
- *
- * @see
- * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
- * aggregation.
- *
- * @param pivotColumn
- * the column to pivot.
- * @param values
- * List of values that will be translated to columns in the output DataFrame.
- * @since 3.4.0
- */
- def pivot(pivotColumn: Column, values: java.util.List[Any]): RelationalGroupedDataset = {
- pivot(pivotColumn, values.asScala.toSeq)
+ pivot(pivotColumn, Nil)
}
}
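Note that the pivot implementation above only accepts literal `Column` values and lifts raw values with `functions.lit` before they are embedded in the proto plan. An illustrative usage sketch (hypothetical column names, not from the patch):

    import org.apache.spark.sql.functions.lit

    // Equivalent: raw values are lifted to literals client-side.
    df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
    df.groupBy($"year").pivot($"course", Seq(lit("dotNET"), lit("Java"))).sum($"earnings")

    // A non-literal Column is rejected with an IllegalArgumentException:
    // df.groupBy($"year").pivot($"course", Seq($"course")).sum($"earnings")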
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 16f15205cabe..2b071a384e0a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -120,6 +120,8 @@ import org.apache.spark.util.SparkClassUtils
*/
@Stable
abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
+ type RGD <: RelationalGroupedDataset[DS]
+
def sparkSession: SparkSession[DS]
val encoder: Encoder[T]
@@ -1137,6 +1139,222 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
*/
def where(conditionExpr: String): DS[T] = filter(conditionExpr)
+ /**
+ * Groups the Dataset using the specified columns, so we can run aggregation on them. See
+ * [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * {{{
+ * // Compute the average for all numeric columns grouped by department.
+ * ds.groupBy($"department").avg()
+ *
+ * // Compute the max age and average salary, grouped by department and gender.
+ * ds.groupBy($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def groupBy(cols: Column*): RGD
+
+ /**
+ * Groups the Dataset using the specified columns, so that we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * This is a variant of groupBy that can only group by existing columns using column names
+ * (i.e. cannot construct expressions).
+ *
+ * {{{
+ * // Compute the average for all numeric columns grouped by department.
+ * ds.groupBy("department").avg()
+ *
+ * // Compute the max age and average salary, grouped by department and gender.
+ * ds.groupBy($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def groupBy(col1: String, cols: String*): RGD = groupBy((col1 +: cols).map(col): _*)
+
+ /**
+ * Create a multi-dimensional rollup for the current Dataset using the specified columns,
+ * so we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * {{{
+ * // Compute the average for all numeric columns rolled up by department and group.
+ * ds.rollup($"department", $"group").avg()
+ *
+ * // Compute the max age and average salary, rolled up by department and gender.
+ * ds.rollup($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def rollup(cols: Column*): RGD
+
+ /**
+ * Create a multi-dimensional rollup for the current Dataset using the specified columns,
+ * so we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * This is a variant of rollup that can only group by existing columns using column names
+ * (i.e. cannot construct expressions).
+ *
+ * {{{
+ * // Compute the average for all numeric columns rolled up by department and group.
+ * ds.rollup("department", "group").avg()
+ *
+ * // Compute the max age and average salary, rolled up by department and gender.
+ * ds.rollup($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def rollup(col1: String, cols: String*): RGD = rollup((col1 +: cols).map(col): _*)
+
+ /**
+ * Create a multi-dimensional cube for the current Dataset using the specified columns,
+ * so we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * {{{
+ * // Compute the average for all numeric columns cubed by department and group.
+ * ds.cube($"department", $"group").avg()
+ *
+ * // Compute the max age and average salary, cubed by department and gender.
+ * ds.cube($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def cube(cols: Column*): RGD
+
+ /**
+ * Create a multi-dimensional cube for the current Dataset using the specified columns,
+ * so we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * This is a variant of cube that can only group by existing columns using column names
+ * (i.e. cannot construct expressions).
+ *
+ * {{{
+ * // Compute the average for all numeric columns cubed by department and group.
+ * ds.cube("department", "group").avg()
+ *
+ * // Compute the max age and average salary, cubed by department and gender.
+ * ds.cube($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def cube(col1: String, cols: String*): RGD = cube((col1 +: cols).map(col): _*)
+
+ /**
+ * Create multi-dimensional aggregation for the current Dataset using the specified grouping sets,
+ * so we can run aggregation on them.
+ * See [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * {{{
+ * // Compute the average for all numeric columns group by specific grouping sets.
+ * ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), $"department", $"group").avg()
+ *
+ * // Compute the max age and average salary, group by specific grouping sets.
+ * ds.groupingSets(Seq(Seq($"department", $"gender"), Seq()), $"department", $"group").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 4.0.0
+ */
+ @scala.annotation.varargs
+ def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RGD
+
+ /**
+ * (Scala-specific) Aggregates on the entire Dataset without groups.
+ * {{{
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg("age" -> "max", "salary" -> "avg")
+ * ds.groupBy().agg("age" -> "max", "salary" -> "avg")
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ def agg(aggExpr: (String, String), aggExprs: (String, String)*): DS[Row] = {
+ groupBy().agg(aggExpr, aggExprs: _*)
+ }
+
+ /**
+ * (Scala-specific) Aggregates on the entire Dataset without groups.
+ * {{{
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(Map("age" -> "max", "salary" -> "avg"))
+ * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ def agg(exprs: Map[String, String]): DS[Row] = groupBy().agg(exprs)
+
+ /**
+ * (Java-specific) Aggregates on the entire Dataset without groups.
+ * {{{
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(Map("age" -> "max", "salary" -> "avg"))
+ * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ def agg(exprs: util.Map[String, String]): DS[Row] = groupBy().agg(exprs)
+
+ /**
+ * Aggregates on the entire Dataset without groups.
+ * {{{
+ * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+ * ds.agg(max($"age"), avg($"salary"))
+ * ds.groupBy().agg(max($"age"), avg($"salary"))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def agg(expr: Column, exprs: Column*): DS[Row] = groupBy().agg(expr, exprs: _*)
+
/**
* (Scala-specific)
* Reduces the elements of this Dataset using the specified binary function. The given `func`
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala
new file mode 100644
index 000000000000..30b2992d43a0
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import scala.jdk.CollectionConverters._
+
+import _root_.java.util
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.{functions, Column, Row}
+
+/**
+ * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
+ * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also `pivot`).
+ *
+ * The main method is the `agg` function, which has multiple variants. This class also contains
+ * some first-order statistics such as `mean`, `sum` for convenience.
+ *
+ * @note This class was named `GroupedData` in Spark 1.x.
+ * @since 2.0.0
+ */
+@Stable
+abstract class RelationalGroupedDataset[DS[U] <: Dataset[U, DS]] {
+ type RGD <: RelationalGroupedDataset[DS]
+
+ protected def df: DS[Row]
+
+ /**
+ * Create an aggregation based on the grouping columns, the grouping type, and the aggregations.
+ */
+ protected def toDF(aggCols: Seq[Column]): DS[Row]
+
+ protected def selectNumericColumns(colNames: Seq[String]): Seq[Column]
+
+ /**
+ * Convert a (column name, aggregate method) tuple into a Column.
+ */
+ private def toAggCol(colAndMethod: (String, String)): Column = {
+ val col = df.col(colAndMethod._1)
+ colAndMethod._2.toLowerCase(util.Locale.ROOT) match {
+ case "avg" | "average" | "mean" => functions.avg(col)
+ case "stddev" | "std" => functions.stddev(col)
+ case "count" | "size" => functions.count(col)
+ case name => Column.fn(name, col)
+ }
+ }
+
+ private def aggregateNumericColumns(
+ colNames: Seq[String],
+ function: Column => Column): DS[Row] = {
+ toDF(selectNumericColumns(colNames).map(function))
+ }
+
+ /**
+ * (Scala-specific) Compute aggregates by specifying the column names and
+ * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
+ *
+ * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+ * {{{
+ * // Selects the age of the oldest employee and the aggregate expense for each department
+ * df.groupBy("department").agg(
+ * "age" -> "max",
+ * "expense" -> "sum"
+ * )
+ * }}}
+ *
+ * @since 1.3.0
+ */
+ def agg(aggExpr: (String, String), aggExprs: (String, String)*): DS[Row] =
+ toDF((aggExpr +: aggExprs).map(toAggCol))
+
+ /**
+ * (Scala-specific) Compute aggregates by specifying a map from column name to
+ * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
+ *
+ * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+ * {{{
+ * // Selects the age of the oldest employee and the aggregate expense for each department
+ * df.groupBy("department").agg(Map(
+ * "age" -> "max",
+ * "expense" -> "sum"
+ * ))
+ * }}}
+ *
+ * @since 1.3.0
+ */
+ def agg(exprs: Map[String, String]): DS[Row] = toDF(exprs.map(toAggCol).toSeq)
+
+ /**
+ * (Java-specific) Compute aggregates by specifying a map from column name to
+ * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
+ *
+ * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+ * {{{
+ * // Selects the age of the oldest employee and the aggregate expense for each department
+ * import com.google.common.collect.ImmutableMap;
+ * df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense",
"sum"));
+ * }}}
+ *
+ * @since 1.3.0
+ */
+ def agg(exprs: util.Map[String, String]): DS[Row] = {
+ agg(exprs.asScala.toMap)
+ }
+
+ /**
+ * Compute aggregates by specifying a series of aggregate columns. Note that this function by
+ * default retains the grouping columns in its output. To not retain grouping columns, set
+ * `spark.sql.retainGroupColumns` to false.
+ *
+ * The available aggregate methods are defined in [[org.apache.spark.sql.functions]].
+ *
+ * {{{
+ * // Selects the age of the oldest employee and the aggregate expense for each department
+ *
+ * // Scala:
+ * import org.apache.spark.sql.functions._
+ * df.groupBy("department").agg(max("age"), sum("expense"))
+ *
+ * // Java:
+ * import static org.apache.spark.sql.functions.*;
+ * df.groupBy("department").agg(max("age"), sum("expense"));
+ * }}}
+ *
+ * Note that before Spark 1.4, the default behavior is to NOT retain grouping columns. To change
+ * to that behavior, set config variable `spark.sql.retainGroupColumns` to `false`.
+ * {{{
+ * // Scala, 1.3.x:
+ * df.groupBy("department").agg($"department", max("age"), sum("expense"))
+ *
+ * // Java, 1.3.x:
+ * df.groupBy("department").agg(col("department"), max("age"),
sum("expense"));
+ * }}}
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def agg(expr: Column, exprs: Column*): DS[Row] = toDF(expr +: exprs)
+
+ /**
+ * Count the number of rows for each group.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ *
+ * @since 1.3.0
+ */
+ def count(): DS[Row] = toDF(functions.count(functions.lit(1)).as("count") :: Nil)
+
+ /**
+ * Compute the average value for each numeric column for each group. This is an alias for `avg`.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ * When specified columns are given, only compute the average values for them.
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def mean(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, functions.avg)
+
+
+ /**
+ * Compute the max value for each numeric column for each group.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ * When specified columns are given, only compute the max values for them.
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def max(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, functions.max)
+
+ /**
+ * Compute the mean value for each numeric column for each group.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ * When specified columns are given, only compute the mean values for them.
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def avg(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, functions.avg)
+
+ /**
+ * Compute the min value for each numeric column for each group.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ * When specified columns are given, only compute the min values for them.
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def min(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, functions.min)
+
+ /**
+ * Compute the sum for each numeric column for each group.
+ * The resulting `DataFrame` will also contain the grouping columns.
+ * When specified columns are given, only compute the sum for them.
+ *
+ * @since 1.3.0
+ */
+ @scala.annotation.varargs
+ def sum(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, functions.sum)
+
+ /**
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+ *
+ * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine
+ * the resulting schema of the transformation. To avoid any eager computations, provide an
+ * explicit list of values via `pivot(pivotColumn: String, values: Seq[Any])`.
+ *
+ * {{{
+ * // Compute the sum of earnings for each year by course with each course as a separate column
+ * df.groupBy("year").pivot("course").sum("earnings")
+ * }}}
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn Name of the column to pivot.
+ * @since 1.6.0
+ */
+ def pivot(pivotColumn: String): RGD = pivot(df.col(pivotColumn))
+
+ /**
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+ * There are two versions of pivot function: one that requires the caller to specify the list
+ * of distinct values to pivot on, and one that does not. The latter is more concise but less
+ * efficient, because Spark needs to first compute the list of distinct values internally.
+ *
+ * {{{
+ * // Compute the sum of earnings for each year by course with each course as a separate column
+ * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
+ *
+ * // Or without specifying column values (less efficient)
+ * df.groupBy("year").pivot("course").sum("earnings")
+ * }}}
+ *
+ * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
+ * multiple columns, use the `struct` function to combine the columns and values:
+ *
+ * {{{
+ * df.groupBy("year")
+ * .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+ * .agg(sum($"earnings"))
+ * }}}
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn Name of the column to pivot.
+ * @param values List of values that will be translated to columns in the output DataFrame.
+ * @since 1.6.0
+ */
+ def pivot(pivotColumn: String, values: Seq[Any]): RGD =
+ pivot(df.col(pivotColumn), values)
+
+ /**
+ * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+ * aggregation.
+ *
+ * There are two versions of pivot function: one that requires the caller to specify the list
+ * of distinct values to pivot on, and one that does not. The latter is more concise but less
+ * efficient, because Spark needs to first compute the list of distinct values internally.
+ *
+ * {{{
+ * // Compute the sum of earnings for each year by course with each course as a separate column
+ * df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
+ *
+ * // Or without specifying column values (less efficient)
+ * df.groupBy("year").pivot("course").sum("earnings");
+ * }}}
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn Name of the column to pivot.
+ * @param values List of values that will be translated to columns in the output DataFrame.
+ * @since 1.6.0
+ */
+ def pivot(pivotColumn: String, values: util.List[Any]): RGD =
+ pivot(df.col(pivotColumn), values)
+
+ /**
+ * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+ * aggregation. This is an overloaded version of the `pivot` method with `pivotColumn` of
+ * the `String` type.
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn the column to pivot.
+ * @param values List of values that will be translated to columns in the output DataFrame.
+ * @since 2.4.0
+ */
+ def pivot(pivotColumn: Column, values: util.List[Any]): RGD =
+ pivot(pivotColumn, values.asScala.toSeq)
+
+ /**
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+ *
+ * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine
+ * the resulting schema of the transformation. To avoid any eager computations, provide an
+ * explicit list of values via `pivot(pivotColumn: Column, values: Seq[Any])`.
+ *
+ * {{{
+ * // Compute the sum of earnings for each year by course with each course as a separate column
+ * df.groupBy($"year").pivot($"course").sum($"earnings");
+ * }}}
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn the column to pivot.
+ * @since 2.4.0
+ */
+ def pivot(pivotColumn: Column): RGD
+
+ /**
+ * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+ * This is an overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
+ *
+ * {{{
+ * // Compute the sum of earnings for each year by course with each course as a separate column
+ * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+ * }}}
+ *
+ * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+ * except for the aggregation.
+ * @param pivotColumn the column to pivot.
+ * @param values List of values that will be translated to columns in the output DataFrame.
+ * @since 2.4.0
+ */
+ def pivot(pivotColumn: Column, values: Seq[Any]): RGD
+}
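The private toAggCol helper above is what keeps the legacy string aliases working identically on both backends: `average` and `mean` resolve to functions.avg, `std` to functions.stddev, `size` to functions.count, and any other name falls through to Column.fn. For example (illustrative only):

    df.groupBy("department").agg(
      "age" -> "mean",   // alias, resolved to functions.avg(df.col("age"))
      "expense" -> "sum" // no alias, resolved by name via Column.fn("sum", ...)
    )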
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
index 5b9709ff0657..03625861d88f 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
@@ -14,11 +14,13 @@
"groupType": "GROUP_TYPE_CUBE",
"groupingExpressions": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}, {
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}],
"aggregateExpressions": [{
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin
index d46e40b39dcf..59c7a5557120 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
index 26320d404835..285c13f4bc8b 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
@@ -14,11 +14,13 @@
"groupType": "GROUP_TYPE_GROUPBY",
"groupingExpressions": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "id"
+ "unparsedIdentifier": "id",
+ "planId": "0"
}
}, {
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}],
"aggregateExpressions": [{
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin
index 818146f7f693..674d506fa4a0 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
index 5785eee2cadb..0ded46cf6cc7 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
@@ -22,7 +22,8 @@
"functionName": "avg",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}]
}
@@ -31,7 +32,8 @@
"functionName": "avg",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}]
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
index 4a18ea2d82d9..444b0c3853f1 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
index 3225a475a9b3..ed186ff71351 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
@@ -22,7 +22,8 @@
"functionName": "max",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}]
}
@@ -31,7 +32,8 @@
"functionName": "max",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}]
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
index 651274b1afca..11cd163e9173 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
index 5785eee2cadb..0ded46cf6cc7 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
@@ -22,7 +22,8 @@
"functionName": "avg",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}]
}
@@ -31,7 +32,8 @@
"functionName": "avg",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}]
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
index 4a18ea2d82d9..444b0c3853f1 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
index afcc07d2c869..8c0ad283cb0a 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
@@ -22,7 +22,8 @@
"functionName": "min",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}]
}
@@ -31,7 +32,8 @@
"functionName": "min",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}]
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
index 6e038bf0b315..2bc985a1fe9f 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
index 74dd5b045aa5..788b964491c6 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
@@ -22,7 +22,8 @@
"functionName": "sum",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}]
}
@@ -31,7 +32,8 @@
"functionName": "sum",
"arguments": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}]
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
index fe2451ca18fb..e92041399cbc 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
index 07bbd315a5fe..8ff81d95d298 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
@@ -14,11 +14,13 @@
"groupType": "GROUP_TYPE_CUBE",
"groupingExpressions": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}, {
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}],
"aggregateExpressions": [{
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin
index 88b3f0593132..d1dded43ddf9 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/pivot.json b/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
index 30bff04c531d..2af86606b9fc 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
@@ -30,7 +30,8 @@
"pivot": {
"col": {
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
},
"values": [{
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin
index 67063209a184..f545179e8496 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
index 5218a88988ea..aa043613795c 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
@@ -30,7 +30,8 @@
"pivot": {
"col": {
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}
}
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin
index aee3c980eaee..588b56f247e0 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
index 1102db18830b..5082051031f8 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
@@ -14,11 +14,13 @@
"groupType": "GROUP_TYPE_ROLLUP",
"groupingExpressions": [{
"unresolvedAttribute": {
- "unparsedIdentifier": "a"
+ "unparsedIdentifier": "a",
+ "planId": "0"
}
}, {
"unresolvedAttribute": {
- "unparsedIdentifier": "b"
+ "unparsedIdentifier": "b",
+ "planId": "0"
}
}],
"aggregateExpressions": [{
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin
index 64dbb597c365..63fdead641da 100644
Binary files a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin and b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin differ
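All of the golden-file churn above has a single cause: attributes that the grouped APIs resolve by name now serialize with the id of the plan they came from, so each unresolvedAttribute gains a planId next to its unparsedIdentifier. As a rough, hypothetical sketch (the real inputs live in the Connect query-generation tests, and `spark` is an assumed SparkSession), queries of this shape regenerate the groupby_* files:

    // Sketch only: grouping and aggregating by column name; after this change
    // the serialized plan carries planId "0" on every resolved attribute.
    val df = spark.range(10).selectExpr("id % 2 AS a", "id AS b")
    df.groupBy("a", "b").max("a", "b")   // cf. groupby_max.json above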
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 03b6a8d6d737..a28dfbdbf66a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -214,6 +214,7 @@ class Dataset[T] private[sql](
@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
@DeveloperApi @Unstable @transient val encoder: Encoder[T])
extends api.Dataset[T, Dataset] {
+ type RGD = RelationalGroupedDataset
@transient lazy val sparkSession: SparkSession = {
if (queryExecution == null || queryExecution.sparkSession == null) {
@@ -891,73 +892,19 @@ class Dataset[T] private[sql](
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}
- /**
- * Create a multi-dimensional rollup for the current Dataset using the specified columns,
- * so we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns rolled up by department and group.
- * ds.rollup($"department", $"group").avg()
- *
- * // Compute the max age and average salary, rolled up by department and gender.
- * ds.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def rollup(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.RollupType)
}
- /**
- * Create a multi-dimensional cube for the current Dataset using the specified columns,
- * so we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * ds.cube($"department", $"group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * ds.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def cube(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.CubeType)
}
- /**
- * Create multi-dimensional aggregation for the current Dataset using the specified grouping sets,
- * so we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns group by specific grouping sets.
- * ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), $"department", $"group").avg()
- *
- * // Compute the max age and average salary, group by specific grouping sets.
- * ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", $"group").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 4.0.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(
@@ -966,33 +913,6 @@ class Dataset[T] private[sql](
RelationalGroupedDataset.GroupingSetsType(groupingSets.map(_.map(_.expr))))
}
- /**
- * Groups the Dataset using the specified columns, so that we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * This is a variant of groupBy that can only group by existing columns using column names
- * (i.e. cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns grouped by department.
- * ds.groupBy("department").avg()
- *
- * // Compute the max age and average salary, grouped by department and gender.
- * ds.groupBy($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group untypedrel
- * @since 2.0.0
- */
- @scala.annotation.varargs
- def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- RelationalGroupedDataset(
- toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)
- }
-
/** @inheritdoc */
def reduce(func: (T, T) => T): T = withNewRDDExecutionId("reduce") {
rdd.reduce(func)
@@ -1027,118 +947,6 @@ class Dataset[T] private[sql](
def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
groupByKey(func.call(_))(encoder)
- /**
- * Create a multi-dimensional rollup for the current Dataset using the specified columns,
- * so we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * This is a variant of rollup that can only group by existing columns using column names
- * (i.e. cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns rolled up by department and group.
- * ds.rollup("department", "group").avg()
- *
- * // Compute the max age and average salary, rolled up by department and gender.
- * ds.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
- @scala.annotation.varargs
- def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- RelationalGroupedDataset(
- toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.RollupType)
- }
-
- /**
- * Create a multi-dimensional cube for the current Dataset using the specified columns,
- * so we can run aggregation on them.
- * See [[RelationalGroupedDataset]] for all the available aggregate functions.
- *
- * This is a variant of cube that can only group by existing columns using column names
- * (i.e. cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * ds.cube("department", "group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * ds.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group untypedrel
- * @since 2.0.0
- */
- @scala.annotation.varargs
- def cube(col1: String, cols: String*): RelationalGroupedDataset = {
- val colNames: Seq[String] = col1 +: cols
- RelationalGroupedDataset(
- toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.CubeType)
- }
-
- /**
- * (Scala-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg("age" -> "max", "salary" -> "avg")
- * ds.groupBy().agg("age" -> "max", "salary" -> "avg")
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
- def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
- groupBy().agg(aggExpr, aggExprs : _*)
- }
-
- /**
- * (Scala-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(Map("age" -> "max", "salary" -> "avg"))
- * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
- def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
-
- /**
- * (Java-specific) Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(Map("age" -> "max", "salary" -> "avg"))
- * ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
- def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
-
- /**
- * Aggregates on the entire Dataset without groups.
- * {{{
- * // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
- * ds.agg(max($"age"), avg($"salary"))
- * ds.groupBy().agg(max($"age"), avg($"salary"))
- * }}}
- *
- * @group untypedrel
- * @since 2.0.0
- */
- @scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)
-
/** @inheritdoc */
def unpivot(
ids: Array[Column],
@@ -2223,6 +2031,35 @@ class Dataset[T] private[sql](
/** @inheritdoc */
override def distinct(): Dataset[T] = super.distinct()
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def groupBy(col1: String, cols: String*): RelationalGroupedDataset =
+ super.groupBy(col1, cols: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def rollup(col1: String, cols: String*): RelationalGroupedDataset =
+ super.rollup(col1, cols: _*)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def cube(col1: String, cols: String*): RelationalGroupedDataset =
+ super.cube(col1, cols: _*)
+
+ /** @inheritdoc */
+ override def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame =
+ super.agg(aggExpr, aggExprs: _*)
+
+ /** @inheritdoc */
+ override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
+
+ /** @inheritdoc */
+ override def agg(exprs: java.util.Map[String, String]): DataFrame = super.agg(exprs)
+
+ /** @inheritdoc */
+ @scala.annotation.varargs
+ override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
+
////////////////////////////////////////////////////////////////////////////
// For Python API
////////////////////////////////////////////////////////////////////////////
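The core of the Dataset.scala change is visible in the hunks above: the string-based groupBy/rollup/cube variants and the no-group agg overloads move into the shared api.Dataset, implemented once in terms of the Column-based primitives, while the concrete class binds the abstract type member RGD and re-overrides each method so callers keep getting RelationalGroupedDataset back. A minimal, hypothetical sketch of that pattern (simplified stand-ins, not the actual api sources):

    // Simplified stand-ins, not the real org.apache.spark.sql.api classes.
    final case class Col(name: String)
    abstract class AbsGrouped

    abstract class AbsDataset[T] {
      type RGD <: AbsGrouped                 // bound by each concrete Dataset
      def groupBy(cols: Col*): RGD           // backend-specific primitive

      // The string variant is written once and shared by both backends:
      def groupBy(col1: String, cols: String*): RGD =
        groupBy((col1 +: cols).map(Col.apply): _*)
    }

    // A concrete binding narrows RGD, mirroring `type RGD = RelationalGroupedDataset`:
    class ConcreteGrouped extends AbsGrouped
    class ConcreteDataset[T] extends AbsDataset[T] {
      type RGD = ConcreteGrouped
      def groupBy(cols: Col*): RGD = new ConcreteGrouped
    }

Binding the type member per backend is what lets the shared implementations return each backend's own grouped type without casts.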
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 3cafe0d98f1b..777baa3e6268 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -17,15 +17,11 @@
package org.apache.spark.sql
-import java.util.Locale
-
-import scala.jdk.CollectionConverters._
-
import org.apache.spark.SparkRuntimeException
import org.apache.spark.annotation.Stable
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedFunction}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -54,13 +50,19 @@ import org.apache.spark.util.ArrayImplicits._
*/
@Stable
class RelationalGroupedDataset protected[sql](
- private[sql] val df: DataFrame,
+ protected[sql] val df: DataFrame,
private[sql] val groupingExprs: Seq[Expression],
- groupType: RelationalGroupedDataset.GroupType) {
+ groupType: RelationalGroupedDataset.GroupType)
+ extends api.RelationalGroupedDataset[Dataset] {
+ type RGD = RelationalGroupedDataset
import RelationalGroupedDataset._
import df.sparkSession._
- private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
+ override protected def toDF(aggCols: Seq[Column]): DataFrame = {
+ val aggExprs = aggCols.map(expression).map { e =>
+ withInputType(e, df.exprEnc, df.logicalPlan.output)
+ }
+
@scala.annotation.nowarn("cat=deprecation")
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs match {
@@ -98,9 +100,7 @@ class RelationalGroupedDataset protected[sql](
}
}
- private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction)
- : DataFrame = {
-
+ override protected def selectNumericColumns(colNames: Seq[String]): Seq[Column] = {
val columnExprs = if (colNames.isEmpty) {
// No columns specified. Use all numeric columns.
df.numericColumns
@@ -114,29 +114,9 @@ class RelationalGroupedDataset protected[sql](
namedExpr
}
}
- toDF(columnExprs.map(expr => f(expr).toAggregateExpression()))
+ columnExprs.map(column)
}
- private[this] def strToExpr(expr: String): (Expression => Expression) = {
- val exprToFunc: (Expression => Expression) = {
- (inputExpr: Expression) => expr.toLowerCase(Locale.ROOT) match {
- // We special handle a few cases that have alias that are not in function registry.
- case "avg" | "average" | "mean" =>
- UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
- case "stddev" | "std" =>
- UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
- // Also special handle count because we need to take care count(*).
- case "count" | "size" =>
- // Turn count(*) into count(1)
- inputExpr match {
- case s: Star => Count(Literal(1)).toAggregateExpression()
- case _ => Count(inputExpr).toAggregateExpression()
- }
- case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false)
- }
- }
- (inputExpr: Expression) => exprToFunc(inputExpr)
- }
/**
* Returns a `KeyValueGroupedDataset` where the data is grouped by the grouping expressions
@@ -159,296 +139,68 @@ class RelationalGroupedDataset protected[sql](
groupingAttributes)
}
- /**
- * (Scala-specific) Compute aggregates by specifying the column names and
- * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * df.groupBy("department").agg(
- * "age" -> "max",
- * "expense" -> "sum"
- * )
- * }}}
- *
- * @since 1.3.0
- */
- def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
- toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
- strToExpr(expr)(df(colName).expr)
- })
- }
+ /** @inheritdoc */
+ override def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame =
+ super.agg(aggExpr, aggExprs: _*)
- /**
- * (Scala-specific) Compute aggregates by specifying a map from column name to
- * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * df.groupBy("department").agg(Map(
- * "age" -> "max",
- * "expense" -> "sum"
- * ))
- * }}}
- *
- * @since 1.3.0
- */
- def agg(exprs: Map[String, String]): DataFrame = {
- toDF(exprs.map { case (colName, expr) =>
- strToExpr(expr)(df(colName).expr)
- }.toSeq)
- }
+ /** @inheritdoc */
+ override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
- /**
- * (Java-specific) Compute aggregates by specifying a map from column name to
- * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
- *
- * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- * import com.google.common.collect.ImmutableMap;
- * df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense", "sum"));
- * }}}
- *
- * @since 1.3.0
- */
- def agg(exprs: java.util.Map[String, String]): DataFrame = {
- agg(exprs.asScala.toMap)
- }
+ /** @inheritdoc */
+ override def agg(exprs: java.util.Map[String, String]): DataFrame = super.agg(exprs)
- /**
- * Compute aggregates by specifying a series of aggregate columns. Note that this function by
- * default retains the grouping columns in its output. To not retain grouping columns, set
- * `spark.sql.retainGroupColumns` to false.
- *
- * The available aggregate methods are defined in [[org.apache.spark.sql.functions]].
- *
- * {{{
- * // Selects the age of the oldest employee and the aggregate expense for each department
- *
- * // Scala:
- * import org.apache.spark.sql.functions._
- * df.groupBy("department").agg(max("age"), sum("expense"))
- *
- * // Java:
- * import static org.apache.spark.sql.functions.*;
- * df.groupBy("department").agg(max("age"), sum("expense"));
- * }}}
- *
- * Note that before Spark 1.4, the default behavior is to NOT retain grouping columns. To change
- * to that behavior, set config variable `spark.sql.retainGroupColumns` to `false`.
- * {{{
- * // Scala, 1.3.x:
- * df.groupBy("department").agg($"department", max("age"), sum("expense"))
- *
- * // Java, 1.3.x:
- * df.groupBy("department").agg(col("department"), max("age"), sum("expense"));
- * }}}
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def agg(expr: Column, exprs: Column*): DataFrame = {
- toDF((expr +: exprs).map {
- case typed: TypedColumn[_, _] =>
- withInputType(typed.expr, df.exprEnc, df.logicalPlan.output)
- case c => c.expr
- })
- }
+ override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
- /**
- * Count the number of rows for each group.
- * The resulting `DataFrame` will also contain the grouping columns.
- *
- * @since 1.3.0
- */
- def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))
+ /** @inheritdoc */
+ override def count(): DataFrame = super.count()
- /**
- * Compute the average value for each numeric columns for each group. This is an alias for `avg`.
- * The resulting `DataFrame` will also contain the grouping columns.
- * When specified columns are given, only compute the average values for them.
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def mean(colNames: String*): DataFrame = {
- aggregateNumericColumns(colNames : _*)(Average(_))
- }
+ override def mean(colNames: String*): DataFrame = super.mean(colNames: _*)
- /**
- * Compute the max value for each numeric columns for each group.
- * The resulting `DataFrame` will also contain the grouping columns.
- * When specified columns are given, only compute the max values for them.
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def max(colNames: String*): DataFrame = {
- aggregateNumericColumns(colNames : _*)(Max)
- }
+ override def max(colNames: String*): DataFrame = super.max(colNames: _*)
- /**
- * Compute the mean value for each numeric columns for each group.
- * The resulting `DataFrame` will also contain the grouping columns.
- * When specified columns are given, only compute the mean values for them.
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def avg(colNames: String*): DataFrame = {
- aggregateNumericColumns(colNames : _*)(Average(_))
- }
+ override def avg(colNames: String*): DataFrame = super.avg(colNames: _*)
- /**
- * Compute the min value for each numeric column for each group.
- * The resulting `DataFrame` will also contain the grouping columns.
- * When specified columns are given, only compute the min values for them.
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def min(colNames: String*): DataFrame = {
- aggregateNumericColumns(colNames : _*)(Min)
- }
+ override def min(colNames: String*): DataFrame = super.min(colNames: _*)
- /**
- * Compute the sum for each numeric columns for each group.
- * The resulting `DataFrame` will also contain the grouping columns.
- * When specified columns are given, only compute the sum for them.
- *
- * @since 1.3.0
- */
+ /** @inheritdoc */
@scala.annotation.varargs
- def sum(colNames: String*): DataFrame = {
- aggregateNumericColumns(colNames : _*)(Sum(_))
- }
+ override def sum(colNames: String*): DataFrame = super.sum(colNames: _*)
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- *
- * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine
- * the resulting schema of the transformation. To avoid any eager computations, provide an
- * explicit list of values via `pivot(pivotColumn: String, values: Seq[Any])`.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course").sum("earnings")
- * }}}
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn Name of the column to pivot.
- * @since 1.6.0
- */
- def pivot(pivotColumn: String): RelationalGroupedDataset = pivot(Column(pivotColumn))
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String): RelationalGroupedDataset = super.pivot(pivotColumn)
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- * There are two versions of pivot function: one that requires the caller to specify the list
- * of distinct values to pivot on, and one that does not. The latter is more concise but less
- * efficient, because Spark needs to first compute the list of distinct values internally.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
- *
- * // Or without specifying column values (less efficient)
- * df.groupBy("year").pivot("course").sum("earnings")
- * }}}
- *
- * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
- * multiple columns, use the `struct` function to combine the columns and values:
- *
- * {{{
- * df.groupBy("year")
- * .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
- * .agg(sum($"earnings"))
- * }}}
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn Name of the column to pivot.
- * @param values List of values that will be translated to columns in the output DataFrame.
- * @since 1.6.0
- */
- def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
- pivot(Column(pivotColumn), values)
- }
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset =
+ super.pivot(pivotColumn, values)
- /**
- * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
- * aggregation.
- *
- * There are two versions of pivot function: one that requires the caller to specify the list
- * of distinct values to pivot on, and one that does not. The latter is more concise but less
- * efficient, because Spark needs to first compute the list of distinct values internally.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
- *
- * // Or without specifying column values (less efficient)
- * df.groupBy("year").pivot("course").sum("earnings");
- * }}}
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn Name of the column to pivot.
- * @param values List of values that will be translated to columns in the output DataFrame.
- * @since 1.6.0
- */
- def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
- pivot(Column(pivotColumn), values)
+ /** @inheritdoc */
+ override def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset =
+ super.pivot(pivotColumn, values)
+
+ /** @inheritdoc */
+ override def pivot(pivotColumn: Column, values: java.util.List[Any]): RelationalGroupedDataset = {
+ super.pivot(pivotColumn, values)
}
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- *
- * Spark will eagerly compute the distinct values in `pivotColumn` so it can determine
- * the resulting schema of the transformation. To avoid any eager computations, provide an
- * explicit list of values via `pivot(pivotColumn: Column, values: Seq[Any])`.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy($"year").pivot($"course").sum($"earnings");
- * }}}
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn he column to pivot.
- * @since 2.4.0
- */
- def pivot(pivotColumn: Column): RelationalGroupedDataset = {
+ /** @inheritdoc */
+ override def pivot(pivotColumn: Column): RelationalGroupedDataset =
pivot(pivotColumn, collectPivotValues(df, pivotColumn))
- }
- /**
- * Pivots a column of the current `DataFrame` and performs the specified aggregation.
- * This is an overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
- *
- * {{{
- * // Compute the sum of earnings for each year by course with each course as a separate column
- * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
- * }}}
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn the column to pivot.
- * @param values List of values that will be translated to columns in the output DataFrame.
- * @since 2.4.0
- */
+ /** @inheritdoc */
def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
groupType match {
case RelationalGroupedDataset.GroupByType =>
- val valueExprs = values.map(_ match {
+ val valueExprs = values.map {
case c: Column => c.expr
case v =>
try {
@@ -457,7 +209,7 @@ class RelationalGroupedDataset protected[sql](
case _: SparkRuntimeException =>
throw QueryExecutionErrors.pivotColumnUnsupportedError(v, pivotColumn.expr)
}
- })
+ }
new RelationalGroupedDataset(
df,
groupingExprs,
@@ -471,22 +223,6 @@ class RelationalGroupedDataset protected[sql](
}
}
- /**
- * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
- * aggregation. This is an overloaded version of the `pivot` method with `pivotColumn` of
- * the `String` type.
- *
- * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
- * except for the aggregation.
- *
- * @param pivotColumn the column to pivot.
- * @param values List of values that will be translated to columns in the output DataFrame.
- * @since 2.4.0
- */
- def pivot(pivotColumn: Column, values: java.util.List[Any]): RelationalGroupedDataset = {
- pivot(pivotColumn, values.asScala.toSeq)
- }
-
/**
* Applies the given serialized R function `func` to each group of data. For each unique group,
* the function will be passed the group key and an iterator that contains all of the elements in
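The RelationalGroupedDataset side uses the same template-method split: the shared parent can express count, the numeric aggregations, and the agg overloads through the two hooks overridden above, toDF and selectNumericColumns, with Connect supplying proto-building equivalents. A rough sketch of that division of labour, under simplified names and string placeholders instead of real Columns:

    // Rough sketch; the real shared class operates on Columns and DataFrames.
    abstract class AbsGrouped[DF] {
      // Backend hooks: Classic builds catalyst expressions, Connect builds protos.
      protected def toDF(aggCols: Seq[String]): DF
      protected def selectNumericColumns(colNames: Seq[String]): Seq[String]

      private def aggNumeric(fn: String, colNames: Seq[String]): DF =
        toDF(selectNumericColumns(colNames).map(c => s"$fn($c)"))

      // Shared surface, written once for both backends:
      def count(): DF = toDF(Seq("count(1) AS count"))
      def mean(colNames: String*): DF = aggNumeric("avg", colNames)
      def max(colNames: String*): DF = aggNumeric("max", colNames)
      def min(colNames: String*): DF = aggNumeric("min", colNames)
      def sum(colNames: String*): DF = aggNumeric("sum", colNames)
    }

Each backend then only has to translate a sequence of aggregate columns into its own plan representation.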
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]