Repository: spark
Updated Branches: refs/heads/master 88134e736 -> 8cdb81fa8
[SPARK-15204][SQL] improve nullability inference for Aggregator ## What changes were proposed in this pull request? TypedAggregateExpression sets nullable based on the schema of the outputEncoder ## How was this patch tested? Add test in DatasetAggregatorSuite Author: Koert Kuipers <ko...@tresata.com> Closes #13532 from koertkuipers/feat-aggregator-nullable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdb81fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdb81fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdb81fa Branch: refs/heads/master Commit: 8cdb81fa8264085b1bc04638b649b681ae871843 Parents: 88134e7 Author: Koert Kuipers <ko...@tresata.com> Authored: Mon Jul 4 12:14:14 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon Jul 4 12:14:14 2016 +0800 ---------------------------------------------------------------------- .../sql/execution/aggregate/TypedAggregateExpression.scala | 8 ++++---- .../scala/org/apache/spark/sql/DatasetAggregatorSuite.scala | 9 +++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8cdb81fa/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 8bdfa48..2cdf470 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -51,7 +51,8 @@ object TypedAggregateExpression { bufferDeserializer, outputEncoder.serializer, 
outputEncoder.deserializer.dataType, - outputType) + outputType, + !outputEncoder.flat || outputEncoder.schema.head.nullable) } } @@ -65,9 +66,8 @@ case class TypedAggregateExpression( bufferDeserializer: Expression, outputSerializer: Seq[Expression], outputExternalType: DataType, - dataType: DataType) extends DeclarativeAggregate with NonSQLExpression { - - override def nullable: Boolean = true + dataType: DataType, + nullable: Boolean) extends DeclarativeAggregate with NonSQLExpression { override def deterministic: Boolean = true http://git-wip-us.apache.org/repos/asf/spark/blob/8cdb81fa/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index 32fcf84..ddc4dcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -305,4 +305,13 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { val ds = Seq(1, 2, 3).toDS() checkDataset(ds.select(MapTypeBufferAgg.toColumn), 1) } + + test("SPARK-15204 improve nullability inference for Aggregator") { + val ds1 = Seq(1, 3, 2, 5).toDS() + assert(ds1.select(typed.sum((i: Int) => i)).schema.head.nullable === false) + val ds2 = Seq(AggData(1, "a"), AggData(2, "a")).toDS() + assert(ds2.select(SeqAgg.toColumn).schema.head.nullable === true) + val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData] + assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org