This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 2c5a9cfaa74 [SPARK-42542][CONNECT] Support Pivot without providing pivot column values 2c5a9cfaa74 is described below commit 2c5a9cfaa74b4fedf72ac8abf8c32a6536c4931d Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Mon Feb 27 18:39:57 2023 -0400 [SPARK-42542][CONNECT] Support Pivot without providing pivot column values ### What changes were proposed in this pull request? Add `Pivot` API when pivot column values are not provided. The decision here is that we push everything into server thus does not do max value validation for the pivot column on the client sides (both Scala and Python) now. ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #40200 from amaliujia/pivot_2. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit fdb36df3f6005f2cdc3d76016a656117d62e1efc) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../spark/sql/RelationalGroupedDataset.scala | 46 +++++++++++++++++++++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 4 ++ .../sql/connect/client/CompatibilitySuite.scala | 1 - .../pivot_without_column_values.explain | 4 ++ .../queries/pivot_without_column_values.json | 38 +++++++++++++++++ .../queries/pivot_without_column_values.proto.bin | Bin 0 -> 85 bytes 6 files changed, 92 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 89bc5bfec57..ed0642504b0 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -240,6 +240,31 @@ class 
RelationalGroupedDataset protected[sql] ( toDF(colNames.map(colName => functions.sum(colName))) } + /** + * Pivots a column of the current `DataFrame` and performs the specified aggregation. + * + * There are two versions of `pivot` function: one that requires the caller to specify the list + * of distinct values to pivot on, and one that does not. The latter is more concise but less + * efficient, because Spark needs to first compute the list of distinct values internally. + * + * {{{ + * // Compute the sum of earnings for each year by course with each course as a separate column + * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings") + * + * // Or without specifying column values (less efficient) + * df.groupBy("year").pivot("course").sum("earnings") + * }}} + * + * @see + * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the + * aggregation. + * + * @param pivotColumn + * Name of the column to pivot. + * @since 3.4.0 + */ + def pivot(pivotColumn: String): RelationalGroupedDataset = pivot(Column(pivotColumn)) + /** * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are * two versions of pivot function: one that requires the caller to specify the list of distinct @@ -350,6 +375,27 @@ class RelationalGroupedDataset protected[sql] ( } } + /** + * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an + * overloaded version of the `pivot` method with `pivotColumn` of the `String` type. + * + * {{{ + * // Or without specifying column values (less efficient) + * df.groupBy($"year").pivot($"course").sum($"earnings"); + * }}} + * + * @see + * `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the + * aggregation. + * + * @param pivotColumn + * The column to pivot. 
+ * @since 3.4.0 + */ + def pivot(pivotColumn: Column): RelationalGroupedDataset = { + pivot(pivotColumn, Seq()) + } + /** * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified * aggregation. This is an overloaded version of the `pivot` method with `pivotColumn` of the diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 52e5d892012..bc7111e9bf8 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -1970,6 +1970,10 @@ class PlanGenerationTestSuite simple.groupBy(Column("id")).pivot("a", Seq(1, 2, 3)).agg(functions.count(Column("b"))) } + test("pivot without column values") { + simple.groupBy(Column("id")).pivot("a").agg(functions.count(Column("b"))) + } + test("function lit") { simple.select( fn.lit(fn.col("id")), diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala index ccee3b550eb..0c86ddc3a87 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala @@ -143,7 +143,6 @@ class CompatibilitySuite extends ConnectFunSuite { // RelationalGroupedDataset ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.apply"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.as"), - ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.pivot"), 
ProblemFilters.exclude[Problem]("org.apache.spark.sql.RelationalGroupedDataset.this"), // SparkSession diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/pivot_without_column_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/pivot_without_column_values.explain new file mode 100644 index 00000000000..1a50919770c --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/pivot_without_column_values.explain @@ -0,0 +1,4 @@ +Project [id#0L] ++- Aggregate [id#0L], [id#0L, pivotfirst(a#0, count(b)#0L, 0, 0) AS __pivot_count(b) AS `count(b)`#0] + +- Aggregate [id#0L, a#0], [id#0L, a#0, count(b#0) AS count(b)#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json b/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json new file mode 100644 index 00000000000..5218a88988e --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.json @@ -0,0 +1,38 @@ +{ + "common": { + "planId": "1" + }, + "aggregate": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double\u003e" + } + }, + "groupType": "GROUP_TYPE_PIVOT", + "groupingExpressions": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }], + "aggregateExpressions": [{ + "unresolvedFunction": { + "functionName": "count", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "b" + } + }] + } + }], + "pivot": { + "col": { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + } + } + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin 
new file mode 100644 index 00000000000..aee3c980eae Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/pivot_without_column_values.proto.bin differ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org