[ https://issues.apache.org/jira/browse/SPARK-22563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259367#comment-16259367 ]
Ben commented on SPARK-22563:
-----------------------------

The example is what I would logically expect, but in reality the values in the ID column are not maintained but mixed up.

> Spark row_number() deterministic generation and materialization as a checkpoint
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-22563
>                 URL: https://issues.apache.org/jira/browse/SPARK-22563
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark, Shuffle, SQL
>    Affects Versions: 2.1.0
>            Reporter: Ben
>
> I have a large and complex DataFrame with nested structures in Spark 2.1.0 (pySpark) and I want to add an ID column to it. The way I did it was to add a column like this:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')
> {code}
> So it goes e.g. from this:
> {code}
> File   A      B
> a.txt  valA1  [valB11,valB12]
> a.txt  valA2  [valB21,valB22]
> {code}
> to this:
> {code}
> File   A      B                ID
> a.txt  valA1  [valB11,valB12]  1
> a.txt  valA2  [valB21,valB22]  2
> {code}
> After I add this column, I don't immediately trigger a materialization in Spark, but I first branch the DataFrame to a new variable:
> {code}
> dfOutput = df.select('A', 'ID')
> {code}
> with only columns A and ID and I write {{dfOutput}} to Hive, so I get e.g. *Table1*:
> {code}
> A      ID
> valA1  1
> valA2  2
> {code}
> So far so good. Then I continue using {{df}} for further transformations, namely I explode some of the nested arrays in the columns and drop the original, like this:
> {code}
> df = df.withColumn('Bexpl', explode('B')).drop('B')
> {code}
> and I get this:
> {code}
> File   A      Bexpl   ID
> a.txt  valA1  valB11  1
> a.txt  valA1  valB12  1
> a.txt  valA2  valB21  2
> a.txt  valA2  valB22  2
> {code}
> and output other tables from it, sometimes after creating a second ID column since there are more rows from the exploded arrays. E.g.
> I create *Table2*:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID2')
> {code}
> to get:
> {code}
> File   A      Bexpl   ID  ID2
> a.txt  valA1  valB11  1   1
> a.txt  valA1  valB12  1   2
> a.txt  valA2  valB21  2   3
> a.txt  valA2  valB22  2   4
> {code}
> and output as earlier:
> {code}
> dfOutput2 = df.select('Bexpl', 'ID', 'ID2')
> {code}
> to get:
> {code}
> Bexpl   ID  ID2
> valB11  1   1
> valB12  1   2
> valB21  2   3
> valB22  2   4
> {code}
> I would expect that the values of the first ID column remain the same and match the data for each row from the point that this column was created. This would allow me to keep a relation between *Table1* created from {{dfOutput}} and subsequent tables from {{df}}, like {{dfOutput2}} and the resulting *Table2*.
> The problem is that ID and ID2 are not as they should be in the example above, but mixed up, and I'm trying to find out why. My guess is that the values of the first ID column are not deterministic because {{df}} is not materialized before branching to {{dfOutput}}. So when the data is actually materialized when saving the table from {{dfOutput}}, the rows are shuffled and the IDs are different from the data that is materialized at a later point from {{df}}, as in {{dfOutput2}}. I am however not sure, so my questions are:
> 1. Is my assumption correct, that IDs are generated differently for the different branches although I add the column before branching?
> 2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g. through {{df.cache().count()}}) ensure a fixed ID column which I can later branch however I want from {{df}}, so that I can use this as a checkpoint?
> 3. If not, how can I solve this?
> I would appreciate any help or at least quick confirmation because I can't test it properly.
> Spark would shuffle the data only if it doesn't have enough memory, and reaching that point would mean loading a lot of data, which in turn needs a long time, and may still provide coincidentally good results (I already tried with smaller datasets).

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
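
The non-determinism suspected in the issue can be illustrated without a cluster. The following is only a plain-Python sketch, not Spark code: it assumes that each re-evaluation of the un-materialized DataFrame may deliver the rows of a partition in a different order (simulated here by a seeded shuffle), and that column A is unique within each File. The helper names (`recompute`, `ids_order_by_null`, `ids_order_by_key`, `distinct_assignments`) are hypothetical and exist only for this illustration.

```python
import random

# Sample rows (File, A); assumes A is unique within a File.
ROWS = [("a.txt", "valA1"), ("a.txt", "valA2"),
        ("a.txt", "valA3"), ("a.txt", "valA4")]


def recompute(rows, seed):
    """Simulate one re-evaluation of the lineage: rows arrive in an arbitrary order."""
    arrived = list(rows)
    random.Random(seed).shuffle(arrived)
    return arrived


def ids_order_by_null(rows, seed):
    """Analogue of ORDER BY NULL: number rows in whatever order they arrive."""
    return {row: n for n, row in enumerate(recompute(rows, seed), start=1)}


def ids_order_by_key(rows, seed):
    """Analogue of ordering by a unique key: sort first, so arrival order is irrelevant."""
    return {row: n for n, row in enumerate(sorted(recompute(rows, seed)), start=1)}


def distinct_assignments(id_fn, rows, runs=50):
    """Count how many different ID assignments appear across simulated re-evaluations."""
    return len({tuple(sorted(id_fn(rows, seed).items())) for seed in range(runs)})


if __name__ == "__main__":
    print("ORDER BY NULL analogue:", distinct_assignments(ids_order_by_null, ROWS))
    print("ORDER BY key analogue: ", distinct_assignments(ids_order_by_key, ROWS))
```

In Spark terms, the second variant would correspond to replacing `ORDER BY NULL` in the window with an ordering over a column (or combination of columns) that is unique within each `File` partition. Whether such a key exists in the reporter's data is not stated in the issue.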