[ 
https://issues.apache.org/jira/browse/SPARK-12555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15188447#comment-15188447
 ] 

Luciano Resende commented on SPARK-12555:
-----------------------------------------

This issue is still reproducible in Spark 1.6.x but seems resolved in 2.x. I 
have added a test case in trunk (PR #11623)  to avoid future regression, but 
please let us know if there is a need to backport fixes.

> Datasets: data is corrupted when input data is reordered
> --------------------------------------------------------
>
>                 Key: SPARK-12555
>                 URL: https://issues.apache.org/jira/browse/SPARK-12555
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.6.0
>         Environment: ALL platforms on 1.6
>            Reporter: Tim Preece
>              Labels: big-endian
>
> Testcase
> -----------
> {code}
> import org.apache.spark.sql.expressions.Aggregator
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.Dataset
> case class people(age: Int, name: String)
> object nameAgg extends Aggregator[people, String, String] {
>       def zero: String = ""
>       def reduce(b: String, a: people): String = a.name + b
>       def merge(b1: String, b2: String): String = b1 + b2
>       def finish(r: String): String = r
> }
> object DataSetAgg {
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("DataSetAgg")
>     val spark = new SparkContext(conf)
>     val sqlContext = new SQLContext(spark)
>     import sqlContext.implicits._
>     val peopleds: Dataset[people] = sqlContext.sql("SELECT 'Tim Preece' AS 
> name, 1279869254 AS age").as[people]
>     peopleds.groupBy(_.age).agg(nameAgg.toColumn).show()
>   }
> }
> {code}
> Result ( on a Little Endian Platform )
> --------
> {noformat}
> +----------+----------+
> |        _1|        _2|
> +----------+----------+
> |1279869254|FAILTi|
> +----------+----------+
> {noformat}
> Explanation
> ---------------
> Internally the String variable in the unsafe row is not updated after an 
> unsafe row join operation.
> The displayed string is corrupted and shows part of the integer ( interpreted 
> as a string ) along with "Ti"
> The column names also look different on a Little Endian platform.
> Result ( on a Big Endian Platform )
> {noformat}
> +----------+------------------+
> |     value|nameAgg$(name,age)|
> +----------+------------------+
> |1279869254|        LIAFTi|
> +----------+------------------+
> {noformat}
> The following Unit test also fails ( but only explicitly on a Big Endian 
> platorm )
> {code}
> org.apache.spark.sql.DatasetAggregatorSuite
> - typed aggregation: class input with reordering *** FAILED ***
>   Results do not match for query:
>   == Parsed Logical Plan ==
>   Aggregate [value#748], 
> [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS 
> ClassInputAgg$(b,a)#762]
>   +- AppendColumns <function1>, class[a[0]: int, b[0]: string], 
> class[value[0]: string], [value#748]
>      +- Project [one AS b#650,1 AS a#651]
>         +- OneRowRelation$
>   
>   == Analyzed Logical Plan ==
>   value: string, ClassInputAgg$(b,a): int
>   Aggregate [value#748], 
> [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS 
> ClassInputAgg$(b,a)#762]
>   +- AppendColumns <function1>, class[a[0]: int, b[0]: string], 
> class[value[0]: string], [value#748]
>      +- Project [one AS b#650,1 AS a#651]
>         +- OneRowRelation$
>   
>   == Optimized Logical Plan ==
>   Aggregate [value#748], 
> [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS 
> ClassInputAgg$(b,a)#762]
>   +- AppendColumns <function1>, class[a[0]: int, b[0]: string], 
> class[value[0]: string], [value#748]
>      +- Project [one AS b#650,1 AS a#651]
>         +- OneRowRelation$
>   
>   == Physical Plan ==
>   TungstenAggregate(key=[value#748], 
> functions=[(ClassInputAgg$(b#650,a#651),mode=Final,isDistinct=false)], 
> output=[value#748,ClassInputAgg$(b,a)#762])
>   +- TungstenExchange hashpartitioning(value#748,5), None
>      +- TungstenAggregate(key=[value#748], 
> functions=[(ClassInputAgg$(b#650,a#651),mode=Partial,isDistinct=false)], 
> output=[value#748,value#758])
>         +- !AppendColumns <function1>, class[a[0]: int, b[0]: string], 
> class[value[0]: string], [value#748]
>            +- Project [one AS b#650,1 AS a#651]
>               +- Scan OneRowRelation[]
>   == Results ==
>   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
>   ![one,1]                    [one,9] (QueryTest.scala:127)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to