[
https://issues.apache.org/jira/browse/SPARK-22563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ben updated SPARK-22563:
------------------------
Description:
I have a large and complex DataFrame with nested structures in Spark 2.1.0
(pySpark) and I want to add an ID column to it. The way I did it was to add a
column like this:
{code}
df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
ID')
{code}
So it goes e.g. from this:
{code}
File A B
a.txt valA1 [valB11,valB12]
a.txt valA2 [valB21,valB22]
{code}
to this:
{code}
File A B ID
a.txt valA1 [valB11,valB12] 1
a.txt valA2 [valB21,valB22] 2
{code}
After I add this column, I don't immediately trigger a materialization in
Spark, but I first branch the DataFrame to a new variable:
{code}
dfOutput = df.select('A','ID')
{code}
with only columns A and ID and I write {{dfOutput}} to Hive, so I get e.g.
*Table1*:
{code}
A ID
valA1 1
valA2 2
{code}
So far so good. Then I continue using {{df}} for further transformations,
namely I explode some of the nested arrays in the columns and drop the
original, like this:
{code}
df = df.withColumn('Bexpl',explode('B')).drop('B')
{code}
and I get this:
{code}
File A Bexpl ID
a.txt valA1 valB11 1
a.txt valA1 valB12 1
a.txt valA2 valB21 2
a.txt valA2 valB22 2
{code}
and output other tables from it, sometimes after creating a second ID column
since there are more rows from the exploded arrays. E.g. I create *Table2*:
{code}
df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
ID2')
{code}
to get:
{code}
File A Bexpl ID ID2
a.txt valA1 valB11 1 1
a.txt valA1 valB12 1 2
a.txt valA2 valB21 2 3
a.txt valA2 valB22 2 4
{code}
and output as earlier:
{code}
dfOutput2 = df.select('Bexpl','ID','ID2')
{code}
to get:
{code}
Bexpl ID ID2
valB11 1 1
valB12 1 2
valB21 2 3
valB22 2 4
{code}
*This last result is what I would logically expect*, namely that the values of
the first ID column remain the same and match the data for each row from the
point that this column was created. This would allow me to keep a relation
between *Table1* created from {{dfOutput}} and subsequent tables from {{df}},
like {{dfOutput2}} and the resulting *Table2*.
The problem is that ID and ID2 are not as they should be in the example above,
but mixed up, and I'm trying to find out why. My guess is that the values of
the first ID column are not deterministic because {{df}} is not materialized
before branching to {{dfOutput}}. So when the data is actually materialized
when saving the table from {{dfOutput}}, the rows are shuffled and IDs are
different from the data that is materialized on a later point from {{df}}, as
in {{dfOutput2}}. I am however not sure, so my questions are:
1. Is my assumption correct, that IDs are generated differently for the
different branches although I add the column before branching?
2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g.
through {{df.cache().count()}} ensure a fixed ID column which I can later
branch however I want from {{df}}, so that I can use this as a checkpoint?
3. If not, how can I solve this?
I would appreciate any help or at least quick confirmation because I can't test
it properly. Spark would shuffle the data only if it doesn't have enough
memory, and reaching that point would mean loading a lot of data and in turn
need a long time, and may still provide coincidentally good results (already
tried with smaller datasets).
was:
I have a large and complex DataFrame with nested structures in Spark 2.1.0
(pySpark) and I want to add an ID column to it. The way I did it was to add a
column like this:
{code}
df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
ID')
{code}
So it goes e.g. from this:
{code}
File A B
a.txt valA1 [valB11,valB12]
a.txt valA2 [valB21,valB22]
{code}
to this:
{code}
File A B ID
a.txt valA1 [valB11,valB12] 1
a.txt valA2 [valB21,valB22] 2
{code}
After I add this column, I don't immediately trigger a materialization in
Spark, but I first branch the DataFrame to a new variable:
{code}
dfOutput = df.select('A','ID')
{code}
with only columns A and ID and I write {{dfOutput}} to Hive, so I get e.g.
*Table1*:
{code}
A ID
valA1 1
valA2 2
{code}
So far so good. Then I continue using {{df}} for further transformations,
namely I explode some of the nested arrays in the columns and drop the
original, like this:
{code}
df = df.withColumn('Bexpl',explode('B')).drop('B')
{code}
and I get this:
{code}
File A Bexpl ID
a.txt valA1 valB11 1
a.txt valA1 valB12 1
a.txt valA2 valB21 2
a.txt valA2 valB22 2
{code}
and output other tables from it, sometimes after creating a second ID column
since there are more rows from the exploded arrays. E.g. I create *Table2*:
{code}
df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
ID2')
{code}
to get:
{code}
File A Bexpl ID ID2
a.txt valA1 valB11 1 1
a.txt valA1 valB12 1 2
a.txt valA2 valB21 2 3
a.txt valA2 valB22 2 4
{code}
and output as earlier:
{code}
dfOutput2 = df.select('Bexpl','ID','ID2')
{code}
to get:
{code}
Bexpl ID ID2
valB11 1 1
valB12 1 2
valB21 2 3
valB22 2 4
{code}
I would expect that the values of the first ID column remain the same and match
the data for each row from the point that this column was created. This would
allow me to keep a relation between *Table1* created from {{dfOutput}} and
subsequent tables from {{df}}, like {{dfOutput2}} and the resulting *Table2*.
The problem is that ID and ID2 are not as they should be in the example above,
but mixed up, and I'm trying to find out why. My guess is that the values of
the first ID column are not deterministic because {{df}} is not materialized
before branching to {{dfOutput}}. So when the data is actually materialized
when saving the table from {{dfOutput}}, the rows are shuffled and IDs are
different from the data that is materialized on a later point from {{df}}, as
in {{dfOutput2}}. I am however not sure, so my questions are:
1. Is my assumption correct, that IDs are generated differently for the
different branches although I add the column before branching?
2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g.
through {{df.cache().count()}} ensure a fixed ID column which I can later
branch however I want from {{df}}, so that I can use this as a checkpoint?
3. If not, how can I solve this?
I would appreciate any help or at least quick confirmation because I can't test
it properly. Spark would shuffle the data only if it doesn't have enough
memory, and reaching that point would mean loading a lot of data and in turn
need a long time, and may still provide coincidentally good results (already
tried with smaller datasets).
> Spark row_number() deterministic generation and materialization as a
> checkpoint
> -------------------------------------------------------------------------------
>
> Key: SPARK-22563
> URL: https://issues.apache.org/jira/browse/SPARK-22563
> Project: Spark
> Issue Type: Question
> Components: PySpark, Shuffle, SQL
> Affects Versions: 2.1.0
> Reporter: Ben
>
> I have a large and complex DataFrame with nested structures in Spark 2.1.0
> (pySpark) and I want to add an ID column to it. The way I did it was to add a
> column like this:
> {code}
> df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
> ID')
> {code}
> So it goes e.g. from this:
> {code}
> File A B
> a.txt valA1 [valB11,valB12]
> a.txt valA2 [valB21,valB22]
> {code}
> to this:
> {code}
> File A B ID
> a.txt valA1 [valB11,valB12] 1
> a.txt valA2 [valB21,valB22] 2
> {code}
> After I add this column, I don't immediately trigger a materialization in
> Spark, but I first branch the DataFrame to a new variable:
> {code}
> dfOutput = df.select('A','ID')
> {code}
> with only columns A and ID and I write {{dfOutput}} to Hive, so I get e.g.
> *Table1*:
> {code}
> A ID
> valA1 1
> valA2 2
> {code}
> So far so good. Then I continue using {{df}} for further transformations,
> namely I explode some of the nested arrays in the columns and drop the
> original, like this:
> {code}
> df = df.withColumn('Bexpl',explode('B')).drop('B')
> {code}
> and I get this:
> {code}
> File A Bexpl ID
> a.txt valA1 valB11 1
> a.txt valA1 valB12 1
> a.txt valA2 valB21 2
> a.txt valA2 valB22 2
> {code}
> and output other tables from it, sometimes after creating a second ID column
> since there are more rows from the exploded arrays. E.g. I create *Table2*:
> {code}
> df= df.selectExpr('*','row_number() OVER (PARTITION BY File ORDER BY NULL) AS
> ID2')
> {code}
> to get:
> {code}
> File A Bexpl ID ID2
> a.txt valA1 valB11 1 1
> a.txt valA1 valB12 1 2
> a.txt valA2 valB21 2 3
> a.txt valA2 valB22 2 4
> {code}
> and output as earlier:
> {code}
> dfOutput2 = df.select('Bexpl','ID','ID2')
> {code}
> to get:
> {code}
> Bexpl ID ID2
> valB11 1 1
> valB12 1 2
> valB21 2 3
> valB22 2 4
> {code}
> *This last result is what I would logically expect*, namely that the values
> of the first ID column remain the same and match the data for each row from
> the point that this column was created. This would allow me to keep a
> relation between *Table1* created from {{dfOutput}} and subsequent tables
> from {{df}}, like {{dfOutput2}} and the resulting *Table2*.
> The problem is that ID and ID2 are not as they should be in the example
> above, but mixed up, and I'm trying to find out why. My guess is that the
> values of the first ID column are not deterministic because {{df}} is not
> materialized before branching to {{dfOutput}}. So when the data is actually
> materialized when saving the table from {{dfOutput}}, the rows are shuffled
> and IDs are different from the data that is materialized on a later point
> from {{df}}, as in {{dfOutput2}}. I am however not sure, so my questions are:
> 1. Is my assumption correct, that IDs are generated differently for the
> different branches although I add the column before branching?
> 2. Would materializing the DataFrame before branching to {{dfOutput}} (e.g.
> through {{df.cache().count()}} ensure a fixed ID column which I can later
> branch however I want from {{df}}, so that I can use this as a checkpoint?
> 3. If not, how can I solve this?
> I would appreciate any help or at least quick confirmation because I can't
> test it properly. Spark would shuffle the data only if it doesn't have enough
> memory, and reaching that point would mean loading a lot of data and in turn
> need a long time, and may still provide coincidentally good results (already
> tried with smaller datasets).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]