Repository: spark
Updated Branches:
  refs/heads/master a381bce72 -> 4de0425df


[SPARK-24569][SQL] Aggregator with output type Option should produce consistent 
schema

## What changes were proposed in this pull request?

A SQL `Aggregator` with output type `Option[Boolean]` creates a column of type
`StructType`. This is inconsistent with a Dataset of a similar Java class.

This changes the way `definedByConstructorParams` checks the given type. For
`Option[_]`, it now checks the type argument instead of the `Option` itself.

## How was this patch tested?

Added tests to `DatasetAggregatorSuite` and `DatasetSuite`.

Author: Liang-Chi Hsieh <[email protected]>

Closes #21611 from viirya/SPARK-24569.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4de0425d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4de0425d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4de0425d

Branch: refs/heads/master
Commit: 4de0425df8d2545718a0583bc26592108aebc5ac
Parents: a381bce
Author: Liang-Chi Hsieh <[email protected]>
Authored: Sat Jul 7 10:54:14 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sat Jul 7 10:54:14 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    |  7 ++-
 .../spark/sql/DatasetAggregatorSuite.scala      | 60 ++++++++++++++++++++
 .../org/apache/spark/sql/DatasetSuite.scala     | 11 ++++
 3 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4de0425d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index f9acc20..4543bba 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -798,7 +798,12 @@ object ScalaReflection extends ScalaReflection {
    * Whether the fields of the given type is defined entirely by its 
constructor parameters.
    */
   def definedByConstructorParams(tpe: Type): Boolean = 
cleanUpReflectionObjects {
-    tpe.dealias <:< localTypeOf[Product] || tpe.dealias <:< 
localTypeOf[DefinedByConstructorParams]
+    tpe.dealias match {
+      // `Option` is a `Product`, but we don't wanna treat `Option[Int]` as a 
struct type.
+      case t if t <:< localTypeOf[Option[_]] => 
definedByConstructorParams(t.typeArgs.head)
+      case _ => tpe.dealias <:< localTypeOf[Product] ||
+        tpe.dealias <:< localTypeOf[DefinedByConstructorParams]
+    }
   }
 
   private val javaKeywords = Set("abstract", "assert", "boolean", "break", 
"byte", "case", "catch",

http://git-wip-us.apache.org/repos/asf/spark/blob/4de0425d/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 0e7eaa9..538ea3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -148,6 +148,41 @@ object VeryComplexResultAgg extends Aggregator[Row, 
String, ComplexAggData] {
 }
 
 
+case class OptionBooleanData(name: String, isGood: Option[Boolean])
+
+case class OptionBooleanAggregator(colName: String)
+    extends Aggregator[Row, Option[Boolean], Option[Boolean]] {
+
+  override def zero: Option[Boolean] = None
+
+  override def reduce(buffer: Option[Boolean], row: Row): Option[Boolean] = {
+    val index = row.fieldIndex(colName)
+    val value = if (row.isNullAt(index)) {
+      Option.empty[Boolean]
+    } else {
+      Some(row.getBoolean(index))
+    }
+    merge(buffer, value)
+  }
+
+  override def merge(b1: Option[Boolean], b2: Option[Boolean]): 
Option[Boolean] = {
+    if ((b1.isDefined && b1.get) || (b2.isDefined && b2.get)) {
+      Some(true)
+    } else if (b1.isDefined) {
+      b1
+    } else {
+      b2
+    }
+  }
+
+  override def finish(reduction: Option[Boolean]): Option[Boolean] = reduction
+
+  override def bufferEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
+  override def outputEncoder: Encoder[Option[Boolean]] = OptionalBoolEncoder
+
+  def OptionalBoolEncoder: Encoder[Option[Boolean]] = ExpressionEncoder()
+}
+
 class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
@@ -333,4 +368,29 @@ class DatasetAggregatorSuite extends QueryTest with 
SharedSQLContext {
       df.groupBy($"i").agg(VeryComplexResultAgg.toColumn),
       Row(1, Row(Row(1, "a"), Row(1, "a"))) :: Row(2, Row(Row(2, "bc"), Row(2, 
"bc"))) :: Nil)
   }
+
+  test("SPARK-24569: Aggregator with output type Option[Boolean] creates 
column of type Row") {
+    val df = Seq(
+      OptionBooleanData("bob", Some(true)),
+      OptionBooleanData("bob", Some(false)),
+      OptionBooleanData("bob", None)).toDF()
+    val group = df
+      .groupBy("name")
+      .agg(OptionBooleanAggregator("isGood").toColumn.alias("isGood"))
+    assert(df.schema == group.schema)
+    checkAnswer(group, Row("bob", true) :: Nil)
+    checkDataset(group.as[OptionBooleanData], OptionBooleanData("bob", 
Some(true)))
+  }
+
+  test("SPARK-24569: groupByKey with Aggregator of output type 
Option[Boolean]") {
+    val df = Seq(
+      OptionBooleanData("bob", Some(true)),
+      OptionBooleanData("bob", Some(false)),
+      OptionBooleanData("bob", None)).toDF()
+    val grouped = df.groupByKey((r: Row) => r.getString(0))
+      .agg(OptionBooleanAggregator("isGood").toColumn).toDF("name", "isGood")
+
+    assert(grouped.schema == df.schema)
+    checkDataset(grouped.as[OptionBooleanData], OptionBooleanData("bob", 
Some(true)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4de0425d/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2d20c50..ce8db99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1467,6 +1467,17 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
     intercept[NullPointerException](ds.as[(Int, Int)].collect())
   }
 
+  test("SPARK-24569: Option of primitive types are mistakenly mapped to struct 
type") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      val a = Seq(Some(1)).toDS
+      val b = Seq(Some(1.2)).toDS
+      val expected = Seq((Some(1), Some(1.2))).toDS
+      val joined = a.joinWith(b, lit(true))
+      assert(joined.schema == expected.schema)
+      checkDataset(joined, expected.collect: _*)
+    }
+  }
+
   test("SPARK-24548: Dataset with tuple encoders should have correct schema") {
     val encoder = Encoders.tuple(newStringEncoder,
       Encoders.tuple(newStringEncoder, newStringEncoder))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to