This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 03180ece177d [SPARK-49421][CONNECT][SQL] Create a shared 
RelationalGroupedDataset interface
03180ece177d is described below

commit 03180ece177d3ca9ea9ee6aa7a17979696e386ad
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Aug 29 22:59:55 2024 -0400

    [SPARK-49421][CONNECT][SQL] Create a shared RelationalGroupedDataset 
interface
    
    ### What changes were proposed in this pull request?
    This PR introduces a shared RelationalGroupedDataset interface.
    
    ### Why are the changes needed?
    We want to unify the Classic and Connect Scala DataFrame APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47918 from hvanhovell/SPARK-49421.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 254 ++------------
 .../spark/sql/RelationalGroupedDataset.scala       | 378 ++++-----------------
 .../scala/org/apache/spark/sql/api/Dataset.scala   | 218 ++++++++++++
 .../spark/sql/api/RelationalGroupedDataset.scala   | 338 ++++++++++++++++++
 .../resources/query-tests/queries/cube_string.json |   6 +-
 .../query-tests/queries/cube_string.proto.bin      | Bin 92 -> 96 bytes
 .../query-tests/queries/groupby_agg_string.json    |   6 +-
 .../queries/groupby_agg_string.proto.bin           | Bin 103 -> 107 bytes
 .../resources/query-tests/queries/groupby_avg.json |   6 +-
 .../query-tests/queries/groupby_avg.proto.bin      | Bin 90 -> 94 bytes
 .../resources/query-tests/queries/groupby_max.json |   6 +-
 .../query-tests/queries/groupby_max.proto.bin      | Bin 90 -> 94 bytes
 .../query-tests/queries/groupby_mean.json          |   6 +-
 .../query-tests/queries/groupby_mean.proto.bin     | Bin 90 -> 94 bytes
 .../resources/query-tests/queries/groupby_min.json |   6 +-
 .../query-tests/queries/groupby_min.proto.bin      | Bin 90 -> 94 bytes
 .../resources/query-tests/queries/groupby_sum.json |   6 +-
 .../query-tests/queries/groupby_sum.proto.bin      | Bin 90 -> 94 bytes
 .../queries/grouping_and_grouping_id.json          |   6 +-
 .../queries/grouping_and_grouping_id.proto.bin     | Bin 138 -> 142 bytes
 .../test/resources/query-tests/queries/pivot.json  |   3 +-
 .../resources/query-tests/queries/pivot.proto.bin  | Bin 97 -> 99 bytes
 .../queries/pivot_without_column_values.json       |   3 +-
 .../queries/pivot_without_column_values.proto.bin  | Bin 85 -> 87 bytes
 .../query-tests/queries/rollup_string.json         |   6 +-
 .../query-tests/queries/rollup_string.proto.bin    | Bin 92 -> 96 bytes
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 229 ++-----------
 .../spark/sql/RelationalGroupedDataset.scala       | 364 +++-----------------
 28 files changed, 775 insertions(+), 1066 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 37a182675b6c..d05834c4fc6c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -135,6 +135,7 @@ class Dataset[T] private[sql] (
     @DeveloperApi val plan: proto.Plan,
     val encoder: Encoder[T])
     extends api.Dataset[T, Dataset] {
+  type RGD = RelationalGroupedDataset
 
   import sparkSession.RichColumn
 
@@ -506,58 +507,12 @@ class Dataset[T] private[sql] (
     }
   }
 
-  /**
-   * Groups the Dataset using the specified columns, so we can run aggregation 
on them. See
-   * [[RelationalGroupedDataset]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns grouped by department.
-   *   ds.groupBy($"department").avg()
-   *
-   *   // Compute the max age and average salary, grouped by department and 
gender.
-   *   ds.groupBy($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def groupBy(cols: Column*): RelationalGroupedDataset = {
     new RelationalGroupedDataset(toDF(), cols, 
proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
-  /**
-   * Groups the Dataset using the specified columns, so that we can run 
aggregation on them. See
-   * [[RelationalGroupedDataset]] for all the available aggregate functions.
-   *
-   * This is a variant of groupBy that can only group by existing columns 
using column names (i.e.
-   * cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns grouped by department.
-   *   ds.groupBy("department").avg()
-   *
-   *   // Compute the max age and average salary, grouped by department and 
gender.
-   *   ds.groupBy($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    new RelationalGroupedDataset(
-      toDF(),
-      colNames.map(colName => Column(colName)),
-      proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
-  }
-
   /** @inheritdoc */
   def reduce(func: (T, T) => T): T = {
     val udf = SparkUserDefinedFunction(
@@ -599,134 +554,19 @@ class Dataset[T] private[sql] (
   def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): 
KeyValueGroupedDataset[K, T] =
     groupByKey(UdfUtils.mapFunctionToScalaFunc(func))(encoder)
 
-  /**
-   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns, so we
-   * can run aggregation on them. See [[RelationalGroupedDataset]] for all the 
available aggregate
-   * functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolled up by department 
and group.
-   *   ds.rollup($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, rolled up by department and 
gender.
-   *   ds.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def rollup(cols: Column*): RelationalGroupedDataset = {
     new RelationalGroupedDataset(toDF(), cols, 
proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
   }
 
-  /**
-   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns, so we
-   * can run aggregation on them. See [[RelationalGroupedDataset]] for all the 
available aggregate
-   * functions.
-   *
-   * This is a variant of rollup that can only group by existing columns using 
column names (i.e.
-   * cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolled up by department 
and group.
-   *   ds.rollup("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, rolled up by department and 
gender.
-   *   ds.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    new RelationalGroupedDataset(
-      toDF(),
-      colNames.map(colName => Column(colName)),
-      proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current Dataset using the 
specified columns, so we
-   * can run aggregation on them. See [[RelationalGroupedDataset]] for all the 
available aggregate
-   * functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and 
group.
-   *   ds.cube($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and 
gender.
-   *   ds.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def cube(cols: Column*): RelationalGroupedDataset = {
     new RelationalGroupedDataset(toDF(), cols, 
proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
   }
 
-  /**
-   * Create a multi-dimensional cube for the current Dataset using the 
specified columns, so we
-   * can run aggregation on them. See [[RelationalGroupedDataset]] for all the 
available aggregate
-   * functions.
-   *
-   * This is a variant of cube that can only group by existing columns using 
column names (i.e.
-   * cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and 
group.
-   *   ds.cube("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and 
gender.
-   *   ds.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def cube(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    new RelationalGroupedDataset(
-      toDF(),
-      colNames.map(colName => Column(colName)),
-      proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
-  }
-
-  /**
-   * Create multi-dimensional aggregation for the current Dataset using the 
specified grouping
-   * sets, so we can run aggregation on them. See [[RelationalGroupedDataset]] 
for all the
-   * available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns group by specific 
grouping sets.
-   *   ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), 
$"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, group by specific grouping 
sets.
-   *   ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", 
$"group").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): 
RelationalGroupedDataset = {
     val groupingSetMsgs = groupingSets.map { groupingSet =>
@@ -743,61 +583,6 @@ class Dataset[T] private[sql] (
       groupingSets = Some(groupingSetMsgs))
   }
 
-  /**
-   * (Scala-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg("age" -> "max", "salary" -> "avg")
-   *   ds.groupBy().agg("age" -> "max", "salary" -> "avg")
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
-    groupBy().agg(aggExpr, aggExprs: _*)
-  }
-
-  /**
-   * (Scala-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
-
-  /**
-   * (Java-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = 
groupBy().agg(exprs)
-
-  /**
-   * Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(max($"age"), avg($"salary"))
-   *   ds.groupBy().agg(max($"age"), avg($"salary"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 3.4.0
-   */
-  @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, 
exprs: _*)
-
   /** @inheritdoc */
   def unpivot(
       ids: Array[Column],
@@ -1745,4 +1530,33 @@ class Dataset[T] private[sql] (
 
   /** @inheritdoc */
   override def distinct(): Dataset[T] = super.distinct()
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def groupBy(col1: String, cols: String*): RelationalGroupedDataset =
+    super.groupBy(col1, cols: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def rollup(col1: String, cols: String*): RelationalGroupedDataset =
+    super.rollup(col1, cols: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def cube(col1: String, cols: String*): RelationalGroupedDataset =
+    super.cube(col1, cols: _*)
+
+  /** @inheritdoc */
+  override def agg(aggExpr: (String, String), aggExprs: (String, String)*): 
DataFrame =
+    super.agg(aggExpr, aggExprs: _*)
+
+  /** @inheritdoc */
+  override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
+
+  /** @inheritdoc */
+  override def agg(exprs: java.util.Map[String, String]): DataFrame = 
super.agg(exprs)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
 }
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 0c8657e12d8d..c9b011ca4535 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.util.Locale
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.connect.proto
@@ -36,42 +34,52 @@ import org.apache.spark.connect.proto
  * @since 3.4.0
  */
 class RelationalGroupedDataset private[sql] (
-    private[sql] val df: DataFrame,
+    protected val df: DataFrame,
     private[sql] val groupingExprs: Seq[Column],
     groupType: proto.Aggregate.GroupType,
     pivot: Option[proto.Aggregate.Pivot] = None,
-    groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None) {
+    groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None)
+    extends api.RelationalGroupedDataset[Dataset] {
+  type RGD = RelationalGroupedDataset
   import df.sparkSession.RichColumn
 
-  private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
+  protected def toDF(aggExprs: Seq[Column]): DataFrame = {
     df.sparkSession.newDataFrame { builder =>
-      builder.getAggregateBuilder
+      val aggBuilder = builder.getAggregateBuilder
         .setInput(df.plan.getRoot)
-        .addAllGroupingExpressions(groupingExprs.map(_.expr).asJava)
-        .addAllAggregateExpressions(aggExprs.map(e => 
e.typedExpr(df.encoder)).asJava)
+      groupingExprs.foreach(c => aggBuilder.addGroupingExpressions(c.expr))
+      aggExprs.foreach(c => 
aggBuilder.addAggregateExpressions(c.typedExpr(df.encoder)))
 
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
-          
builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
+          aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
-          
builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
+          aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
-          
builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+          aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
         case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
           assert(pivot.isDefined)
-          builder.getAggregateBuilder
+          aggBuilder
             .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
             .setPivot(pivot.get)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS =>
           assert(groupingSets.isDefined)
-          val aggBuilder = builder.getAggregateBuilder
-            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS)
+          
aggBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS)
           groupingSets.get.foreach(aggBuilder.addGroupingSets)
         case g => throw new UnsupportedOperationException(g.toString)
       }
     }
   }
 
+  protected def selectNumericColumns(colNames: Seq[String]): Seq[Column] = {
+    // This behaves different than the classic implementation. The classic 
implementation validates
+    // if a column is actually a number, and if it is not it throws an error 
immediately. In connect
+    // it depends on the input type (casting) rules for the method invoked. If 
the input violates
+    // the a different error will be thrown. However it is also possible to 
get a result for a
+    // non-numeric column in connect, for example when you use min/max.
+    colNames.map(df.col)
+  }
+
   /**
    * Returns a `KeyValueGroupedDataset` where the data is grouped by the 
grouping expressions of
    * current `RelationalGroupedDataset`.
@@ -82,295 +90,71 @@ class RelationalGroupedDataset private[sql] (
     KeyValueGroupedDatasetImpl[K, T](df, encoderFor[K], encoderFor[T], 
groupingExprs)
   }
 
-  /**
-   * (Scala-specific) Compute aggregates by specifying the column names and 
aggregate methods. The
-   * resulting `DataFrame` will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   )
-   * }}}
-   *
-   * @since 3.4.0
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
-    toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
-      strToColumn(expr, df(colName))
-    })
-  }
-
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name 
to aggregate
-   * methods. The resulting `DataFrame` will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(Map(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   ))
-   * }}}
-   *
-   * @since 3.4.0
-   */
-  def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
-      strToColumn(expr, df(colName))
-    }.toSeq)
-  }
+  /** @inheritdoc */
+  override def agg(aggExpr: (String, String), aggExprs: (String, String)*): 
DataFrame =
+    super.agg(aggExpr, aggExprs: _*)
 
-  /**
-   * (Java-specific) Compute aggregates by specifying a map from column name 
to aggregate methods.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   import com.google.common.collect.ImmutableMap;
-   *   df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense", 
"sum"));
-   * }}}
-   *
-   * @since 3.4.0
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = {
-    agg(exprs.asScala.toMap)
-  }
+  /** @inheritdoc */
+  override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
 
-  private[this] def strToColumn(expr: String, inputExpr: Column): Column = {
-    expr.toLowerCase(Locale.ROOT) match {
-      case "avg" | "average" | "mean" => functions.avg(inputExpr)
-      case "stddev" | "std" => functions.stddev(inputExpr)
-      case "count" | "size" => functions.count(inputExpr)
-      case name => Column.fn(name, inputExpr)
-    }
-  }
+  /** @inheritdoc */
+  override def agg(exprs: java.util.Map[String, String]): DataFrame = 
super.agg(exprs)
 
-  /**
-   * Compute aggregates by specifying a series of aggregate columns. Note that 
this function by
-   * default retains the grouping columns in its output. To not retain 
grouping columns, set
-   * `spark.sql.retainGroupColumns` to false.
-   *
-   * The available aggregate methods are defined in 
[[org.apache.spark.sql.functions]].
-   *
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *
-   *   // Scala:
-   *   import org.apache.spark.sql.functions._
-   *   df.groupBy("department").agg(max("age"), sum("expense"))
-   *
-   *   // Java:
-   *   import static org.apache.spark.sql.functions.*;
-   *   df.groupBy("department").agg(max("age"), sum("expense"));
-   * }}}
-   *
-   * Note that before Spark 1.4, the default behavior is to NOT retain 
grouping columns. To change
-   * to that behavior, set config variable `spark.sql.retainGroupColumns` to 
`false`.
-   * {{{
-   *   // Scala, 1.3.x:
-   *   df.groupBy("department").agg($"department", max("age"), sum("expense"))
-   *
-   *   // Java, 1.3.x:
-   *   df.groupBy("department").agg(col("department"), max("age"), 
sum("expense"));
-   * }}}
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = {
-    toDF(expr +: exprs)
-  }
+  override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
 
-  /**
-   * Count the number of rows for each group. The resulting `DataFrame` will 
also contain the
-   * grouping columns.
-   *
-   * @since 3.4.0
-   */
-  def count(): DataFrame = 
toDF(Seq(functions.count(functions.lit(1)).alias("count")))
+  /** @inheritdoc */
+  override def count(): DataFrame = super.count()
 
-  /**
-   * Compute the average value for each numeric columns for each group. This 
is an alias for
-   * `avg`. The resulting `DataFrame` will also contain the grouping columns. 
When specified
-   * columns are given, only compute the average values for them.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def mean(colNames: String*): DataFrame = {
-    toDF(colNames.map(colName => functions.mean(colName)))
-  }
+  override def mean(colNames: String*): DataFrame = super.mean(colNames: _*)
 
-  /**
-   * Compute the max value for each numeric columns for each group. The 
resulting `DataFrame` will
-   * also contain the grouping columns. When specified columns are given, only 
compute the max
-   * values for them.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def max(colNames: String*): DataFrame = {
-    toDF(colNames.map(colName => functions.max(colName)))
-  }
+  override def max(colNames: String*): DataFrame = super.max(colNames: _*)
 
-  /**
-   * Compute the mean value for each numeric columns for each group. The 
resulting `DataFrame`
-   * will also contain the grouping columns. When specified columns are given, 
only compute the
-   * mean values for them.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def avg(colNames: String*): DataFrame = {
-    toDF(colNames.map(colName => functions.avg(colName)))
-  }
+  override def avg(colNames: String*): DataFrame = super.avg(colNames: _*)
 
-  /**
-   * Compute the min value for each numeric column for each group. The 
resulting `DataFrame` will
-   * also contain the grouping columns. When specified columns are given, only 
compute the min
-   * values for them.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def min(colNames: String*): DataFrame = {
-    toDF(colNames.map(colName => functions.min(colName)))
-  }
+  override def min(colNames: String*): DataFrame = super.min(colNames: _*)
 
-  /**
-   * Compute the sum for each numeric columns for each group. The resulting 
`DataFrame` will also
-   * contain the grouping columns. When specified columns are given, only 
compute the sum for
-   * them.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def sum(colNames: String*): DataFrame = {
-    toDF(colNames.map(colName => functions.sum(colName)))
-  }
+  override def sum(colNames: String*): DataFrame = super.sum(colNames: _*)
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   *
-   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine the
-   * resulting schema of the transformation. To avoid any eager computations, 
provide an explicit
-   * list of values via `pivot(pivotColumn: String, values: Seq[Any])`.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course").sum("earnings")
-   * }}}
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   Name of the column to pivot.
-   * @since 3.4.0
-   */
-  def pivot(pivotColumn: String): RelationalGroupedDataset = 
pivot(Column(pivotColumn))
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String): RelationalGroupedDataset = 
super.pivot(pivotColumn)
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation. There are
-   * two versions of pivot function: one that requires the caller to specify 
the list of distinct
-   * values to pivot on, and one that does not. The latter is more concise but 
less efficient,
-   * because Spark needs to first compute the list of distinct values 
internally.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course", Seq("dotNET", 
"Java")).sum("earnings")
-   *
-   *   // Or without specifying column values (less efficient)
-   *   df.groupBy("year").pivot("course").sum("earnings")
-   * }}}
-   *
-   * From Spark 3.0.0, values can be literal columns, for instance, struct. 
For pivoting by
-   * multiple columns, use the `struct` function to combine the columns and 
values:
-   *
-   * {{{
-   *   df.groupBy("year")
-   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
-   *     .agg(sum($"earnings"))
-   * }}}
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   Name of the column to pivot.
-   * @param values
-   *   List of values that will be translated to columns in the output 
DataFrame.
-   * @since 3.4.0
-   */
-  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = 
{
-    pivot(Column(pivotColumn), values)
-  }
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String, values: Seq[Any]): 
RelationalGroupedDataset =
+    super.pivot(pivotColumn, values)
 
-  /**
-   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
-   * aggregation.
-   *
-   * There are two versions of pivot function: one that requires the caller to 
specify the list of
-   * distinct values to pivot on, and one that does not. The latter is more 
concise but less
-   * efficient, because Spark needs to first compute the list of distinct 
values internally.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", 
"Java")).sum("earnings");
-   *
-   *   // Or without specifying column values (less efficient)
-   *   df.groupBy("year").pivot("course").sum("earnings");
-   * }}}
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   Name of the column to pivot.
-   * @param values
-   *   List of values that will be translated to columns in the output 
DataFrame.
-   * @since 3.4.0
-   */
-  def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset =
+    super.pivot(pivotColumn, values)
+
+  /** @inheritdoc */
+  override def pivot(
+      pivotColumn: Column,
+      values: java.util.List[Any]): RelationalGroupedDataset = {
+    super.pivot(pivotColumn, values)
   }
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation. This is an
-   * overloaded version of the `pivot` method with `pivotColumn` of the 
`String` type.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy($"year").pivot($"course", Seq("dotNET", 
"Java")).sum($"earnings")
-   * }}}
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   the column to pivot.
-   * @param values
-   *   List of values that will be translated to columns in the output 
DataFrame.
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = 
{
     groupType match {
       case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
-        val valueExprs = values.map(_ match {
+        val valueExprs = values.map {
           case c: Column if c.expr.hasLiteral => c.expr.getLiteral
           case c: Column if !c.expr.hasLiteral =>
             throw new IllegalArgumentException("values only accept literal 
Column")
           case v => functions.lit(v).expr.getLiteral
-        })
+        }
         new RelationalGroupedDataset(
           df,
           groupingExprs,
@@ -386,46 +170,8 @@ class RelationalGroupedDataset private[sql] (
     }
   }
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   *
-   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine the
-   * resulting schema of the transformation. To avoid any eager computations, 
provide an explicit
-   * list of values via `pivot(pivotColumn: Column, values: Seq[Any])`.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy($"year").pivot($"course").sum($"earnings");
-   * }}}
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   he column to pivot.
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def pivot(pivotColumn: Column): RelationalGroupedDataset = {
-    pivot(pivotColumn, Seq())
-  }
-
-  /**
-   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
-   * aggregation. This is an overloaded version of the `pivot` method with 
`pivotColumn` of the
-   * `String` type.
-   *
-   * @see
-   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, 
except for the
-   *   aggregation.
-   *
-   * @param pivotColumn
-   *   the column to pivot.
-   * @param values
-   *   List of values that will be translated to columns in the output 
DataFrame.
-   * @since 3.4.0
-   */
-  def pivot(pivotColumn: Column, values: java.util.List[Any]): 
RelationalGroupedDataset = {
-    pivot(pivotColumn, values.asScala.toSeq)
+    pivot(pivotColumn, Nil)
   }
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 16f15205cabe..2b071a384e0a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -120,6 +120,8 @@ import org.apache.spark.util.SparkClassUtils
  */
 @Stable
 abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
+  type RGD <: RelationalGroupedDataset[DS]
+
   def sparkSession: SparkSession[DS]
 
   val encoder: Encoder[T]
@@ -1137,6 +1139,222 @@ abstract class Dataset[T, DS[U] <: Dataset[U, DS]] 
extends Serializable {
    */
   def where(conditionExpr: String): DS[T] = filter(conditionExpr)
 
+  /**
+   * Groups the Dataset using the specified columns, so we can run aggregation 
on them. See
+   * [[RelationalGroupedDataset]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   ds.groupBy($"department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and 
gender.
+   *   ds.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def groupBy(cols: Column*): RGD
+
+  /**
+   * Groups the Dataset using the specified columns, so that we can run 
aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * This is a variant of groupBy that can only group by existing columns 
using column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   ds.groupBy("department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and 
gender.
+   *   ds.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def groupBy(col1: String, cols: String*): RGD = groupBy((col1 +: 
cols).map(col): _*)
+
+  /**
+   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns,
+   * so we can run aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns rolled up by department 
and group.
+   *   ds.rollup($"department", $"group").avg()
+   *
+   *   // Compute the max age and average salary, rolled up by department and 
gender.
+   *   ds.rollup($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def rollup(cols: Column*): RGD
+
+  /**
+   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns,
+   * so we can run aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * This is a variant of rollup that can only group by existing columns using 
column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns rolled up by department 
and group.
+   *   ds.rollup("department", "group").avg()
+   *
+   *   // Compute the max age and average salary, rolled up by department and 
gender.
+   *   ds.rollup($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def rollup(col1: String, cols: String*): RGD = rollup((col1 +: 
cols).map(col): _*)
+
+  /**
+   * Create a multi-dimensional cube for the current Dataset using the 
specified columns,
+   * so we can run aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns cubed by department and 
group.
+   *   ds.cube($"department", $"group").avg()
+   *
+   *   // Compute the max age and average salary, cubed by department and 
gender.
+   *   ds.cube($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def cube(cols: Column*): RGD
+
+  /**
+   * Create a multi-dimensional cube for the current Dataset using the 
specified columns,
+   * so we can run aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * This is a variant of cube that can only group by existing columns using 
column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns cubed by department and 
group.
+   *   ds.cube("department", "group").avg()
+   *
+   *   // Compute the max age and average salary, cubed by department and 
gender.
+   *   ds.cube($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def cube(col1: String, cols: String*): RGD = cube((col1 +: cols).map(col): 
_*)
+
+  /**
+   * Create multi-dimensional aggregation for the current Dataset using the 
specified grouping sets,
+   * so we can run aggregation on them.
+   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns group by specific 
grouping sets.
+   *   ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), 
$"department", $"group").avg()
+   *
+   *   // Compute the max age and average salary, group by specific grouping 
sets.
+   *   ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", 
$"group").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RGD
+
+  /**
+   * (Scala-specific) Aggregates on the entire Dataset without groups.
+   * {{{
+   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+   *   ds.agg("age" -> "max", "salary" -> "avg")
+   *   ds.groupBy().agg("age" -> "max", "salary" -> "avg")
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DS[Row] = {
+    groupBy().agg(aggExpr, aggExprs: _*)
+  }
+
+  /**
+   * (Scala-specific) Aggregates on the entire Dataset without groups.
+   * {{{
+   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
+   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  def agg(exprs: Map[String, String]): DS[Row] = groupBy().agg(exprs)
+
+  /**
+   * (Java-specific) Aggregates on the entire Dataset without groups.
+   * {{{
+   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
+   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  def agg(exprs: util.Map[String, String]): DS[Row] = groupBy().agg(exprs)
+
+  /**
+   * Aggregates on the entire Dataset without groups.
+   * {{{
+   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
+   *   ds.agg(max($"age"), avg($"salary"))
+   *   ds.groupBy().agg(max($"age"), avg($"salary"))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DS[Row] = groupBy().agg(expr, exprs: 
_*)
+
   /**
    * (Scala-specific)
    * Reduces the elements of this Dataset using the specified binary function. 
The given `func`
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala
new file mode 100644
index 000000000000..30b2992d43a0
--- /dev/null
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/api/RelationalGroupedDataset.scala
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import scala.jdk.CollectionConverters._
+
+import _root_.java.util
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.{functions, Column, Row}
+
+/**
+ * A set of methods for aggregations on a `DataFrame`, created by 
[[Dataset#groupBy groupBy]],
+ * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also `pivot`).
+ *
+ * The main method is the `agg` function, which has multiple variants. This 
class also contains
+ * some first-order statistics such as `mean`, `sum` for convenience.
+ *
+ * @note This class was named `GroupedData` in Spark 1.x.
+ * @since 2.0.0
+ */
+@Stable
+abstract class RelationalGroupedDataset[DS[U] <: Dataset[U, DS]] {
+  type RGD <: RelationalGroupedDataset[DS]
+
+  protected def df: DS[Row]
+
+  /**
+   * Create a aggregation based on the grouping column, the grouping type, and 
the aggregations.
+   */
+  protected def toDF(aggCols: Seq[Column]): DS[Row]
+
+  protected def selectNumericColumns(colNames: Seq[String]): Seq[Column]
+
+  /**
+   * Convert a name method tuple into a Column.
+   */
+  private def toAggCol(colAndMethod: (String, String)): Column = {
+    val col = df.col(colAndMethod._1)
+    colAndMethod._2.toLowerCase(util.Locale.ROOT) match {
+      case "avg" | "average" | "mean" => functions.avg(col)
+      case "stddev" | "std" => functions.stddev(col)
+      case "count" | "size" => functions.count(col)
+      case name => Column.fn(name, col)
+    }
+  }
+
+  private def aggregateNumericColumns(
+      colNames: Seq[String],
+      function: Column => Column): DS[Row] = {
+    toDF(selectNumericColumns(colNames).map(function))
+  }
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying the column names and
+   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   df.groupBy("department").agg(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   )
+   * }}}
+   *
+   * @since 1.3.0
+   */
+  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DS[Row] =
+    toDF((aggExpr +: aggExprs).map(toAggCol))
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
+   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   df.groupBy("department").agg(Map(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   ))
+   * }}}
+   *
+   * @since 1.3.0
+   */
+  def agg(exprs: Map[String, String]): DS[Row] = 
toDF(exprs.map(toAggCol).toSeq)
+
+  /**
+   * (Java-specific) Compute aggregates by specifying a map from column name to
+   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   import com.google.common.collect.ImmutableMap;
+   *   df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense", 
"sum"));
+   * }}}
+   *
+   * @since 1.3.0
+   */
+  def agg(exprs: util.Map[String, String]): DS[Row] = {
+    agg(exprs.asScala.toMap)
+  }
+
+  /**
+   * Compute aggregates by specifying a series of aggregate columns. Note that 
this function by
+   * default retains the grouping columns in its output. To not retain 
grouping columns, set
+   * `spark.sql.retainGroupColumns` to false.
+   *
+   * The available aggregate methods are defined in 
[[org.apache.spark.sql.functions]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df.groupBy("department").agg(max("age"), sum("expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.groupBy("department").agg(max("age"), sum("expense"));
+   * }}}
+   *
+   * Note that before Spark 1.4, the default behavior is to NOT retain 
grouping columns. To change
+   * to that behavior, set config variable `spark.sql.retainGroupColumns` to 
`false`.
+   * {{{
+   *   // Scala, 1.3.x:
+   *   df.groupBy("department").agg($"department", max("age"), sum("expense"))
+   *
+   *   // Java, 1.3.x:
+   *   df.groupBy("department").agg(col("department"), max("age"), 
sum("expense"));
+   * }}}
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DS[Row] = toDF(expr +: exprs)
+
+  /**
+   * Count the number of rows for each group.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   *
+   * @since 1.3.0
+   */
+  def count(): DS[Row] = toDF(functions.count(functions.lit(1)).as("count") :: 
Nil)
+
+  /**
+   * Compute the average value for each numeric columns for each group. This 
is an alias for `avg`.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   * When specified columns are given, only compute the average values for 
them.
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def mean(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, 
functions.avg)
+
+
+  /**
+   * Compute the max value for each numeric columns for each group.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   * When specified columns are given, only compute the max values for them.
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def max(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, 
functions.max)
+
+  /**
+   * Compute the mean value for each numeric columns for each group.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   * When specified columns are given, only compute the mean values for them.
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def avg(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, 
functions.avg)
+
+  /**
+   * Compute the min value for each numeric column for each group.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   * When specified columns are given, only compute the min values for them.
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def min(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, 
functions.min)
+
+  /**
+   * Compute the sum for each numeric columns for each group.
+   * The resulting `DataFrame` will also contain the grouping columns.
+   * When specified columns are given, only compute the sum for them.
+   *
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def sum(colNames: String*): DS[Row] = aggregateNumericColumns(colNames, 
functions.sum)
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
+   *
+   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine
+   * the resulting schema of the transformation. To avoid any eager 
computations, provide an
+   * explicit list of values via `pivot(pivotColumn: String, values: 
Seq[Any])`.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn Name of the column to pivot.
+   * @since 1.6.0
+   */
+  def pivot(pivotColumn: String): RGD = pivot(df.col(pivotColumn))
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
+   * There are two versions of pivot function: one that requires the caller to 
specify the list
+   * of distinct values to pivot on, and one that does not. The latter is more 
concise but less
+   * efficient, because Spark needs to first compute the list of distinct 
values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
+   *   df.groupBy("year").pivot("course", Seq("dotNET", 
"Java")).sum("earnings")
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * From Spark 3.0.0, values can be literal columns, for instance, struct. 
For pivoting by
+   * multiple columns, use the `struct` function to combine the columns and 
values:
+   *
+   * {{{
+   *   df.groupBy("year")
+   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+   *     .agg(sum($"earnings"))
+   * }}}
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn Name of the column to pivot.
+   * @param values      List of values that will be translated to columns in 
the output DataFrame.
+   * @since 1.6.0
+   */
+  def pivot(pivotColumn: String, values: Seq[Any]): RGD =
+    pivot(df.col(pivotColumn), values)
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
+   * aggregation.
+   *
+   * There are two versions of pivot function: one that requires the caller to 
specify the list
+   * of distinct values to pivot on, and one that does not. The latter is more 
concise but less
+   * efficient, because Spark needs to first compute the list of distinct 
values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
+   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", 
"Java")).sum("earnings");
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings");
+   * }}}
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn Name of the column to pivot.
+   * @param values      List of values that will be translated to columns in 
the output DataFrame.
+   * @since 1.6.0
+   */
+  def pivot(pivotColumn: String, values: util.List[Any]): RGD =
+    pivot(df.col(pivotColumn), values)
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
+   * aggregation. This is an overloaded version of the `pivot` method with 
`pivotColumn` of
+   * the `String` type.
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn the column to pivot.
+   * @param values      List of values that will be translated to columns in 
the output DataFrame.
+   * @since 2.4.0
+   */
+  def pivot(pivotColumn: Column, values: util.List[Any]): RGD =
+    pivot(pivotColumn, values.asScala.toSeq)
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
+   *
+   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine
+   * the resulting schema of the transformation. To avoid any eager 
computations, provide an
+   * explicit list of values via `pivot(pivotColumn: Column, values: 
Seq[Any])`.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
+   *   df.groupBy($"year").pivot($"course").sum($"earnings");
+   * }}}
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn he column to pivot.
+   * @since 2.4.0
+   */
+  def pivot(pivotColumn: Column): RGD
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
+   * This is an overloaded version of the `pivot` method with `pivotColumn` of 
the `String` type.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
+   *   df.groupBy($"year").pivot($"course", Seq("dotNET", 
"Java")).sum($"earnings")
+   * }}}
+   *
+   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
+   *      except for the aggregation.
+   * @param pivotColumn the column to pivot.
+   * @param values      List of values that will be translated to columns in 
the output DataFrame.
+   * @since 2.4.0
+   */
+  def pivot(pivotColumn: Column, values: Seq[Any]): RGD
+}
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json 
b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
index 5b9709ff0657..03625861d88f 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.json
@@ -14,11 +14,13 @@
     "groupType": "GROUP_TYPE_CUBE",
     "groupingExpressions": [{
       "unresolvedAttribute": {
-        "unparsedIdentifier": "a"
+        "unparsedIdentifier": "a",
+        "planId": "0"
       }
     }, {
       "unresolvedAttribute": {
-        "unparsedIdentifier": "b"
+        "unparsedIdentifier": "b",
+        "planId": "0"
       }
     }],
     "aggregateExpressions": [{
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin
index d46e40b39dcf..59c7a5557120 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/cube_string.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
index 26320d404835..285c13f4bc8b 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.json
@@ -14,11 +14,13 @@
     "groupType": "GROUP_TYPE_GROUPBY",
     "groupingExpressions": [{
       "unresolvedAttribute": {
-        "unparsedIdentifier": "id"
+        "unparsedIdentifier": "id",
+        "planId": "0"
       }
     }, {
       "unresolvedAttribute": {
-        "unparsedIdentifier": "b"
+        "unparsedIdentifier": "b",
+        "planId": "0"
       }
     }],
     "aggregateExpressions": [{
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin
index 818146f7f693..674d506fa4a0 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_agg_string.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
index 5785eee2cadb..0ded46cf6cc7 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
@@ -22,7 +22,8 @@
         "functionName": "avg",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
+            "unparsedIdentifier": "a",
+            "planId": "0"
           }
         }]
       }
@@ -31,7 +32,8 @@
         "functionName": "avg",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "b"
+            "unparsedIdentifier": "b",
+            "planId": "0"
           }
         }]
       }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
index 4a18ea2d82d9..444b0c3853f1 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
index 3225a475a9b3..ed186ff71351 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.json
@@ -22,7 +22,8 @@
         "functionName": "max",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
+            "unparsedIdentifier": "a",
+            "planId": "0"
           }
         }]
       }
@@ -31,7 +32,8 @@
         "functionName": "max",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "b"
+            "unparsedIdentifier": "b",
+            "planId": "0"
           }
         }]
       }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
index 651274b1afca..11cd163e9173 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
index 5785eee2cadb..0ded46cf6cc7 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
@@ -22,7 +22,8 @@
         "functionName": "avg",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
+            "unparsedIdentifier": "a",
+            "planId": "0"
           }
         }]
       }
@@ -31,7 +32,8 @@
         "functionName": "avg",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "b"
+            "unparsedIdentifier": "b",
+            "planId": "0"
           }
         }]
       }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
index 4a18ea2d82d9..444b0c3853f1 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
index afcc07d2c869..8c0ad283cb0a 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.json
@@ -22,7 +22,8 @@
         "functionName": "min",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
+            "unparsedIdentifier": "a",
+            "planId": "0"
           }
         }]
       }
@@ -31,7 +32,8 @@
         "functionName": "min",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "b"
+            "unparsedIdentifier": "b",
+            "planId": "0"
           }
         }]
       }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
index 6e038bf0b315..2bc985a1fe9f 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
index 74dd5b045aa5..788b964491c6 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
@@ -22,7 +22,8 @@
         "functionName": "sum",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
+            "unparsedIdentifier": "a",
+            "planId": "0"
           }
         }]
       }
@@ -31,7 +32,8 @@
         "functionName": "sum",
         "arguments": [{
           "unresolvedAttribute": {
-            "unparsedIdentifier": "b"
+            "unparsedIdentifier": "b",
+            "planId": "0"
           }
         }]
       }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
index fe2451ca18fb..e92041399cbc 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
 
b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
index 07bbd315a5fe..8ff81d95d298 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.json
@@ -14,11 +14,13 @@
     "groupType": "GROUP_TYPE_CUBE",
     "groupingExpressions": [{
       "unresolvedAttribute": {
-        "unparsedIdentifier": "a"
+        "unparsedIdentifier": "a",
+        "planId": "0"
       }
     }, {
       "unresolvedAttribute": {
-        "unparsedIdentifier": "b"
+        "unparsedIdentifier": "b",
+        "planId": "0"
       }
     }],
     "aggregateExpressions": [{
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin
index 88b3f0593132..d1dded43ddf9 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/grouping_and_grouping_id.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot.json 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
index 30bff04c531d..2af86606b9fc 100644
--- a/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
+++ b/sql/connect/common/src/test/resources/query-tests/queries/pivot.json
@@ -30,7 +30,8 @@
     "pivot": {
       "col": {
         "unresolvedAttribute": {
-          "unparsedIdentifier": "a"
+          "unparsedIdentifier": "a",
+          "planId": "0"
         }
       },
       "values": [{
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin
index 67063209a184..f545179e8496 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin and 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot.proto.bin 
differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
index 5218a88988ea..aa043613795c 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json
@@ -30,7 +30,8 @@
     "pivot": {
       "col": {
         "unresolvedAttribute": {
-          "unparsedIdentifier": "a"
+          "unparsedIdentifier": "a",
+          "planId": "0"
         }
       }
     }
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin
index aee3c980eaee..588b56f247e0 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin
 differ
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json 
b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
index 1102db18830b..5082051031f8 100644
--- 
a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
+++ 
b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.json
@@ -14,11 +14,13 @@
     "groupType": "GROUP_TYPE_ROLLUP",
     "groupingExpressions": [{
       "unresolvedAttribute": {
-        "unparsedIdentifier": "a"
+        "unparsedIdentifier": "a",
+        "planId": "0"
       }
     }, {
       "unresolvedAttribute": {
-        "unparsedIdentifier": "b"
+        "unparsedIdentifier": "b",
+        "planId": "0"
       }
     }],
     "aggregateExpressions": [{
diff --git 
a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin
 
b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin
index 64dbb597c365..63fdead641da 100644
Binary files 
a/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin
 and 
b/sql/connect/common/src/test/resources/query-tests/queries/rollup_string.proto.bin
 differ
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 03b6a8d6d737..a28dfbdbf66a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -214,6 +214,7 @@ class Dataset[T] private[sql](
     @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends api.Dataset[T, Dataset] {
+  type RGD = RelationalGroupedDataset
 
   @transient lazy val sparkSession: SparkSession = {
     if (queryExecution == null || queryExecution.sparkSession == null) {
@@ -891,73 +892,19 @@ class Dataset[T] private[sql](
     RelationalGroupedDataset(toDF(), cols.map(_.expr), 
RelationalGroupedDataset.GroupByType)
   }
 
-  /**
-   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns,
-   * so we can run aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolled up by department 
and group.
-   *   ds.rollup($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, rolled up by department and 
gender.
-   *   ds.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def rollup(cols: Column*): RelationalGroupedDataset = {
     RelationalGroupedDataset(toDF(), cols.map(_.expr), 
RelationalGroupedDataset.RollupType)
   }
 
-  /**
-   * Create a multi-dimensional cube for the current Dataset using the 
specified columns,
-   * so we can run aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and 
group.
-   *   ds.cube($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and 
gender.
-   *   ds.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def cube(cols: Column*): RelationalGroupedDataset = {
     RelationalGroupedDataset(toDF(), cols.map(_.expr), 
RelationalGroupedDataset.CubeType)
   }
 
-  /**
-   * Create multi-dimensional aggregation for the current Dataset using the 
specified grouping sets,
-   * so we can run aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns group by specific 
grouping sets.
-   *   ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), 
$"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, group by specific grouping 
sets.
-   *   ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", 
$"group").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
   def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): 
RelationalGroupedDataset = {
     RelationalGroupedDataset(
@@ -966,33 +913,6 @@ class Dataset[T] private[sql](
       
RelationalGroupedDataset.GroupingSetsType(groupingSets.map(_.map(_.expr))))
   }
 
-  /**
-   * Groups the Dataset using the specified columns, so that we can run 
aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * This is a variant of groupBy that can only group by existing columns 
using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns grouped by department.
-   *   ds.groupBy("department").avg()
-   *
-   *   // Compute the max age and average salary, grouped by department and 
gender.
-   *   ds.groupBy($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    RelationalGroupedDataset(
-      toDF(), colNames.map(colName => resolve(colName)), 
RelationalGroupedDataset.GroupByType)
-  }
-
   /** @inheritdoc */
   def reduce(func: (T, T) => T): T = withNewRDDExecutionId("reduce") {
     rdd.reduce(func)
@@ -1027,118 +947,6 @@ class Dataset[T] private[sql](
   def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): 
KeyValueGroupedDataset[K, T] =
     groupByKey(func.call(_))(encoder)
 
-  /**
-   * Create a multi-dimensional rollup for the current Dataset using the 
specified columns,
-   * so we can run aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * This is a variant of rollup that can only group by existing columns using 
column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolled up by department 
and group.
-   *   ds.rollup("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, rolled up by department and 
gender.
-   *   ds.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    RelationalGroupedDataset(
-      toDF(), colNames.map(colName => resolve(colName)), 
RelationalGroupedDataset.RollupType)
-  }
-
-  /**
-   * Create a multi-dimensional cube for the current Dataset using the 
specified columns,
-   * so we can run aggregation on them.
-   * See [[RelationalGroupedDataset]] for all the available aggregate 
functions.
-   *
-   * This is a variant of cube that can only group by existing columns using 
column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and 
group.
-   *   ds.cube("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and 
gender.
-   *   ds.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def cube(col1: String, cols: String*): RelationalGroupedDataset = {
-    val colNames: Seq[String] = col1 +: cols
-    RelationalGroupedDataset(
-      toDF(), colNames.map(colName => resolve(colName)), 
RelationalGroupedDataset.CubeType)
-  }
-
-  /**
-   * (Scala-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg("age" -> "max", "salary" -> "avg")
-   *   ds.groupBy().agg("age" -> "max", "salary" -> "avg")
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
-    groupBy().agg(aggExpr, aggExprs : _*)
-  }
-
-  /**
-   * (Scala-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
-
-  /**
-   * (Java-specific) Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   ds.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = 
groupBy().agg(exprs)
-
-  /**
-   * Aggregates on the entire Dataset without groups.
-   * {{{
-   *   // ds.agg(...) is a shorthand for ds.groupBy().agg(...)
-   *   ds.agg(max($"age"), avg($"salary"))
-   *   ds.groupBy().agg(max($"age"), avg($"salary"))
-   * }}}
-   *
-   * @group untypedrel
-   * @since 2.0.0
-   */
-  @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs 
: _*)
-
   /** @inheritdoc */
   def unpivot(
       ids: Array[Column],
@@ -2223,6 +2031,35 @@ class Dataset[T] private[sql](
   /** @inheritdoc */
   override def distinct(): Dataset[T] = super.distinct()
 
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def groupBy(col1: String, cols: String*): RelationalGroupedDataset =
+    super.groupBy(col1, cols: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def rollup(col1: String, cols: String*): RelationalGroupedDataset =
+    super.rollup(col1, cols: _*)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def cube(col1: String, cols: String*): RelationalGroupedDataset =
+    super.cube(col1, cols: _*)
+
+  /** @inheritdoc */
+  override def agg(aggExpr: (String, String), aggExprs: (String, String)*): 
DataFrame =
+    super.agg(aggExpr, aggExprs: _*)
+
+  /** @inheritdoc */
+  override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
+
+  /** @inheritdoc */
+  override def agg(exprs: java.util.Map[String, String]): DataFrame = 
super.agg(exprs)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
+
   ////////////////////////////////////////////////////////////////////////////
   // For Python API
   ////////////////////////////////////////////////////////////////////////////
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 3cafe0d98f1b..777baa3e6268 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql
 
-import java.util.Locale
-
-import scala.jdk.CollectionConverters._
-
 import org.apache.spark.SparkRuntimeException
 import org.apache.spark.annotation.Stable
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, 
UnresolvedFunction}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -54,13 +50,19 @@ import org.apache.spark.util.ArrayImplicits._
  */
 @Stable
 class RelationalGroupedDataset protected[sql](
-    private[sql] val df: DataFrame,
+    protected[sql] val df: DataFrame,
     private[sql] val groupingExprs: Seq[Expression],
-    groupType: RelationalGroupedDataset.GroupType) {
+    groupType: RelationalGroupedDataset.GroupType)
+  extends api.RelationalGroupedDataset[Dataset] {
+  type RGD = RelationalGroupedDataset
   import RelationalGroupedDataset._
   import df.sparkSession._
 
-  private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
+  override protected def toDF(aggCols: Seq[Column]): DataFrame = {
+    val aggExprs = aggCols.map(expression).map { e =>
+      withInputType(e, df.exprEnc, df.logicalPlan.output)
+    }
+
     @scala.annotation.nowarn("cat=deprecation")
     val aggregates = if 
(df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
       groupingExprs match {
@@ -98,9 +100,7 @@ class RelationalGroupedDataset protected[sql](
     }
   }
 
-  private[this] def aggregateNumericColumns(colNames: String*)(f: Expression 
=> AggregateFunction)
-    : DataFrame = {
-
+  override protected def selectNumericColumns(colNames: Seq[String]): 
Seq[Column] = {
     val columnExprs = if (colNames.isEmpty) {
       // No columns specified. Use all numeric columns.
       df.numericColumns
@@ -114,29 +114,9 @@ class RelationalGroupedDataset protected[sql](
         namedExpr
       }
     }
-    toDF(columnExprs.map(expr => f(expr).toAggregateExpression()))
+    columnExprs.map(column)
   }
 
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    val exprToFunc: (Expression => Expression) = {
-      (inputExpr: Expression) => expr.toLowerCase(Locale.ROOT) match {
-        // We special handle a few cases that have alias that are not in 
function registry.
-        case "avg" | "average" | "mean" =>
-          UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
-        case "stddev" | "std" =>
-          UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
-        // Also special handle count because we need to take care count(*).
-        case "count" | "size" =>
-          // Turn count(*) into count(1)
-          inputExpr match {
-            case s: Star => Count(Literal(1)).toAggregateExpression()
-            case _ => Count(inputExpr).toAggregateExpression()
-          }
-        case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = 
false)
-      }
-    }
-    (inputExpr: Expression) => exprToFunc(inputExpr)
-  }
 
   /**
    * Returns a `KeyValueGroupedDataset` where the data is grouped by the 
grouping expressions
@@ -159,296 +139,68 @@ class RelationalGroupedDataset protected[sql](
       groupingAttributes)
   }
 
-  /**
-   * (Scala-specific) Compute aggregates by specifying the column names and
-   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   )
-   * }}}
-   *
-   * @since 1.3.0
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
-    toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
-      strToExpr(expr)(df(colName).expr)
-    })
-  }
+  /** @inheritdoc */
+  override def agg(aggExpr: (String, String), aggExprs: (String, String)*): 
DataFrame =
+    super.agg(aggExpr, aggExprs: _*)
 
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
-   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(Map(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   ))
-   * }}}
-   *
-   * @since 1.3.0
-   */
-  def agg(exprs: Map[String, String]): DataFrame = {
-    toDF(exprs.map { case (colName, expr) =>
-      strToExpr(expr)(df(colName).expr)
-    }.toSeq)
-  }
+  /** @inheritdoc */
+  override def agg(exprs: Map[String, String]): DataFrame = super.agg(exprs)
 
-  /**
-   * (Java-specific) Compute aggregates by specifying a map from column name to
-   * aggregate methods. The resulting `DataFrame` will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   import com.google.common.collect.ImmutableMap;
-   *   df.groupBy("department").agg(ImmutableMap.of("age", "max", "expense", 
"sum"));
-   * }}}
-   *
-   * @since 1.3.0
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = {
-    agg(exprs.asScala.toMap)
-  }
+  /** @inheritdoc */
+  override def agg(exprs: java.util.Map[String, String]): DataFrame = 
super.agg(exprs)
 
-  /**
-   * Compute aggregates by specifying a series of aggregate columns. Note that 
this function by
-   * default retains the grouping columns in its output. To not retain 
grouping columns, set
-   * `spark.sql.retainGroupColumns` to false.
-   *
-   * The available aggregate methods are defined in 
[[org.apache.spark.sql.functions]].
-   *
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *
-   *   // Scala:
-   *   import org.apache.spark.sql.functions._
-   *   df.groupBy("department").agg(max("age"), sum("expense"))
-   *
-   *   // Java:
-   *   import static org.apache.spark.sql.functions.*;
-   *   df.groupBy("department").agg(max("age"), sum("expense"));
-   * }}}
-   *
-   * Note that before Spark 1.4, the default behavior is to NOT retain 
grouping columns. To change
-   * to that behavior, set config variable `spark.sql.retainGroupColumns` to 
`false`.
-   * {{{
-   *   // Scala, 1.3.x:
-   *   df.groupBy("department").agg($"department", max("age"), sum("expense"))
-   *
-   *   // Java, 1.3.x:
-   *   df.groupBy("department").agg(col("department"), max("age"), 
sum("expense"));
-   * }}}
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = {
-    toDF((expr +: exprs).map {
-      case typed: TypedColumn[_, _] =>
-        withInputType(typed.expr, df.exprEnc, df.logicalPlan.output)
-      case c => c.expr
-    })
-  }
+  override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
 
-  /**
-   * Count the number of rows for each group.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   *
-   * @since 1.3.0
-   */
-  def count(): DataFrame = 
toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))
+  /** @inheritdoc */
+  override def count(): DataFrame = super.count()
 
-  /**
-   * Compute the average value for each numeric columns for each group. This 
is an alias for `avg`.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   * When specified columns are given, only compute the average values for 
them.
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def mean(colNames: String*): DataFrame = {
-    aggregateNumericColumns(colNames : _*)(Average(_))
-  }
+  override def mean(colNames: String*): DataFrame = super.mean(colNames: _*)
 
-  /**
-   * Compute the max value for each numeric columns for each group.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   * When specified columns are given, only compute the max values for them.
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def max(colNames: String*): DataFrame = {
-    aggregateNumericColumns(colNames : _*)(Max)
-  }
+  override def max(colNames: String*): DataFrame = super.max(colNames: _*)
 
-  /**
-   * Compute the mean value for each numeric columns for each group.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   * When specified columns are given, only compute the mean values for them.
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def avg(colNames: String*): DataFrame = {
-    aggregateNumericColumns(colNames : _*)(Average(_))
-  }
+  override def avg(colNames: String*): DataFrame = super.avg(colNames: _*)
 
-  /**
-   * Compute the min value for each numeric column for each group.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   * When specified columns are given, only compute the min values for them.
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def min(colNames: String*): DataFrame = {
-    aggregateNumericColumns(colNames : _*)(Min)
-  }
+  override def min(colNames: String*): DataFrame = super.min(colNames: _*)
 
-  /**
-   * Compute the sum for each numeric columns for each group.
-   * The resulting `DataFrame` will also contain the grouping columns.
-   * When specified columns are given, only compute the sum for them.
-   *
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def sum(colNames: String*): DataFrame = {
-    aggregateNumericColumns(colNames : _*)(Sum(_))
-  }
+  override def sum(colNames: String*): DataFrame = super.sum(colNames: _*)
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   *
-   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine
-   * the resulting schema of the transformation. To avoid any eager 
computations, provide an
-   * explicit list of values via `pivot(pivotColumn: String, values: 
Seq[Any])`.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course").sum("earnings")
-   * }}}
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn Name of the column to pivot.
-   * @since 1.6.0
-   */
-  def pivot(pivotColumn: String): RelationalGroupedDataset = 
pivot(Column(pivotColumn))
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String): RelationalGroupedDataset = 
super.pivot(pivotColumn)
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   * There are two versions of pivot function: one that requires the caller to 
specify the list
-   * of distinct values to pivot on, and one that does not. The latter is more 
concise but less
-   * efficient, because Spark needs to first compute the list of distinct 
values internally.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course", Seq("dotNET", 
"Java")).sum("earnings")
-   *
-   *   // Or without specifying column values (less efficient)
-   *   df.groupBy("year").pivot("course").sum("earnings")
-   * }}}
-   *
-   * From Spark 3.0.0, values can be literal columns, for instance, struct. 
For pivoting by
-   * multiple columns, use the `struct` function to combine the columns and 
values:
-   *
-   * {{{
-   *   df.groupBy("year")
-   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
-   *     .agg(sum($"earnings"))
-   * }}}
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn Name of the column to pivot.
-   * @param values List of values that will be translated to columns in the 
output DataFrame.
-   * @since 1.6.0
-   */
-  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = 
{
-    pivot(Column(pivotColumn), values)
-  }
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String, values: Seq[Any]): 
RelationalGroupedDataset =
+    super.pivot(pivotColumn, values)
 
-  /**
-   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
-   * aggregation.
-   *
-   * There are two versions of pivot function: one that requires the caller to 
specify the list
-   * of distinct values to pivot on, and one that does not. The latter is more 
concise but less
-   * efficient, because Spark needs to first compute the list of distinct 
values internally.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", 
"Java")).sum("earnings");
-   *
-   *   // Or without specifying column values (less efficient)
-   *   df.groupBy("year").pivot("course").sum("earnings");
-   * }}}
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn Name of the column to pivot.
-   * @param values List of values that will be translated to columns in the 
output DataFrame.
-   * @since 1.6.0
-   */
-  def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset = {
-    pivot(Column(pivotColumn), values)
+  /** @inheritdoc */
+  override def pivot(pivotColumn: String, values: java.util.List[Any]): 
RelationalGroupedDataset =
+    super.pivot(pivotColumn, values)
+
+  /** @inheritdoc */
+  override def pivot(pivotColumn: Column, values: java.util.List[Any]): 
RelationalGroupedDataset = {
+    super.pivot(pivotColumn, values)
   }
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   *
-   * Spark will eagerly compute the distinct values in `pivotColumn` so it can 
determine
-   * the resulting schema of the transformation. To avoid any eager 
computations, provide an
-   * explicit list of values via `pivot(pivotColumn: Column, values: 
Seq[Any])`.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy($"year").pivot($"course").sum($"earnings");
-   * }}}
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn he column to pivot.
-   * @since 2.4.0
-   */
-  def pivot(pivotColumn: Column): RelationalGroupedDataset = {
+  /** @inheritdoc */
+  override def pivot(pivotColumn: Column): RelationalGroupedDataset =
     pivot(pivotColumn, collectPivotValues(df, pivotColumn))
-  }
 
-  /**
-   * Pivots a column of the current `DataFrame` and performs the specified 
aggregation.
-   * This is an overloaded version of the `pivot` method with `pivotColumn` of 
the `String` type.
-   *
-   * {{{
-   *   // Compute the sum of earnings for each year by course with each course 
as a separate column
-   *   df.groupBy($"year").pivot($"course", Seq("dotNET", 
"Java")).sum($"earnings")
-   * }}}
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn the column to pivot.
-   * @param values List of values that will be translated to columns in the 
output DataFrame.
-   * @since 2.4.0
-   */
+  /** @inheritdoc */
   def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = 
{
     groupType match {
       case RelationalGroupedDataset.GroupByType =>
-        val valueExprs = values.map(_ match {
+        val valueExprs = values.map {
           case c: Column => c.expr
           case v =>
             try {
@@ -457,7 +209,7 @@ class RelationalGroupedDataset protected[sql](
               case _: SparkRuntimeException =>
                 throw QueryExecutionErrors.pivotColumnUnsupportedError(v, 
pivotColumn.expr)
             }
-        })
+        }
         new RelationalGroupedDataset(
           df,
           groupingExprs,
@@ -471,22 +223,6 @@ class RelationalGroupedDataset protected[sql](
     }
   }
 
-  /**
-   * (Java-specific) Pivots a column of the current `DataFrame` and performs 
the specified
-   * aggregation. This is an overloaded version of the `pivot` method with 
`pivotColumn` of
-   * the `String` type.
-   *
-   * @see `org.apache.spark.sql.Dataset.unpivot` for the reverse operation,
-   *      except for the aggregation.
-   *
-   * @param pivotColumn the column to pivot.
-   * @param values List of values that will be translated to columns in the 
output DataFrame.
-   * @since 2.4.0
-   */
-  def pivot(pivotColumn: Column, values: java.util.List[Any]): 
RelationalGroupedDataset = {
-    pivot(pivotColumn, values.asScala.toSeq)
-  }
-
   /**
    * Applies the given serialized R function `func` to each group of data. For 
each unique group,
    * the function will be passed the group key and an iterator that contains 
all of the elements in


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to