[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369509#comment-15369509 ]

Amit Sela edited comment on SPARK-15810 at 7/10/16 9:01 AM:
------------------------------------------------------------

Running the (sort of) same Java code:
{code}
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class TestAggregatorJava {

  public static void main(String[] args) {
    SparkSession session = SparkSession.builder()
                                       .appName("TestAggregatorJava")
                                       .master("local[*]")
                                       .getOrCreate();
    Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
            new Tuple2<>("a", 1),
            new Tuple2<>("a", 2),
            new Tuple2<>("a", 3)
    ), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
    // Map values <= 1 to null, mirroring the None case in the Scala snippet.
    Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
        new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
        if (value._2() > 1) {
          return value;
        } else {
          return new Tuple2<>(value._1(), null);
        }
      }
    }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
    // Group by the String key and sum the values, treating null as "absent".
    Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
        new MapFunction<Tuple2<String, Integer>, String>() {
      @Override
      public String call(Tuple2<String, Integer> value) throws Exception {
        return value._1();
      }
    }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {
      @Override
      public Integer zero() {
        return null;
      }

      @Override
      public Integer reduce(Integer b, Tuple2<String, Integer> a) {
        return merge(b, a._2());
      }

      @Override
      public Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
          return b2;
        } else if (b2 == null) {
          return b1;
        } else {
          return b1 + b2;
        }
      }

      @Override
      public Integer finish(Integer reduction) {
        return reduction;
      }

      @Override
      public Encoder<Integer> bufferEncoder() {
        return Encoders.INT();
      }

      @Override
      public Encoder<Integer> outputEncoder() {
        return Encoders.INT();
      }
    }.toColumn());

    ds3.printSchema();
    ds3.show();
  }
}
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result, where I would expect 5 (2 + 3) for key "a":
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}

As for Scala, it's clear that `Option` is preferred over `null`, but since the
Dataset API is supposed to support Java as well, it should not discard the
aggregation when the `zero` method returns null.
For Java, I currently use Guava's `Optional` as a workaround, but that just
seems cumbersome to me.
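For completeness, here is a rough sketch of what that Guava-based workaround looks like (names here are illustrative; the buffer goes through the generic Kryo encoder, since there is no built-in encoder for Guava's `Optional`):
{code}
import com.google.common.base.Optional;

// Same input and output types as above, but the buffer is an Optional<Integer>
// rather than a nullable Integer, so zero() never has to return null.
Aggregator<Tuple2<String, Integer>, Optional<Integer>, Integer> sumAgg =
    new Aggregator<Tuple2<String, Integer>, Optional<Integer>, Integer>() {
  @Override
  public Optional<Integer> zero() {
    return Optional.absent();
  }

  @Override
  public Optional<Integer> reduce(Optional<Integer> b, Tuple2<String, Integer> a) {
    return merge(b, Optional.fromNullable(a._2()));
  }

  @Override
  public Optional<Integer> merge(Optional<Integer> b1, Optional<Integer> b2) {
    if (!b1.isPresent()) {
      return b2;
    } else if (!b2.isPresent()) {
      return b1;
    } else {
      return Optional.of(b1.get() + b2.get());
    }
  }

  @Override
  public Integer finish(Optional<Integer> reduction) {
    return reduction.orNull();
  }

  @Override
  @SuppressWarnings("unchecked")
  public Encoder<Optional<Integer>> bufferEncoder() {
    // No built-in encoder for Optional, so fall back to Kryo serialization.
    return Encoders.kryo((Class<Optional<Integer>>) (Class<?>) Optional.class);
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
};
{code}
With `ds2.groupByKey(...).agg(sumAgg.toColumn())` the group for "a" comes out as 5 as expected, but having to wrap every nullable buffer like this is exactly the boilerplate I'd like to avoid.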



> Aggregator doesn't play nice with Option
> ----------------------------------------
>
>                 Key: SPARK-15810
>                 URL: https://issues.apache.org/jira/browse/SPARK-15810
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>         Environment: spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), Option[Int], Option[Int]] {
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] =
>     b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] =
>     b1.map(b1v => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> I get as output a somewhat odd-looking schema, and after that the program
> just hangs, pinning one CPU at 100%. The data never shows.
> Output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}


