[ https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-18534.
----------------------------------
Resolution: Incomplete
> Datasets Aggregation with Maps
> ------------------------------
>
> Key: SPARK-18534
> URL: https://issues.apache.org/jira/browse/SPARK-18534
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2, 1.6.3
> Reporter: Anton Okolnychyi
> Priority: Major
> Labels: bulk-closed
>
> There is a problem with user-defined aggregations in the Dataset API in Spark
> 1.6.3, while the identical code works fine in Spark 2.0.
> The problem appears only if {{ExpressionEncoder()}} is used for Maps. The
> same code with a Kryo-based alternative produces a correct result. If the
> encoder for a map is defined with {{ExpressionEncoder()}}, Spark cannot read
> the reduced values in the merge phase of the aggregation.
> Code to reproduce:
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.{Encoder, SQLContext}
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
>
> case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
>
> object DatasetMapAggregation {
>   def main(args: Array[String]): Unit = {
>     // Does not work with ExpressionEncoder(): the aggregated map comes back empty
>     implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
>     // Works if a Kryo-based encoder is used instead:
>     // implicit val intStringMapEncoder: Encoder[Map[Int, String]] =
>     //   org.apache.spark.sql.Encoders.kryo[Map[Int, String]]
>
>     val sparkConf = new SparkConf()
>       .setAppName("DS Spark 1.6 Test")
>       .setMaster("local[4]")
>     val sparkContext = new SparkContext(sparkConf)
>     val sparkSqlContext = new SQLContext(sparkContext)
>     import sparkSqlContext.implicits._
>
>     val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()
>
>     // Collects sequenceNumber -> id for all stop points of one line
>     val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
>       override def zero = Map[Int, String]()
>       override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) =
>         map.updated(stopPoint.sequenceNumber, stopPoint.id)
>       override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) =
>         map ++ anotherMap
>       override def finish(reduction: Map[Int, String]) = reduction
>     }.toColumn // picks up intStringMapEncoder as the buffer and output encoder
>
>     val resultMap = stopPointDS
>       .groupBy(_.line)
>       .agg(stopPointSequenceMap)
>       .collect()
>       .toMap
>     // Expected: Map(33 -> Map(1 -> id#1, 2 -> id#2)); with ExpressionEncoder() the inner map is empty
>     println(resultMap)
>     sparkContext.stop()
>   }
> }
> {code}
> The code above produces an empty map as a result if the Map encoder is
> defined as {{ExpressionEncoder()}}. The Kryo-based encoder (commented out in
> the code) works fine.
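As a reference point, the result the aggregation should produce can be computed with plain Scala collections, without Spark. The sketch below assumes it is equivalent to the {{Aggregator}} in the reproduction code: it groups by {{line}} and folds each group from an empty map using the same update as {{reduce()}}. The names {{ExpectedResult}} and {{expectedResult}} are hypothetical.

```scala
// Same shape as the case class in the reproduction code
case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

object ExpectedResult {
  val stopPoints: Seq[TestStopPoint] =
    Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2"))

  // Group by line, then fold each group starting from the empty map (the "zero"),
  // applying the same update as reduce(): sequenceNumber -> id
  val expectedResult: Map[String, Map[Int, String]] =
    stopPoints.groupBy(_.line).map { case (line, points) =>
      line -> points.foldLeft(Map.empty[Int, String]) { (map, p) =>
        map.updated(p.sequenceNumber, p.id)
      }
    }
}
```

Comparing the Spark output against {{expectedResult}} makes the empty inner map of the {{ExpressionEncoder()}} run easy to spot.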
> I did a preliminary investigation into possible causes of this behavior. I am
> not a Spark expert, but I hope it helps.
> The physical plan is:
> {noformat}
> == Physical Plan ==
> SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
> +- ConvertToSafe
>    +- Sort [value#55 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value#55,1), None
>          +- ConvertToUnsafe
>             +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
>                +- ConvertToSafe
>                   +- Sort [value#55 ASC], false, 0
>                      +- !AppendColumns <function1>, class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
>                         +- ConvertToUnsafe
>                            +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,2000000002,1,2800000004,3333,31236469],[0,2000000002,2,2800000004,3333,32236469]]
> {noformat}
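The plan has two aggregation stages: a {{SortBasedAggregate}} in {{mode=Partial}} that folds rows into a buffer with {{reduce}}, an exchange on the grouping key, and a {{SortBasedAggregate}} in {{mode=Final}} that combines buffers with {{merge}} and then applies {{finish}}. The plain-Scala sketch below models only that data flow, under simplifying assumptions: partitions are in-memory {{Seq}}s, the exchange is a {{groupBy}}, and the {{ConvertToSafe}}/{{ConvertToUnsafe}} row conversions are left out. {{Agg}}, {{partialAggregate}}, {{finalAggregate}} and {{TwoPhaseModel}} are hypothetical names, not Spark internals.

```scala
case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

// Hypothetical stand-in for the Aggregator from the reproduction code
object Agg {
  type Buf = Map[Int, String]
  def zero: Buf = Map.empty
  def reduce(map: Buf, p: TestStopPoint): Buf = map.updated(p.sequenceNumber, p.id)
  def merge(a: Buf, b: Buf): Buf = a ++ b
  def finish(b: Buf): Buf = b
}

object TwoPhaseModel {
  // mode=Partial: within one partition, fold each group's rows into a buffer
  def partialAggregate(partition: Seq[TestStopPoint]): Seq[(String, Agg.Buf)] =
    partition.groupBy(_.line).toSeq.map { case (key, rows) =>
      key -> rows.foldLeft(Agg.zero)(Agg.reduce)
    }

  // Exchange + mode=Final: bring buffers with the same key together,
  // merge them starting from zero, then apply finish
  def finalAggregate(partials: Seq[(String, Agg.Buf)]): Map[String, Agg.Buf] =
    partials.groupBy(_._1).map { case (key, bufs) =>
      key -> Agg.finish(bufs.map(_._2).foldLeft(Agg.zero)(Agg.merge))
    }

  def run(partitions: Seq[Seq[TestStopPoint]]): Map[String, Agg.Buf] =
    finalAggregate(partitions.flatMap(partialAggregate))
}
```

Whether both rows land in one partition or in two, this model returns the correct result, {{Map(33 -> Map(1 -> id#1, 2 -> id#2))}}, because {{merge}} is just a map union.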
>
> Everything below the first {{SortBasedAggregate}} (counting from the bottom),
> and part of that step itself, is handled correctly. In particular, I see that
> each row correctly updates the mutable aggregation buffer in the {{update()}}
> method of the {{TypedAggregateExpression}} class. My initial idea was that the
> problem appeared in the {{ConvertToUnsafe}} step directly after the first
> {{SortBasedAggregate}}. Inside the {{ConvertToUnsafe}} class, I can see that
> the first {{SortBasedAggregate}} returns a map with 2 elements (I call
> {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of
> {{ConvertToUnsafe}} to see this). But when I examine the output of this
> {{ConvertToUnsafe}} in the same way, the resulting map contains no elements.
> As a consequence, Spark operates on two empty maps in the {{merge()}} method
> of the {{TypedAggregateExpression}} class. However, my assumption turned out
> to be only partially correct. I did a more detailed investigation, and its
> results are described in the comments.
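The symptom described above can be expressed in the same simplified terms: if the hand-off between the Partial and Final stages loses the map's entries, the Final stage calls {{merge}} with the empty {{zero}} buffer and an empty partial buffer, so the result is an empty map. In the sketch below, {{lossyConvert}} is a hypothetical stand-in for the faulty conversion, not Spark's {{ConvertToUnsafe}} code, and the single partial buffer with 2 entries mirrors what the report observed after the first {{SortBasedAggregate}}.

```scala
object MergePhaseModel {
  type Buf = Map[Int, String]

  def zero: Buf = Map.empty
  def merge(map: Buf, anotherMap: Buf): Buf = map ++ anotherMap

  // Correct hand-off: the partial buffer reaches the Final stage unchanged
  def identityConvert(buf: Buf): Buf = buf

  // Hypothetical faulty hand-off: the map arrives with no entries
  def lossyConvert(buf: Buf): Buf = Map.empty

  // Final stage: merge each converted partial buffer into zero
  def finalStage(partials: Seq[Buf], convert: Buf => Buf): Buf =
    partials.map(convert).foldLeft(zero)(merge)
}
```

With {{identityConvert}} the Final stage returns both entries; with {{lossyConvert}} every {{merge}} call sees two empty maps, matching the behavior reported for {{ExpressionEncoder()}}.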