Repository: spark Updated Branches: refs/heads/master 38628dd1b -> ab2eafb3c
[SPARK-26085][SQL] Key attribute of non-struct type under typed aggregation should be named as "key" too ## What changes were proposed in this pull request? When doing typed aggregation on a Dataset, for struct key type, the key attribute is named as "key". But for non-struct type, the key attribute is named as "value". This key attribute should also be named as "key" for non-struct type. ## How was this patch tested? Added test. Closes #23054 from viirya/SPARK-26085. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2eafb3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2eafb3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2eafb3 Branch: refs/heads/master Commit: ab2eafb3cdc7631452650c6cac03a92629255347 Parents: 38628dd Author: Liang-Chi Hsieh <[email protected]> Authored: Thu Nov 22 10:50:01 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu Nov 22 10:50:01 2018 +0800 ---------------------------------------------------------------------- docs/sql-migration-guide-upgrade.md | 2 ++ .../scala/org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++++++ .../org/apache/spark/sql/KeyValueGroupedDataset.scala | 7 ++++++- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++++++ 4 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/docs/sql-migration-guide-upgrade.md ---------------------------------------------------------------------- diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index e8f2bcc..397ca59 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -20,6 +20,8 @@ displayTitle: Spark SQL Upgrading Guide - The `ADD JAR` command previously 
returned a result set with the single value 0. It now returns an empty result set. - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful. + + - In Spark version 2.4 and earlier, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of a non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries confusing. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, the grouping attribute is named "key". The old behavior is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue`, which defaults to `false`. 
## Upgrading From Spark SQL 2.3 to 2.4 http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cc0e972..7bcf215 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1595,6 +1595,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE = + buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue") + .internal() + .doc("When set to true, the key attribute resulted from running `Dataset.groupByKey` " + + "for non-struct key type, will be named as `value`, following the behavior of Spark " + + "version 2.4 and earlier.") + .booleanConf + .createWithDefault(false) + val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields") .doc("Maximum number of fields of sequence-like entries can be converted to strings " + "in debug output. 
Any elements beyond the limit will be dropped and replaced by a" + @@ -2016,6 +2025,9 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def nameNonStructGroupingKeyAsValue: Boolean = + getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE) + def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) /** ********************** SQLConf functionality methods ************ */ http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 7a47242..2d849c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.expressions.ReduceAggregator +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode} /** @@ -459,7 +460,11 @@ class KeyValueGroupedDataset[K, V] private[sql]( columns.map(_.withInputType(vExprEnc, dataAttributes).named) val keyColumn = if (!kExprEnc.isSerializedAsStruct) { assert(groupingAttributes.length == 1) - groupingAttributes.head + if (SQLConf.get.nameNonStructGroupingKeyAsValue) { + groupingAttributes.head + } else { + Alias(groupingAttributes.head, "key")() + } } else { Alias(CreateStruct(groupingAttributes), "key")() } http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 540fbff..baece2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1572,6 +1572,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext { checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 3L, 5L)) } + test("SPARK-26085: fix key attribute name for atomic type for typed aggregation") { + val ds = Seq(1, 2, 3).toDS() + assert(ds.groupByKey(x => x).count().schema.head.name == "key") + + // Enable legacy flag to follow previous Spark behavior + withSQLConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE.key -> "true") { + assert(ds.groupByKey(x => x).count().schema.head.name == "value") + } + } + test("SPARK-8288: class with only a companion object constructor") { val data = Seq(ScroogeLikeExample(1), ScroogeLikeExample(2)) val ds = data.toDS --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
