[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15369509#comment-15369509 ]
Amit Sela edited comment on SPARK-15810 at 7/10/16 9:01 AM: ------------------------------------------------------------ Running the (sort of) same Java code: {code} SparkSession session = SparkSession.builder() .appName("TestAggregatorJava") .master("local[*]") .getOrCreate(); Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList( new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("a", 3) ), Encoders.tuple(Encoders.STRING(), Encoders.INT())); Dataset<Tuple2<String, Integer>> ds2 = ds1.map( new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception { if (value._2() > 1) { return value; } else { return new Tuple2<>(value._1, null); } } }, Encoders.tuple(Encoders.STRING(), Encoders.INT())); Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey( new MapFunction<Tuple2<String,Integer>, String>() { @Override public String call(Tuple2<String, Integer> value) throws Exception { return value._1(); } }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() { @Override public Integer zero() { return null; } @Override public Integer reduce(Integer b, Tuple2<String, Integer> a) { return merge(b, a._2()); } @Override public Integer merge(Integer b1, Integer b2) { if (b1 == null) { return b2; } else if (b2 == null){ return b1; } else { return b1 + b2; } } @Override public Integer finish(Integer reduction) { return reduction; } @Override public Encoder<Integer> bufferEncoder() { return Encoders.INT(); } @Override public Encoder<Integer> outputEncoder() { return Encoders.INT(); } }.toColumn()); ds3.printSchema(); ds3.show(); } {code} I get this schema: {noformat} root |-- value: string (nullable = true) |-- (scala.Tuple2): integer (nullable = true) {noformat} And this result: {noformat} +-----+--------------+ |value|(scala.Tuple2)| +-----+--------------+ | a| null| +-----+--------------+ {noformat} As for Scala, it's clear that `Option` is preferred on `null`, but because the Dataset API is supposed to support Java as well, it should not discard the aggregation if the zero method returns null. For Java, I currently use Guava's `Optional` but that just seems cumbersome to me. was (Author: amitsela): Running the (sort of) same Java code: {code} SparkSession session = SparkSession.builder() .appName("TestAggregatorJava") .master("local[*]") .getOrCreate(); Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList( new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("a", 3) ), Encoders.tuple(Encoders.STRING(), Encoders.INT())); Dataset<Tuple2<String, Integer>> ds2 = ds1.map( new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception { if (value._2() > 1) { return value; } else { return new Tuple2<>(value._1, null); } } }, Encoders.tuple(Encoders.STRING(), Encoders.INT())); Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey( new MapFunction<Tuple2<String,Integer>, String>() { @Override public String call(Tuple2<String, Integer> value) throws Exception { return value._1(); } }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() { @Override public Integer zero() { return null; } @Override public Integer reduce(Integer b, Tuple2<String, Integer> a) { return merge(b, a._2()); } @Override public Integer merge(Integer b1, Integer b2) { if (b1 == null) { return b2; } else if (b2 == null){ return b1; } else { return b1 + b2; } } @Override public Integer finish(Integer reduction) { return reduction; } @Override public Encoder<Integer> bufferEncoder() { return Encoders.INT(); } @Override public Encoder<Integer> outputEncoder() { return Encoders.INT(); } }.toColumn()); ds3.printSchema(); ds3.show(); } {code} I get this schema: {noformat} root |-- value: string (nullable = true) |-- (scala.Tuple2): integer (nullable = true) {noformat} And this result: {noformat} +-----+--------------+ |value|(scala.Tuple2)| +-----+--------------+ | a| null| +-----+--------------+ {noformat} As for Scala, it's clear that `Option` is preferred on `null`, but because Dataset API is supposed to support Java as well, it should not discard the aggregation if the zero method returns null. For Java, I currently use Guava's `Optional` but that just seems cumbersome to me. > Aggregator doesn't play nice with Option > ---------------------------------------- > > Key: SPARK-15810 > URL: https://issues.apache.org/jira/browse/SPARK-15810 > Project: Spark > Issue Type: Sub-task > Components: SQL > Environment: spark 2.0.0-SNAPSHOT > Reporter: koert kuipers > > {code} > val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS > val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) } > val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), > Option[Int], Option[Int]]{ > def zero: Option[Int] = None > def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = > b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2) > def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => > b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2) > def finish(reduction: Option[Int]): Option[Int] = reduction > def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > }.toColumn) > ds3.printSchema > ds3.show > {code} > i get as output a somewhat odd looking schema, and after that the program > just hangs pinning one cpu at 100%. the data never shows. > output: > {noformat} > root > |-- value: string (nullable = true) > |-- $anon$1(scala.Tuple2): struct (nullable = true) > | |-- value: integer (nullable = true) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org