Repository: spark Updated Branches: refs/heads/master 8148f19ca -> 99e32f8ba
[SPARK-22224][SQL] Override toString of KeyValue/Relational-GroupedDataset ## What changes were proposed in this pull request? #### before ```scala scala> val words = spark.read.textFile("README.md").flatMap(_.split(" ")) words: org.apache.spark.sql.Dataset[String] = [value: string] scala> val grouped = words.groupByKey(identity) grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = org.apache.spark.sql.KeyValueGroupedDataset@65214862 ``` #### after ```scala scala> val words = spark.read.textFile("README.md").flatMap(_.split(" ")) words: org.apache.spark.sql.Dataset[String] = [value: string] scala> val grouped = words.groupByKey(identity) grouped: org.apache.spark.sql.KeyValueGroupedDataset[String,String] = [key: [value: string], value: [value: string]] ``` ## How was this patch tested? existing ut cc gatorsmile cloud-fan Author: Kent Yao <yaooq...@hotmail.com> Closes #19363 from yaooqinn/minor-dataset-tostring. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99e32f8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99e32f8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99e32f8b Branch: refs/heads/master Commit: 99e32f8ba5d908d5408e9857fd96ac1d7d7e5876 Parents: 8148f19 Author: Kent Yao <yaooq...@hotmail.com> Authored: Tue Oct 17 17:58:45 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Oct 17 17:58:45 2017 +0800 ---------------------------------------------------------------------- .../spark/sql/KeyValueGroupedDataset.scala | 22 ++++++- .../spark/sql/RelationalGroupedDataset.scala | 19 +++++- .../org/apache/spark/sql/DatasetSuite.scala | 61 ++++++++++++++++++++ .../scala/org/apache/spark/sql/QueryTest.scala | 12 +--- 4 files changed, 100 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index cb42e9e..6bab21d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -24,7 +24,6 @@ import org.apache.spark.api.java.function._ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.expressions.ReduceAggregator import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode} @@ -564,4 +563,25 @@ class KeyValueGroupedDataset[K, V] private[sql]( encoder: Encoder[R]): Dataset[R] = { cogroup(other)((key, left, right) => f.call(key, left.asJava, right.asJava).asScala)(encoder) } + + override def toString: String = { + val builder = new StringBuilder + val kFields = kExprEnc.schema.map { + case f => s"${f.name}: ${f.dataType.simpleString(2)}" + } + val vFields = vExprEnc.schema.map { + case f => s"${f.name}: ${f.dataType.simpleString(2)}" + } + builder.append("KeyValueGroupedDataset: [key: [") + builder.append(kFields.take(2).mkString(", ")) + if (kFields.length > 2) { + builder.append(" ... " + (kFields.length - 2) + " more field(s)") + } + builder.append("], value: [") + builder.append(vFields.take(2).mkString(", ")) + if (vFields.length > 2) { + builder.append(" ... 
" + (vFields.length - 2) + " more field(s)") + } + builder.append("]]").toString() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index cd0ac1f..33ec3a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression import org.apache.spark.sql.execution.python.PythonUDF import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{NumericType, StructField, StructType} +import org.apache.spark.sql.types.{NumericType, StructType} /** * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]], @@ -465,6 +465,19 @@ class RelationalGroupedDataset protected[sql]( Dataset.ofRows(df.sparkSession, plan) } + + override def toString: String = { + val builder = new StringBuilder + builder.append("RelationalGroupedDataset: [grouping expressions: [") + val kFields = groupingExprs.map(_.asInstanceOf[NamedExpression]).map { + case f => s"${f.name}: ${f.dataType.simpleString(2)}" + } + builder.append(kFields.take(2).mkString(", ")) + if (kFields.length > 2) { + builder.append(" ... 
" + (kFields.length - 2) + " more field(s)") + } + builder.append(s"], value: ${df.toString}, type: $groupType]").toString() + } } private[sql] object RelationalGroupedDataset { @@ -479,7 +492,9 @@ private[sql] object RelationalGroupedDataset { /** * The Grouping Type */ - private[sql] trait GroupType + private[sql] trait GroupType { + override def toString: String = getClass.getSimpleName.stripSuffix("$").stripSuffix("Type") + } /** * To indicate it's the GroupBy http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index dace682..1537ce3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1341,8 +1341,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext { Seq(1).toDS().map(_ => ("", TestForTypeAlias.seqOfTupleTypeAlias)), ("", Seq((1, 1), (2, 2)))) } + + test("Check RelationalGroupedDataset toString: Single data") { + val kvDataset = (1 to 3).toDF("id").groupBy("id") + val expected = "RelationalGroupedDataset: [" + + "grouping expressions: [id: int], value: [id: int], type: GroupBy]" + val actual = kvDataset.toString + assert(expected === actual) + } + + test("Check RelationalGroupedDataset toString: over length schema ") { + val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong)) + .toDF("id", "val1", "val2").groupBy("id") + val expected = "RelationalGroupedDataset:" + + " [grouping expressions: [id: int]," + + " value: [id: int, val1: string ... 
1 more field]," + + " type: GroupBy]" + val actual = kvDataset.toString + assert(expected === actual) + } + + + test("Check KeyValueGroupedDataset toString: Single data") { + val kvDataset = (1 to 3).toDF("id").as[SingleData].groupByKey(identity) + val expected = "KeyValueGroupedDataset: [key: [id: int], value: [id: int]]" + val actual = kvDataset.toString + assert(expected === actual) + } + + test("Check KeyValueGroupedDataset toString: Unnamed KV-pair") { + val kvDataset = (1 to 3).map(x => (x, x.toString)) + .toDF("id", "val1").as[DoubleData].groupByKey(x => (x.id, x.val1)) + val expected = "KeyValueGroupedDataset:" + + " [key: [_1: int, _2: string]," + + " value: [id: int, val1: string]]" + val actual = kvDataset.toString + assert(expected === actual) + } + + test("Check KeyValueGroupedDataset toString: Named KV-pair") { + val kvDataset = (1 to 3).map( x => (x, x.toString)) + .toDF("id", "val1").as[DoubleData].groupByKey(x => DoubleData(x.id, x.val1)) + val expected = "KeyValueGroupedDataset:" + + " [key: [id: int, val1: string]," + + " value: [id: int, val1: string]]" + val actual = kvDataset.toString + assert(expected === actual) + } + + test("Check KeyValueGroupedDataset toString: over length schema ") { + val kvDataset = (1 to 3).map( x => (x, x.toString, x.toLong)) + .toDF("id", "val1", "val2").as[TripleData].groupByKey(identity) + val expected = "KeyValueGroupedDataset:" + + " [key: [id: int, val1: string ... 1 more field(s)]," + + " value: [id: int, val1: string ... 
1 more field(s)]]" + val actual = kvDataset.toString + assert(expected === actual) + } } +case class SingleData(id: Int) +case class DoubleData(id: Int, val1: String) +case class TripleData(id: Int, val1: String, val2: Long) + case class WithImmutableMap(id: String, map_test: scala.collection.immutable.Map[Long, String]) case class WithMap(id: String, map_test: scala.collection.Map[Long, String]) case class WithMapInOption(m: Option[scala.collection.Map[Int, Int]]) http://git-wip-us.apache.org/repos/asf/spark/blob/99e32f8b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index f980883..fcaca3d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -17,23 +17,13 @@ package org.apache.spark.sql -import java.util.{ArrayDeque, Locale, TimeZone} +import java.util.{Locale, TimeZone} import scala.collection.JavaConverters._ -import scala.util.control.NonFatal -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.streaming.MemoryPlan -import org.apache.spark.sql.types.{Metadata, ObjectType} abstract class QueryTest extends PlanTest { --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org