[ https://issues.apache.org/jira/browse/SPARK-22563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259367#comment-16259367 ]

Ben edited comment on SPARK-22563 at 11/20/17 3:34 PM:
-------------------------------------------------------

Hi [~srowen],
The example is what I would logically expect, but in reality the ID values are 
not preserved; they come out mixed up.



> Spark row_number() deterministic generation and materialization as a 
> checkpoint
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-22563
>                 URL: https://issues.apache.org/jira/browse/SPARK-22563
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark, Shuffle, SQL
>    Affects Versions: 2.1.0
>            Reporter: Ben
>
> I have a large and complex DataFrame with nested structures in Spark 2.1.0 
> (pySpark) and I want to add an ID column to it. The way I did it was to add a 
> column like this:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')
> {code}
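An aside on why this can go wrong (my reading, not stated in the thread): {{ORDER BY NULL}} gives every row in a partition the same sort key, so {{row_number()}} simply numbers the rows in whatever physical order they happen to arrive. A plain-Python sketch of the same effect ({{assign_ids}} is a hypothetical stand-in for the window, not Spark code):

```python
# Plain-Python sketch (not Spark): with a constant sort key, "row numbers"
# follow whatever order the rows happen to arrive in -- which is what
# ORDER BY NULL amounts to within a partition.
rows = [("a.txt", "valA1"), ("a.txt", "valA2")]

def assign_ids(partition):
    # sorted() is stable, and a constant key means "keep arrival order"
    ordered = sorted(partition, key=lambda r: 0)
    return {row: i + 1 for i, row in enumerate(ordered)}

first_run = assign_ids(rows)
second_run = assign_ids(list(reversed(rows)))  # same data, different arrival order
# The same logical row now carries a different ID:
# first_run[("a.txt", "valA1")] == 1, but second_run[("a.txt", "valA1")] == 2
```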
> So it goes e.g. from this:
> {code}
> File     A        B
> a.txt    valA1    [valB11,valB12]
> a.txt    valA2    [valB21,valB22]
> {code}
> to this:
> {code}
> File     A        B                  ID
> a.txt    valA1    [valB11,valB12]    1
> a.txt    valA2    [valB21,valB22]    2
> {code}
> After I add this column, I don't immediately trigger a materialization in 
> Spark, but I first branch the DataFrame to a new variable:
> {code}
> dfOutput = df.select('A','ID')
> {code}
> keeping only columns A and ID, and I write {{dfOutput}} to Hive, producing e.g. 
> *Table1*:
> {code}
> A        ID
> valA1    1
> valA2    2
> {code}
> So far so good. Then I continue using {{df}} for further transformations, 
> namely I explode some of the nested arrays in the columns and drop the 
> original, like this:
> {code}
> df = df.withColumn('Bexpl',explode('B')).drop('B')
> {code}
> and I get this:
> {code}
> File     A        Bexpl      ID
> a.txt    valA1    valB11     1
> a.txt    valA1    valB12     1
> a.txt    valA2    valB21     2
> a.txt    valA2    valB22     2
> {code}
> and output other tables from it, sometimes after creating a second ID column 
> since there are more rows from the exploded arrays. E.g. I create *Table2*:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID2')
> {code}
> to get:
> {code}
> File     A        Bexpl      ID    ID2
> a.txt    valA1    valB11     1     1
> a.txt    valA1    valB12     1     2
> a.txt    valA2    valB21     2     3
> a.txt    valA2    valB22     2     4
> {code}
> and output as earlier:
> {code}
> dfOutput2 = df.select('Bexpl','ID','ID2')
> {code}
> to get:
> {code}
> Bexpl      ID    ID2
> valB11     1     1
> valB12     1     2
> valB21     2     3
> valB22     2     4
> {code}
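The explode step above can be sketched in plain Python (an illustration only, not Spark code) to show why each already-assigned ID is simply duplicated for every element of its array:

```python
# Plain-Python analogue of explode('B'): each array element becomes its own
# row, and the other columns (including the already-computed ID) are copied.
rows = [("a.txt", "valA1", ["valB11", "valB12"], 1),
        ("a.txt", "valA2", ["valB21", "valB22"], 2)]

exploded = [(f, a, b, i) for (f, a, bs, i) in rows for b in bs]
# -> 4 rows; ID 1 appears twice and ID 2 appears twice, as in the table above
```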
> I would expect that the values of the first ID column remain the same and 
> match the data for each row from the point that this column was created. This 
> would allow me to keep a relation between *Table1* created from {{dfOutput}} 
> and subsequent tables from {{df}}, like {{dfOutput2}} and the resulting 
> *Table2*.
> The problem is that ID and ID2 are not as they should be in the example 
> above, but mixed up, and I'm trying to find out why. My guess is that the 
> values of the first ID column are not deterministic because {{df}} is not 
> materialized before branching to {{dfOutput}}. So when the data is actually 
> materialized while saving the table from {{dfOutput}}, the rows are shuffled 
> and the IDs differ from the data that is materialized at a later point 
> from {{df}}, as in {{dfOutput2}}. I'm not sure, however, so my questions are:
> 1. Is my assumption correct, that IDs are generated differently for the 
> different branches although I add the column before branching?
> 2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g. 
> through {{df.cache().count()}}) ensure a fixed ID column which I can later 
> branch however I want from {{df}}, so that I can use this as a checkpoint?
> 3. If not, how can I solve this?
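A possible answer sketch (my own, untested, and not confirmed in this thread): {{cache()}} is only best-effort — evicted blocks are recomputed from the lineage, which re-runs the non-deterministic window — so on PySpark 2.1 the simplest reliable fence is to write the DataFrame out and read it back, which cuts the lineage. It assumes an existing SparkSession {{spark}} and DataFrame {{df}}; the table name below is a placeholder.

```python
# Untested sketch: freeze the IDs by materializing the DataFrame to storage
# and reading it back. cache() alone is best-effort -- evicted blocks are
# recomputed from lineage, re-running the non-deterministic window.
from pyspark.sql.functions import explode

df = df.selectExpr(
    '*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')

df.write.mode('overwrite').saveAsTable('tmp_df_with_ids')  # placeholder name
df = spark.table('tmp_df_with_ids')  # lineage now starts from the stored table

# Both branches now see the same, frozen ID values:
dfOutput = df.select('A', 'ID')
df = df.withColumn('Bexpl', explode('B')).drop('B')
```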
> I would appreciate any help, or at least a quick confirmation, because I can't 
> test this properly: Spark would shuffle the data only if it doesn't have 
> enough memory, so reaching that point means loading a lot of data and waiting 
> a long time, and the result may still look correct by coincidence (I already 
> tried with smaller datasets).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
