[
https://issues.apache.org/jira/browse/SPARK-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-17061:
------------------------------------
Assignee: Apache Spark
> Incorrect results returned following a join of two datasets and a map step
> where total number of columns >100
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-17061
> URL: https://issues.apache.org/jira/browse/SPARK-17061
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Jamie Hutton
> Assignee: Apache Spark
> Priority: Critical
>
> We have hit a consistent bug with datasets that have more than 100
> columns. I am raising this as a blocker because Spark returns the WRONG
> results rather than erroring, leading to data integrity issues.
> I have put together the following test case, which demonstrates the issue
> (it will run in spark-shell). In this example I am joining a dataset with
> many fields onto another dataset.
> The join itself works fine: if you show the joined dataset you get the
> expected result. However, if you then run a map step over it, you end up
> with a strange result where the sequence from the right-hand dataset now
> contains only its last value, repeated.
> Whilst this test may seem a rather contrived example, what we are doing
> here is a very standard analytical pattern (sketched below). My original
> code was designed to:
> - take a dataset of child records
> - groupByKey up to the parent: giving a Dataset of (ParentID, Seq[Children])
> - join the children onto the parent by parentID: giving
> ((Parent), (ParentID, Seq[Children]))
> - map over the result to give a tuple of (Parent, Seq[Children])
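> A minimal sketch of that pattern (runs in spark-shell like the test case
> below; the Parent/Child case classes here are hypothetical stand-ins for
> our real schema, not the failing case itself):
>
> case class Child(parentId: Int, value: String)
> case class Parent(parentId: Int, name: String)
>
> val children = Seq(Child(1, "a"), Child(1, "b")).toDS
> val parents = Seq(Parent(1, "p1")).toDS
>
> // groupByKey up to the parent: Dataset[(Int, List[Child])]
> val byParent = children.groupByKey(_.parentId).mapGroups((k, vs) => (k, vs.toList))
>
> // join the children onto the parent by parentId, then map to (Parent, Seq[Child])
> val result = parents.as("P")
>   .joinWith(byParent.as("C"), $"P.parentId" === $"C._1")
>   .map { case (parent, (_, kids)) => (parent, kids) }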
> Notes:
> - The issue is resolved by having fewer fields - as soon as we go to <= 100
> fields the integrity issue goes away. Try removing one of the fields from
> BigCaseClass below.
> - The issue arises based on the total number of fields in the resulting
> dataset. Below I have a small case class and a big case class, but two case
> classes of 50 fields each would give the same issue.
> - The issue occurs where the case class being joined on (on the right)
> contains a nested case class type (the Seq[Name] here); it doesn't occur
> with a Seq[String].
> - If I go back to an RDD for the map step after the join I can work around
> the issue (see the sketch after the test case), but I lose all the benefits
> of datasets.
> Scala code test case:
> case class Name(name: String)
> case class SmallCaseClass (joinkey: Integer, names: Seq[Name])
> case class BigCaseClass(
>   field1: Integer, field2: Integer, field3: Integer, field4: Integer, field5: Integer,
>   field6: Integer, field7: Integer, field8: Integer, field9: Integer, field10: Integer,
>   field11: Integer, field12: Integer, field13: Integer, field14: Integer, field15: Integer,
>   field16: Integer, field17: Integer, field18: Integer, field19: Integer, field20: Integer,
>   field21: Integer, field22: Integer, field23: Integer, field24: Integer, field25: Integer,
>   field26: Integer, field27: Integer, field28: Integer, field29: Integer, field30: Integer,
>   field31: Integer, field32: Integer, field33: Integer, field34: Integer, field35: Integer,
>   field36: Integer, field37: Integer, field38: Integer, field39: Integer, field40: Integer,
>   field41: Integer, field42: Integer, field43: Integer, field44: Integer, field45: Integer,
>   field46: Integer, field47: Integer, field48: Integer, field49: Integer, field50: Integer,
>   field51: Integer, field52: Integer, field53: Integer, field54: Integer, field55: Integer,
>   field56: Integer, field57: Integer, field58: Integer, field59: Integer, field60: Integer,
>   field61: Integer, field62: Integer, field63: Integer, field64: Integer, field65: Integer,
>   field66: Integer, field67: Integer, field68: Integer, field69: Integer, field70: Integer,
>   field71: Integer, field72: Integer, field73: Integer, field74: Integer, field75: Integer,
>   field76: Integer, field77: Integer, field78: Integer, field79: Integer, field80: Integer,
>   field81: Integer, field82: Integer, field83: Integer, field84: Integer, field85: Integer,
>   field86: Integer, field87: Integer, field88: Integer, field89: Integer, field90: Integer,
>   field91: Integer, field92: Integer, field93: Integer, field94: Integer, field95: Integer,
>   field96: Integer, field97: Integer, field98: Integer, field99: Integer)
>
> val bigCC = Seq(BigCaseClass(
>   1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
>   21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
>   41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
>   61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
>   81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))
>
> val smallCC = Seq(SmallCaseClass(1, Seq(
> Name("Jamie"),
> Name("Ian"),
> Name("Dave"),
> Name("Will")
> )))
>
>
> val bigCCDS = spark.createDataset(spark.sparkContext.parallelize(bigCC))
> val smallCCDS = spark.createDataset(spark.sparkContext.parallelize(smallCC))
>
> val joined_test = bigCCDS.as("A").joinWith(smallCCDS.as("B"),
>   $"A.field1" === $"B.joinkey", "LEFT")
>
> /* This next step is fine - it shows all 4 names:
> *
> [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
> * [1,WrappedArray([Jamie], [Ian], [Dave], [Will])]
> * */
> joined_test.show(false)
>
> /* This one ends up repeating "Will" - I used the simplest map step
> possible here:
> *
> [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
> * [1,WrappedArray([Will], [Will], [Will], [Will])]
> * */
> joined_test.map(identity).show(false)
>
> /* This one works because the result has fewer than 100 fields:
> * [Jamie], [Ian], [Dave], [Will]*/
> joined_test.map(_._2).show(false)
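>
> For reference, a sketch of the RDD workaround mentioned in the notes above
> (the same idea as my original workaround, though not the exact code):
>
> // Drop to the underlying RDD for the map step, then rebuild the Dataset.
> // This avoids the corruption but loses the Dataset optimisations.
> val mappedRDD = joined_test.rdd.map { case (big, small) =>
>   // small can be null here because the join above is a LEFT join
>   (big, Option(small).map(_.names).getOrElse(Seq.empty[Name]))
> }
> spark.createDataset(mappedRDD).show(false)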
>