This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f3c8d26eb0c3 Revert "[SPARK-49422][CONNECT][SQL] Add groupByKey to sql/api"
f3c8d26eb0c3 is described below
commit f3c8d26eb0c3fd7f77950eb08c70bb2a9ab6493c
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Sep 19 10:36:03 2024 +0900
Revert "[SPARK-49422][CONNECT][SQL] Add groupByKey to sql/api"
This reverts commit af45902d33c4d8e38a6427ac1d0c46fe057bb45a.
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 24 +++++---
.../scala/org/apache/spark/sql/api/Dataset.scala | 22 -------
.../main/scala/org/apache/spark/sql/Dataset.scala | 68 ++++++++++++++++++----
3 files changed, 75 insertions(+), 39 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index accfff9f2b07..161a0d9d265f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -524,11 +524,27 @@ class Dataset[T] private[sql] (
result(0)
}
- /** @inheritdoc */
+ /**
+ * (Scala-specific) Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given
+ * key `func`.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
KeyValueGroupedDatasetImpl[K, T](this, encoderFor[K], func)
}
+ /**
+ * (Java-specific) Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given
+ * key `func`.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
+ groupByKey(ToScalaUDF(func))(encoder)
+
/** @inheritdoc */
@scala.annotation.varargs
def rollup(cols: Column*): RelationalGroupedDataset = {
@@ -1464,10 +1480,4 @@ class Dataset[T] private[sql] (
/** @inheritdoc */
@scala.annotation.varargs
override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
-
- /** @inheritdoc */
- override def groupByKey[K](
- func: MapFunction[T, K],
- encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
- super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 7a3d6b0e0387..284a69fe6ee3 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -1422,28 +1422,6 @@ abstract class Dataset[T] extends Serializable {
*/
def reduce(func: ReduceFunction[T]): T = reduce(ToScalaUDF(func))
- /**
- * (Scala-specific) Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given
- * key `func`.
- *
- * @group typedrel
- * @since 2.0.0
- */
- def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T, DS]
-
- /**
- * (Java-specific) Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given
- * key `func`.
- *
- * @group typedrel
- * @since 2.0.0
- */
- def groupByKey[K](
- func: MapFunction[T, K],
- encoder: Encoder[K]): KeyValueGroupedDataset[K, T, DS] = {
- groupByKey(ToScalaUDF(func))(encoder)
- }
-
/**
* Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
* set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ef628ca612b4..61f9e6ff7c04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
-import org.apache.spark.sql.internal.{DataFrameWriterImpl, DataFrameWriterV2Impl, MergeIntoWriterImpl, SQLConf}
+import org.apache.spark.sql.internal.{DataFrameWriterImpl, DataFrameWriterV2Impl, MergeIntoWriterImpl, SQLConf, ToScalaUDF}
import org.apache.spark.sql.internal.ExpressionUtils.column
import org.apache.spark.sql.internal.TypedAggUtils.withInputType
import org.apache.spark.sql.streaming.DataStreamWriter
@@ -865,7 +865,24 @@ class Dataset[T] private[sql](
Filter(condition.expr, logicalPlan)
}
- /** @inheritdoc */
+ /**
+ * Groups the Dataset using the specified columns, so we can run aggregation on them. See
+ * [[RelationalGroupedDataset]] for all the available aggregate functions.
+ *
+ * {{{
+ * // Compute the average for all numeric columns grouped by department.
+ * ds.groupBy($"department").avg()
+ *
+ * // Compute the max age and average salary, grouped by department and gender.
+ * ds.groupBy($"department", $"gender").agg(Map(
+ * "salary" -> "avg",
+ * "age" -> "max"
+ * ))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
@scala.annotation.varargs
def groupBy(cols: Column*): RelationalGroupedDataset = {
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
@@ -897,7 +914,13 @@ class Dataset[T] private[sql](
rdd.reduce(func)
}
- /** @inheritdoc */
+ /**
+ * (Scala-specific)
+ * Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given key `func`.
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val withGroupingKey = AppendColumns(func, logicalPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)
@@ -910,6 +933,16 @@ class Dataset[T] private[sql](
withGroupingKey.newColumns)
}
+ /**
+ * (Java-specific)
+ * Returns a [[KeyValueGroupedDataset]] where the data is grouped by the given key `func`.
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
+ def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
+ groupByKey(ToScalaUDF(func))(encoder)
+
/** @inheritdoc */
def unpivot(
ids: Array[Column],
@@ -1607,7 +1640,28 @@ class Dataset[T] private[sql](
new DataFrameWriterV2Impl[T](table, this)
}
- /** @inheritdoc */
+ /**
+ * Merges a set of updates, insertions, and deletions based on a source table into
+ * a target table.
+ *
+ * Scala Examples:
+ * {{{
+ * spark.table("source")
+ * .mergeInto("target", $"source.id" === $"target.id")
+ * .whenMatched($"salary" === 100)
+ * .delete()
+ * .whenNotMatched()
+ * .insertAll()
+ * .whenNotMatchedBySource($"salary" === 100)
+ * .update(Map(
+ * "salary" -> lit(200)
+ * ))
+ * .merge()
+ * }}}
+ *
+ * @group basic
+ * @since 4.0.0
+ */
def mergeInto(table: String, condition: Column): MergeIntoWriter[T] = {
if (isStreaming) {
logicalPlan.failAnalysis(
@@ -1970,12 +2024,6 @@ class Dataset[T] private[sql](
@scala.annotation.varargs
override def agg(expr: Column, exprs: Column*): DataFrame = super.agg(expr, exprs: _*)
- /** @inheritdoc */
- override def groupByKey[K](
- func: MapFunction[T, K],
- encoder: Encoder[K]): KeyValueGroupedDataset[K, T] =
- super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
-
////////////////////////////////////////////////////////////////////////////
// For Python API
////////////////////////////////////////////////////////////////////////////
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]