[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

koert kuipers updated SPARK-15810:
----------------------------------
    Description: 
{noformat}
      val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
      val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
      val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
Option[Int], Option[Int]]{
        def zero: Option[Int] = None

        def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)

        def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v 
=> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)

        def finish(reduction: Option[Int]): Option[Int] = reduction

        def bufferEncoder: Encoder[Option[Int]] = 
implicitly[Encoder[Option[Int]]]

        def outputEncoder: Encoder[Option[Int]] = 
implicitly[Encoder[Option[Int]]]
      }.toColumn)
      ds3.printSchema
      ds3.show
{noformat}

i get as output a somewhat odd looking schema, and after that the program just 
hangs pinning one cpu at 100%. the data never shows.
output:
{noformat}
root
 |-- value: string (nullable = true)
 |-- $anon$1(scala.Tuple2): struct (nullable = true)
 |    |-- value: integer (nullable = true)
{noformat}


  was:
{noformat}
      val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
      val df1 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) 
}.toDF("k", "v")
      val df2 = df1.groupBy("k").agg(new Aggregator[(String, Option[Int]), 
Option[Int], Option[Int]]{
        def zero: Option[Int] = None

        def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)

        def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v 
=> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)

        def finish(reduction: Option[Int]): Option[Int] = reduction

        def bufferEncoder: Encoder[Option[Int]] = 
implicitly[Encoder[Option[Int]]]

        def outputEncoder: Encoder[Option[Int]] = 
implicitly[Encoder[Option[Int]]]
      }.toColumn)
      df2.printSchema
      df2.show
{noformat}

i get as output a somewhat odd looking schema, and after that the program just 
hangs pinning one cpu at 100%. the data never shows.
output:
{noformat}
root
 |-- k: string (nullable = true)
 |-- $anon$1(org.apache.spark.sql.Row): struct (nullable = true)
 |    |-- value: integer (nullable = true)
{noformat}



> Aggregator doesn't play nice with Option
> ----------------------------------------
>
>                 Key: SPARK-15810
>                 URL: https://issues.apache.org/jira/browse/SPARK-15810
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>         Environment: spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>
> {noformat}
>       val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
>       val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
>       val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, 
> Option[Int]), Option[Int], Option[Int]]{
>         def zero: Option[Int] = None
>         def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>         def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v 
> => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>         def finish(reduction: Option[Int]): Option[Int] = reduction
>         def bufferEncoder: Encoder[Option[Int]] = 
> implicitly[Encoder[Option[Int]]]
>         def outputEncoder: Encoder[Option[Int]] = 
> implicitly[Encoder[Option[Int]]]
>       }.toColumn)
>       ds3.printSchema
>       ds3.show
> {noformat}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to