Repository: spark
Updated Branches:
  refs/heads/master 38628dd1b -> ab2eafb3c


[SPARK-26085][SQL] Key attribute of non-struct type under typed aggregation 
should be named as "key" too

## What changes were proposed in this pull request?

When doing typed aggregation on a Dataset, the key attribute is named "key" 
if the key is of struct type, but "value" if the key is of non-struct type. 
The key attribute should be named "key" for non-struct types as well, so that 
the naming is consistent.

## How was this patch tested?

Added a test to DatasetSuite.

Closes #23054 from viirya/SPARK-26085.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2eafb3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2eafb3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2eafb3

Branch: refs/heads/master
Commit: ab2eafb3cdc7631452650c6cac03a92629255347
Parents: 38628dd
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu Nov 22 10:50:01 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Nov 22 10:50:01 2018 +0800

----------------------------------------------------------------------
 docs/sql-migration-guide-upgrade.md                     |  2 ++
 .../scala/org/apache/spark/sql/internal/SQLConf.scala   | 12 ++++++++++++
 .../org/apache/spark/sql/KeyValueGroupedDataset.scala   |  7 ++++++-
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 10 ++++++++++
 4 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index e8f2bcc..397ca59 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -20,6 +20,8 @@ displayTitle: Spark SQL Upgrading Guide
   - The `ADD JAR` command previously returned a result set with the single 
value 0. It now returns an empty result set.
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
+  
+  - In Spark version 2.4 and earlier, `Dataset.groupByKey` results in a 
grouped dataset with the key attribute wrongly named "value" if the key is of 
non-struct type, e.g. int, string, array, etc. This is counterintuitive and 
makes the schema of aggregation queries weird. For example, the schema of 
`ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the 
grouping attribute "key". The old behaviour is preserved under a newly added 
configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a 
default value of `false`.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cc0e972..7bcf215 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1595,6 +1595,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
+    buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
+      .internal()
+      .doc("When set to true, the key attribute resulted from running 
`Dataset.groupByKey` " +
+        "for non-struct key type, will be named as `value`, following the 
behavior of Spark " +
+        "version 2.4 and earlier.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
     .doc("Maximum number of fields of sequence-like entries can be converted 
to strings " +
       "in debug output. Any elements beyond the limit will be dropped and 
replaced by a" +
@@ -2016,6 +2025,9 @@ class SQLConf extends Serializable with Logging {
 
   def integralDivideReturnLong: Boolean = 
getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)
 
+  def nameNonStructGroupingKeyAsValue: Boolean =
+    getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE)
+
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
   /** ********************** SQLConf functionality methods ************ */

http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 7a47242..2d849c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, CreateStruct
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, 
OutputMode}
 
 /**
@@ -459,7 +460,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
       columns.map(_.withInputType(vExprEnc, dataAttributes).named)
     val keyColumn = if (!kExprEnc.isSerializedAsStruct) {
       assert(groupingAttributes.length == 1)
-      groupingAttributes.head
+      if (SQLConf.get.nameNonStructGroupingKeyAsValue) {
+        groupingAttributes.head
+      } else {
+        Alias(groupingAttributes.head, "key")()
+      }
     } else {
       Alias(CreateStruct(groupingAttributes), "key")()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab2eafb3/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 540fbff..baece2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1572,6 +1572,16 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
     checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 
3L, 5L))
   }
 
+  test("SPARK-26085: fix key attribute name for atomic type for typed 
aggregation") {
+    val ds = Seq(1, 2, 3).toDS()
+    assert(ds.groupByKey(x => x).count().schema.head.name == "key")
+
+    // Enable legacy flag to follow previous Spark behavior
+    withSQLConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE.key -> "true") {
+      assert(ds.groupByKey(x => x).count().schema.head.name == "value")
+    }
+  }
+
   test("SPARK-8288: class with only a companion object constructor") {
     val data = Seq(ScroogeLikeExample(1), ScroogeLikeExample(2))
     val ds = data.toDS


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to