[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720397#comment-15720397
 ] 

koert kuipers commented on SPARK-15810:
---------------------------------------

To continue the previous example: wrapping every Option[(Int, Int)] in a Tuple1 
turns the code into:

{noformat}
  // Aggregator over Int inputs whose buffer and result are both an optional
  // (min, max) pair wrapped in a Tuple1. Every callback prints its arguments so
  // the order in which it is invoked can be followed in the program output.
  val agg1 = new Aggregator[
    Int,
    Tuple1[Option[(Int, Int)]],
    Tuple1[Option[(Int, Int)]]
  ] {

    /** Initial buffer: no (min, max) pair yet. */
    def zero: Tuple1[Option[(Int, Int)]] = {
      val empty = Tuple1(None)
      println(s"zero $empty")
      empty
    }

    /** Widens the buffered (min, max) to include `a`; an empty buffer becomes (a, a). */
    def reduce(b: Tuple1[Option[(Int, Int)]], a: Int): Tuple1[Option[(Int, Int)]] = {
      println(s"reduce $b and $a")
      val widened = b._1.fold((a, a)) { case (lo, hi) => (math.min(lo, a), math.max(hi, a)) }
      Tuple1(Some(widened))
    }

    /**
     * Combines two buffers: when both hold a pair the result is the smaller min
     * and the larger max, when only one does that pair is kept, otherwise None.
     */
    def merge(
        b1: Tuple1[Option[(Int, Int)]],
        b2: Tuple1[Option[(Int, Int)]]): Tuple1[Option[(Int, Int)]] = {
      println(s"merge $b1 and $b2")
      // Zero, one or two pairs end up in the list; reduceOption covers all three cases.
      val combined: Option[(Int, Int)] =
        (b1._1.toList ++ b2._1.toList).reduceOption { (left, right) =>
          (math.min(left._1, right._1), math.max(left._2, right._2))
        }
      Tuple1(combined)
    }

    /** Returns the final buffer unchanged. */
    def finish(reduction: Tuple1[Option[(Int, Int)]]): Tuple1[Option[(Int, Int)]] = {
      println(s"finish $reduction")
      reduction
    }

    // Buffer and output share the same type, so both encoders are built the same way.
    def bufferEncoder: Encoder[Tuple1[Option[(Int, Int)]]] =
      ExpressionEncoder[Tuple1[Option[(Int, Int)]]]()

    def outputEncoder: Encoder[Tuple1[Option[(Int, Int)]]] =
      ExpressionEncoder[Tuple1[Option[(Int, Int)]]]()
  }

  // Group the pairs by their first element, keep only the second element as the
  // value, and aggregate each group with agg1.
  val x = Seq(("a", 1), ("a", 2)).toDS
    .groupByKey(pair => pair._1)
    .mapValues(pair => pair._2)
    .agg(agg1.toColumn)
  // Print the resulting schema first, then the aggregated rows.
  x.printSchema()
  x.show()
{noformat}

This seems to run. The output still has the issue of the Option coming out as 
a struct in the schema, but apart from that it seems to work:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(int): struct (nullable = true)
 |    |-- _1: struct (nullable = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)

zero (None)
zero (None)
reduce (None) and 1
reduce (None) and 2
zero (None)
merge (None) and (Some((1,1)))
merge (Some((1,1))) and (Some((2,2)))
finish (Some((1,2)))
+-----+-----------+
|value|anon$1(int)|
+-----+-----------+
|    a|    [[1,2]]|
+-----+-----------+
{noformat}

> Aggregator doesn't play nice with Option
> ----------------------------------------
>
>                 Key: SPARK-15810
>                 URL: https://issues.apache.org/jira/browse/SPARK-15810
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>         Environment: spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> As output I get a somewhat odd-looking schema, and after that the program 
> just hangs, pinning one CPU at 100%. The data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to