[
https://issues.apache.org/jira/browse/SPARK-12555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tim Preece updated SPARK-12555:
-------------------------------
Description:
Testcase
-----------
{code}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext}

case class people(age: Int, name: String)

// Typed aggregator that concatenates the names within each group.
object nameAgg extends Aggregator[people, String, String] {
  def zero: String = ""
  def reduce(b: String, a: people): String = a.name + b
  def merge(b1: String, b2: String): String = b1 + b2
  def finish(r: String): String = r
}

object DataSetAgg {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DataSetAgg")
    val spark = new SparkContext(conf)
    val sqlContext = new SQLContext(spark)
    import sqlContext.implicits._

    // The SQL projects (name, age), i.e. reordered relative to people(age, name).
    val peopleds: Dataset[people] =
      sqlContext.sql("SELECT 'Tim Preece' AS name, 1279869254 AS age").as[people]
    peopleds.groupBy(_.age).agg(nameAgg.toColumn).show()
  }
}
{code}
Result (on a Little Endian Platform)
--------
{noformat}
+----------+----------+
| _1| _2|
+----------+----------+
|1279869254|    FAILTi|
+----------+----------+
{noformat}
Explanation
---------------
Internally, the string field in the UnsafeRow is not updated after an unsafe-row join operation.
The displayed string is therefore corrupted: it shows part of the integer (interpreted as a string) followed by "Ti".
The column names also look incorrect on a Little Endian platform.
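To make the join remark concrete: in an UnsafeRow-style binary row a string column occupies a fixed 8-byte slot holding a packed (offset, length) word, with the character bytes stored in a variable-length region after the fixed slots. When two such rows are concatenated, every offset coming from the appended row has to be shifted, because its variable-length bytes land further back in the combined buffer. The following standalone sketch (plain Scala, no Spark; all names are invented for illustration, this is a simplified model rather than Spark source) shows the packing and the shift; skipping the shift leaves the slot pointing into unrelated bytes, which is the kind of corruption shown above.
{code}
// Illustration only -- simplified model of a binary row format, not Spark source.
object OffsetAndSizeSketch {

  // One 8-byte slot per column; for a string it packs (offset << 32) | length,
  // where the offset is measured from the start of the row buffer.
  def pack(offset: Int, length: Int): Long = (offset.toLong << 32) | length
  def offsetOf(word: Long): Int = (word >>> 32).toInt
  def lengthOf(word: Long): Int = word.toInt

  def main(args: Array[String]): Unit = {
    // In the original row, "Tim Preece" (10 bytes) starts at byte 16.
    val before = pack(offset = 16, length = 10)

    // After this row is appended to another row, its variable-length bytes
    // move back by some delta, so the stored offset must move with them.
    val delta = 24
    val after = pack(offsetOf(before) + delta, lengthOf(before))

    println(s"before join: offset=${offsetOf(before)}, length=${lengthOf(before)}")
    println(s"after join:  offset=${offsetOf(after)}, length=${lengthOf(after)}")
    // If the offset is NOT rewritten, the slot still says "bytes start at 16",
    // which in the combined row is occupied by other data -- so the string
    // renders as garbage, as in the output above.
  }
}
{code}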
Result (on a Big Endian Platform)
--------
{noformat}
+----------+------------------+
| value|nameAgg$(name,age)|
+----------+------------------+
|1279869254|            LIAFTi|
+----------+------------------+
{noformat}
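The corrupted values are consistent with this: 1279869254 is 0x4C494146, whose four bytes are the ASCII characters 'L', 'I', 'A', 'F'. Read in little-endian byte order they spell "FAIL", in big-endian order "LIAF", and in both outputs they are followed by "Ti", the first two characters of "Tim Preece". A small standalone check (plain Scala on the JVM, no Spark needed):
{code}
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Show that the corrupted column is just the age's raw bytes read as text.
object DecodeCorruptedValue {
  def main(args: Array[String]): Unit = {
    val age = 1279869254  // == 0x4C494146

    def asText(order: ByteOrder): String = {
      val buf = ByteBuffer.allocate(4).order(order)
      buf.putInt(age)
      new String(buf.array(), StandardCharsets.US_ASCII)
    }

    println(asText(ByteOrder.LITTLE_ENDIAN))  // prints FAIL
    println(asText(ByteOrder.BIG_ENDIAN))     // prints LIAF
  }
}
{code}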
The following unit test also fails (but only explicitly on a Big Endian platform):
{noformat}
org.apache.spark.sql.DatasetAggregatorSuite
- typed aggregation: class input with reordering *** FAILED ***
  Results do not match for query:
  == Parsed Logical Plan ==
  Aggregate [value#748], [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS ClassInputAgg$(b,a)#762]
  +- AppendColumns <function1>, class[a[0]: int, b[0]: string], class[value[0]: string], [value#748]
     +- Project [one AS b#650,1 AS a#651]
        +- OneRowRelation$

  == Analyzed Logical Plan ==
  value: string, ClassInputAgg$(b,a): int
  Aggregate [value#748], [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS ClassInputAgg$(b,a)#762]
  +- AppendColumns <function1>, class[a[0]: int, b[0]: string], class[value[0]: string], [value#748]
     +- Project [one AS b#650,1 AS a#651]
        +- OneRowRelation$

  == Optimized Logical Plan ==
  Aggregate [value#748], [value#748,(ClassInputAgg$(b#650,a#651),mode=Complete,isDistinct=false) AS ClassInputAgg$(b,a)#762]
  +- AppendColumns <function1>, class[a[0]: int, b[0]: string], class[value[0]: string], [value#748]
     +- Project [one AS b#650,1 AS a#651]
        +- OneRowRelation$

  == Physical Plan ==
  TungstenAggregate(key=[value#748], functions=[(ClassInputAgg$(b#650,a#651),mode=Final,isDistinct=false)], output=[value#748,ClassInputAgg$(b,a)#762])
  +- TungstenExchange hashpartitioning(value#748,5), None
     +- TungstenAggregate(key=[value#748], functions=[(ClassInputAgg$(b#650,a#651),mode=Partial,isDistinct=false)], output=[value#748,value#758])
        +- !AppendColumns <function1>, class[a[0]: int, b[0]: string], class[value[0]: string], [value#748]
           +- Project [one AS b#650,1 AS a#651]
              +- Scan OneRowRelation[]
  == Results ==
  !== Correct Answer - 1 ==   == Spark Answer - 1 ==
  ![one,1]                    [one,9] (QueryTest.scala:127)
{noformat}
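For reference, that suite test can be approximated as below. This is a reconstruction from the plan above, not a copy of the suite source, so the names (AggData, ClassInputAgg) and details may differ; it assumes a Spark 1.6 environment where sqlContext is already defined (e.g. a 1.6 spark-shell).
{code}
import org.apache.spark.sql.expressions.Aggregator
import sqlContext.implicits._

case class AggData(a: Int, b: String)

// Sums the Int field of the input class.
object ClassInputAgg extends Aggregator[AggData, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: AggData): Int = b + a.a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
}

// As in the plan above, the SQL lists the columns as (b, a),
// i.e. reordered relative to AggData(a, b).
val ds = sqlContext.sql("SELECT 'one' AS b, 1 AS a").as[AggData]
ds.groupBy(_.b).agg(ClassInputAgg.toColumn).show()
// Expected result: (one, 1). An affected build returns (one, 9),
// matching the "Spark Answer" in the failure output above.
{code}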
> Datasets: data is corrupted when input data is reordered
> --------------------------------------------------------
>
> Key: SPARK-12555
> URL: https://issues.apache.org/jira/browse/SPARK-12555
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Environment: ALL platforms on 1.6
> Reporter: Tim Preece
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]