This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3c8d26eb0c3 Revert "[SPARK-49422][CONNECT][SQL] Add groupByKey to 
sql/api"
f3c8d26eb0c3 is described below

commit f3c8d26eb0c3fd7f77950eb08c70bb2a9ab6493c
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Sep 19 10:36:03 2024 +0900

    Revert "[SPARK-49422][CONNECT][SQL] Add groupByKey to sql/api"
    
    This reverts commit af45902d33c4d8e38a6427ac1d0c46fe057bb45a.
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 24 +++++---
 .../scala/org/apache/spark/sql/api/Dataset.scala   | 22 -------
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 68 ++++++++++++++++++----
 3 files changed, 75 insertions(+), 39 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index accfff9f2b07..161a0d9d265f 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -524,11 +524,27 @@ class Dataset[T] private[sql] (
     result(0)
   }
 
-  /** @inheritdoc */
+  /**
+   * (Scala-specific) Returns a [[KeyValueGroupedDataset]] where the data is 
grouped by the given
+   * key `func`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
     KeyValueGroupedDatasetImpl[K, T](this, encoderFor[K], func)
   }
 
+  /**
+   * (Java-specific) Returns a [[KeyValueGroupedDataset]] where the data is 
grouped by the given
+   * key `func`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): 
KeyValueGroupedDataset[K, T] =
+    groupByKey(ToScalaUDF(func))(encoder)
+
   /** @inheritdoc */
   @scala.annotation.varargs
   def rollup(cols: Column*): RelationalGroupedDataset = {
@@ -1464,10 +1480,4 @@ class Dataset[T] private[sql] (
   /** @inheritdoc */
   @scala.annotation.varargs
   override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
-
-  /** @inheritdoc */
-  override def groupByKey[K](
-      func: MapFunction[T, K],
-      encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
-    super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 7a3d6b0e0387..284a69fe6ee3 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -1422,28 +1422,6 @@ abstract class Dataset[T] extends Serializable {
    */
   def reduce(func: ReduceFunction[T]): T = reduce(ToScalaUDF(func))
 
-  /**
-   * (Scala-specific) Returns a [[KeyValueGroupedDataset]] where the data is 
grouped by the given
-   * key `func`.
-   *
-   * @group typedrel
-   * @since 2.0.0
-   */
-  def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T, DS]
-
-  /**
-   * (Java-specific) Returns a [[KeyValueGroupedDataset]] where the data is 
grouped by the given
-   * key `func`.
-   *
-   * @group typedrel
-   * @since 2.0.0
-   */
-  def groupByKey[K](
-      func: MapFunction[T, K],
-      encoder: Encoder[K]): KeyValueGroupedDataset[K, T, DS] = {
-    groupByKey(ToScalaUDF(func))(encoder)
-  }
-
   /**
    * Unpivot a DataFrame from wide format to long format, optionally leaving 
identifier columns
    * set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except 
for the aggregation,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ef628ca612b4..61f9e6ff7c04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ import 
org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
-import org.apache.spark.sql.internal.{DataFrameWriterImpl, 
DataFrameWriterV2Impl, MergeIntoWriterImpl, SQLConf}
+import org.apache.spark.sql.internal.{DataFrameWriterImpl, 
DataFrameWriterV2Impl, MergeIntoWriterImpl, SQLConf, ToScalaUDF}
 import org.apache.spark.sql.internal.ExpressionUtils.column
 import org.apache.spark.sql.internal.TypedAggUtils.withInputType
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -865,7 +865,24 @@ class Dataset[T] private[sql](
     Filter(condition.expr, logicalPlan)
   }
 
-  /** @inheritdoc */
+  /**
+   * Groups the Dataset using the specified columns, so we can run aggregation 
on them. See
+   * [[RelationalGroupedDataset]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   ds.groupBy($"department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and 
gender.
+   *   ds.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 2.0.0
+   */
   @scala.annotation.varargs
   def groupBy(cols: Column*): RelationalGroupedDataset = {
     RelationalGroupedDataset(toDF(), cols.map(_.expr), 
RelationalGroupedDataset.GroupByType)
@@ -897,7 +914,13 @@ class Dataset[T] private[sql](
     rdd.reduce(func)
   }
 
-  /** @inheritdoc */
+  /**
+   * (Scala-specific)
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by the 
given key `func`.
+   *
+   * @group typedrel
+   * @since 2.0.0
+   */
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
     val withGroupingKey = AppendColumns(func, logicalPlan)
     val executed = sparkSession.sessionState.executePlan(withGroupingKey)
@@ -910,6 +933,16 @@ class Dataset[T] private[sql](
       withGroupingKey.newColumns)
   }
 
+  /**
+   * (Java-specific)
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by the 
given key `func`.
+   *
+   * @group typedrel
+   * @since 2.0.0
+   */
+  def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): 
KeyValueGroupedDataset[K, T] =
+    groupByKey(ToScalaUDF(func))(encoder)
+
   /** @inheritdoc */
   def unpivot(
       ids: Array[Column],
@@ -1607,7 +1640,28 @@ class Dataset[T] private[sql](
     new DataFrameWriterV2Impl[T](table, this)
   }
 
-  /** @inheritdoc */
+  /**
+   * Merges a set of updates, insertions, and deletions based on a source 
table into
+   * a target table.
+   *
+   * Scala Examples:
+   * {{{
+   *   spark.table("source")
+   *     .mergeInto("target", $"source.id" === $"target.id")
+   *     .whenMatched($"salary" === 100)
+   *     .delete()
+   *     .whenNotMatched()
+   *     .insertAll()
+   *     .whenNotMatchedBySource($"salary" === 100)
+   *     .update(Map(
+   *       "salary" -> lit(200)
+   *     ))
+   *     .merge()
+   * }}}
+   *
+   * @group basic
+   * @since 4.0.0
+   */
   def mergeInto(table: String, condition: Column): MergeIntoWriter[T] = {
     if (isStreaming) {
       logicalPlan.failAnalysis(
@@ -1970,12 +2024,6 @@ class Dataset[T] private[sql](
   @scala.annotation.varargs
   override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, 
exprs: _*)
 
-  /** @inheritdoc */
-  override def groupByKey[K](
-      func: MapFunction[T, K],
-      encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
-    super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
-
   ////////////////////////////////////////////////////////////////////////////
   // For Python API
   ////////////////////////////////////////////////////////////////////////////


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to