[
https://issues.apache.org/jira/browse/SPARK-22563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259517#comment-16259517
]
Ben commented on SPARK-22563:
-----------------------------
I cannot share the actual data, but in this case it would look e.g. like this:
{code:java}
Bexpl  ID ID2
valB11 1  1
valB12 2  2
valB21 2  3
valB22 1  4
{code}
Basically, the first ID column is not preserved when transforming further.
I need to be able to create an ID column and output a table containing this
column and others. Then I want to output another table where I create a second
ID2 column, while keeping the first ID column as is. The steps are as I
described above, so that in the end I have a relation, through ID and ID2,
between two tables created from one DataFrame.
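To condense the steps (quoted in full below) into one sketch — {{explode}} comes from {{pyspark.sql.functions}}; everything else is as described in the issue:
{code}
from pyspark.sql.functions import explode

# Steps as described in the issue below:
df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')
dfOutput = df.select('A', 'ID')                    # written to Hive as Table1
df = df.withColumn('Bexpl', explode('B')).drop('B')
df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID2')
dfOutput2 = df.select('Bexpl', 'ID', 'ID2')        # written to Hive as Table2
# Requirement: Table1.ID and Table2.ID must identify the same original rows.
{code}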
> Spark row_number() deterministic generation and materialization as a
> checkpoint
> -------------------------------------------------------------------------------
>
> Key: SPARK-22563
> URL: https://issues.apache.org/jira/browse/SPARK-22563
> Project: Spark
> Issue Type: Question
> Components: PySpark, Shuffle, SQL
> Affects Versions: 2.1.0
> Reporter: Ben
>
> I have a large and complex DataFrame with nested structures in Spark 2.1.0
> (PySpark), and I want to add an ID column to it. The way I did it was to add a
> column like this:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID')
> {code}
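> (For reference, a sketch of the same thing with the DataFrame window API; the constant literal plays the role of ORDER BY NULL and leaves the order within each file just as unspecified:)
> {code}
> from pyspark.sql import Window, functions as F
>
> # Ordering by a constant gives no meaningful order, so row numbers are
> # assigned in whatever order the rows happen to arrive.
> w = Window.partitionBy('File').orderBy(F.lit(1))
> df = df.withColumn('ID', F.row_number().over(w))
> {code}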
> So it goes e.g. from this:
> {code}
> File  A     B
> a.txt valA1 [valB11,valB12]
> a.txt valA2 [valB21,valB22]
> {code}
> to this:
> {code}
> File  A     B               ID
> a.txt valA1 [valB11,valB12] 1
> a.txt valA2 [valB21,valB22] 2
> {code}
> After I add this column, I don't immediately trigger a materialization in
> Spark, but I first branch the DataFrame to a new variable:
> {code}
> dfOutput = df.select('A', 'ID')
> {code}
> with only columns A and ID, and I write {{dfOutput}} to Hive, so I get e.g.
> *Table1*:
> {code}
> A     ID
> valA1 1
> valA2 2
> {code}
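> (The write itself is not quoted in the ticket; a minimal sketch, with a hypothetical table name and mode:)
> {code}
> # Hypothetical write call; mode and table name are assumptions.
> dfOutput.write.mode('overwrite').saveAsTable('Table1')
> {code}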
> So far so good. Then I continue using {{df}} for further transformations,
> namely I explode some of the nested arrays in the columns and drop the
> original columns, like this:
> {code}
> df = df.withColumn('Bexpl', explode('B')).drop('B')
> {code}
> and I get this:
> {code}
> File  A     Bexpl  ID
> a.txt valA1 valB11 1
> a.txt valA1 valB12 1
> a.txt valA2 valB21 2
> a.txt valA2 valB22 2
> {code}
> and output other tables from it, sometimes after creating a second ID column,
> since the exploded arrays produce more rows. E.g., I create *Table2*:
> {code}
> df = df.selectExpr('*', 'row_number() OVER (PARTITION BY File ORDER BY NULL) AS ID2')
> {code}
> to get:
> {code}
> File  A     Bexpl  ID ID2
> a.txt valA1 valB11 1  1
> a.txt valA1 valB12 1  2
> a.txt valA2 valB21 2  3
> a.txt valA2 valB22 2  4
> {code}
> and output it as before:
> {code}
> dfOutput2 = df.select('Bexpl', 'ID', 'ID2')
> {code}
> to get:
> {code}
> Bexpl  ID ID2
> valB11 1  1
> valB12 1  2
> valB21 2  3
> valB22 2  4
> {code}
> *This last result is what I would logically expect*, namely that the values
> of the first ID column remain the same and match the data for each row from
> the point at which that column was created. This would let me keep a
> relation between *Table1*, created from {{dfOutput}}, and subsequent tables
> from {{df}}, like {{dfOutput2}} and the resulting *Table2*.
> The problem is that ID and ID2 do not come out as in the example above but
> are mixed up, and I'm trying to find out why. My guess is that the values
> of the first ID column are not deterministic because {{df}} is not
> materialized before branching to {{dfOutput}}. So when the data is actually
> materialized while saving the table from {{dfOutput}}, the rows are shuffled,
> and the IDs differ from the data that is materialized at a later point
> from {{df}}, as in {{dfOutput2}}. I am, however, not sure, so my questions are:
> 1. Is my assumption correct that IDs are generated differently for the
> different branches, although I add the column before branching?
> 2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g.
> through {{df.cache().count()}}; see the sketch after these questions) ensure a
> fixed ID column that I can later branch from {{df}} however I want, so that I
> can use it as a checkpoint?
> 3. If not, how can I solve this?
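> (A sketch for question 2, assuming a {{spark}} session and a writable checkpoint directory; note that cached partitions can be evicted and recomputed, so {{df.checkpoint()}}, available since Spark 2.1, is the stronger guarantee:)
> {code}
> # Option A: pin df and force computation with an action. Caveat: evicted
> # cache blocks are recomputed, so this does not absolutely fix the IDs.
> df.cache()
> df.count()
>
> # Option B: materialize to disk and truncate the lineage entirely.
> spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # hypothetical path
> df = df.checkpoint()
> {code}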
> I would appreciate any help, or at least a quick confirmation, because I
> can't test this properly: Spark would shuffle the data only if it doesn't
> have enough memory, and reaching that point would mean loading a lot of
> data, which in turn takes a long time and may still coincidentally produce
> good results (I have already tried with smaller datasets).
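> (A cheap sanity check, as a sketch; as noted above, it may still pass coincidentally on small data:)
> {code}
> # Run the same selection twice; if ID generation is non-deterministic
> # across recomputations, the two snapshots can disagree.
> ids1 = set(df.select('File', 'A', 'ID').collect())
> ids2 = set(df.select('File', 'A', 'ID').collect())
> print(ids1 == ids2)
> {code}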