[
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720387#comment-15720387
]
koert kuipers commented on SPARK-15810:
---------------------------------------
Next, again with the Spark 2.1.x branch, I wanted to test an aggregator that
calculates min and max in one go:
{noformat}
val agg1 = new Aggregator[Int, Option[(Int, Int)], Option[(Int, Int)]] {
def zero: Option[(Int, Int)] = {
val x = None
println(s"zero ${x}")
x
}
def reduce(b: Option[(Int, Int)], a: Int): Option[(Int, Int)] = {
println(s"reduce ${b} and ${a}")
b.map(minMax => (math.min(minMax._1, a), math.max(minMax._2,
a))).orElse(Some((a, a)))
}
def merge(b1: Option[(Int, Int)], b2: Option[(Int, Int)]): Option[(Int,
Int)] = {
println(s"merge ${b1} and ${b2}")
(b1, b2) match {
case (Some((min1, max1)), Some((min2, max2))) => Some((math.min(min1,
min2), math.max(max1, max2)))
case (Some(minMax1), _) => Some(minMax1)
case (_, Some(minMax2)) => Some(minMax2)
case _ => None
}
}
def finish(reduction: Option[(Int, Int)]): Option[(Int, Int)] = {
println(s"finish ${reduction}")
reduction
}
def bufferEncoder: Encoder[Option[(Int, Int)]] =
ExpressionEncoder[Option[(Int, Int)]]()
def outputEncoder: Encoder[Option[(Int, Int)]] =
ExpressionEncoder[Option[(Int, Int)]]()
}
val x = Seq(("a", 1), ("a", 2))
.toDS
.groupByKey(_._1)
.mapValues(_._2)
.agg(agg1.toColumn)
x.printSchema
x.show
{noformat}
This gives:
{noformat}
java.lang.UnsupportedOperationException: Cannot create encoder for Option of
Product type, because Product type is represented as a row, and the entire row
can not be null in Spark SQL like normal databases. You can wrap your type with
Tuple1 if you do want top level null Product objects, e.g. instead of creating
`Dataset[Option[MyClass]]`, you can do something like `val ds:
Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`
{noformat}
> Aggregator doesn't play nice with Option
> ----------------------------------------
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]),
> Option[Int], Option[Int]]{
> def zero: Option[Int] = None
> def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] =
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
> def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v =>
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
> def finish(reduction: Option[Int]): Option[Int] = reduction
> def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> I get as output a somewhat odd-looking schema, and after that the program
> just hangs, pinning one CPU at 100%. The data never shows.
> output:
> {noformat}
> root
> |-- value: string (nullable = true)
> |-- $anon$1(scala.Tuple2): struct (nullable = true)
> | |-- value: integer (nullable = true)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]