[ https://issues.apache.org/jira/browse/SPARK-44323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-44323:
-----------------------------------
Labels: pull-request-available (was: )
> Scala None shows up as null for Aggregator BUF or OUT
> -------------------------------------------------------
>
> Key: SPARK-44323
> URL: https://issues.apache.org/jira/browse/SPARK-44323
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1
> Reporter: koert kuipers
> Priority: Major
> Labels: pull-request-available
>
> When upgrading from Spark 3.3.1 to Spark 3.4.1, we suddenly started getting
> NullPointerExceptions in Aggregators (classes extending
> org.apache.spark.sql.expressions.Aggregator) that use Scala Option for BUF
> and/or OUT. Basically, None now shows up as null.
> After adding a simple test case and bisecting the commits, we identified
> SPARK-37829 as the cause.
> We first observed the issue as an NPE inside Aggregator.merge, because None
> was null. I am having a hard time replicating that in a Spark unit test, but
> I did manage to get a None to become null in the output. Here is a simple
> test that now fails:
>
> {code:java}
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> index e9daa825dd4..a1959d7065d 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> @@ -228,6 +228,16 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
>    def outputEncoder: Encoder[Int] = Encoders.scalaInt
>  }
>
> +object OptionStringAgg extends Aggregator[Option[String], Option[String], Option[String]] {
> +  override def zero: Option[String] = None
> +  override def reduce(b: Option[String], a: Option[String]): Option[String] = merge(b, a)
> +  override def finish(reduction: Option[String]): Option[String] = reduction
> +  override def merge(b1: Option[String], b2: Option[String]): Option[String] =
> +    b1.map{ b1v => b2.map{ b2v => b1v ++ b2v }.getOrElse(b1v) }.orElse(b2)
> +  override def bufferEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +  override def outputEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +}
> +
>  class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>    import testImplicits._
>
> @@ -432,4 +442,15 @@ class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>      val agg = df.select(mode(col("a"))).as[String]
>      checkDataset(agg, "3")
>    }
> +
> +  test("typed aggregation: option string") {
> +    val ds = Seq((1, Some("a")), (1, None), (1, Some("c")), (2, None)).toDS()
> +
> +    checkDataset(
> +      ds.groupByKey(_._1).mapValues(_._2).agg(
> +        OptionStringAgg.toColumn
> +      ),
> +      (1, Some("ac")), (2, None)
> +    )
> +  }
>  }
> {code}
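The NPE inside Aggregator.merge mentioned in the description can be illustrated without Spark. The sketch below is a hedged, Spark-free model, not Spark's implementation: it reuses the merge logic of OptionStringAgg from the test, simulates the grouped aggregation with a plain local fold (an assumption; Spark actually calls reduce within partitions and merge across them), and shows a hypothetical null-tolerant guard. The names OptionMergeSketch, nullSafe, mergeNullSafe and aggregate are illustrative only and are not Spark API. The guard does not fix the encoder regression from SPARK-37829; it only keeps a null buffer from crashing user code.

```scala
// Spark-free model of OptionStringAgg's merge logic (illustrative sketch only).
object OptionMergeSketch {

  // Same logic as OptionStringAgg.merge in the test above.
  // Calling .map on a null Option reference throws NullPointerException.
  def merge(b1: Option[String], b2: Option[String]): Option[String] =
    b1.map { b1v => b2.map { b2v => b1v ++ b2v }.getOrElse(b1v) }.orElse(b2)

  // Hypothetical guard: Option(o) turns a null reference into None,
  // and flatten collapses Option[Option[String]] back to Option[String].
  def nullSafe(o: Option[String]): Option[String] = Option(o).flatten

  // Hypothetical null-tolerant merge built on the guard above.
  def mergeNullSafe(b1: Option[String], b2: Option[String]): Option[String] =
    merge(nullSafe(b1), nullSafe(b2))

  // Local stand-in for groupByKey(_._1).mapValues(_._2).agg(...):
  // folds each group's values starting from zero = None, using reduce == merge.
  def aggregate(rows: Seq[(Int, Option[String])]): Map[Int, Option[String]] =
    rows.groupBy(_._1).map { case (k, vs) =>
      k -> vs.map(_._2).foldLeft(Option.empty[String])(merge)
    }

  def main(args: Array[String]): Unit = {
    val rows: Seq[(Int, Option[String])] =
      Seq((1, Some("a")), (1, None), (1, Some("c")), (2, None))

    // Expected per the test: key 1 -> Some("ac"), key 2 -> None.
    val result = aggregate(rows)
    println(result(1))
    println(result(2))

    // Simulate the regression: a buffer that should be None arrives as null.
    val nullBuffer: Option[String] = null
    val threwNpe =
      try { merge(nullBuffer, Some("a")); false }
      catch { case _: NullPointerException => true }
    println(threwNpe)

    // The guarded variant treats the null buffer as None.
    println(mergeNullSafe(nullBuffer, Some("a")))
  }
}
```

Normalizing with Option(x).flatten is a common Scala idiom for coercing a possibly-null Option back to None; it is shown here only as a stopgap a user might apply inside their own merge, not as the resolution of this issue.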
--
This message was sent by Atlassian Jira (v8.20.10#820010)