Dan Brown created SPARK-10685:
---------------------------------
Summary: Misaligned data with RDD.zip and DataFrame.withColumn after repartition
Key: SPARK-10685
URL: https://issues.apache.org/jira/browse/SPARK-10685
Project: Spark
Issue Type: Bug
Affects Versions: 1.5.0, 1.4.1, 1.3.0
Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
- Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
Reporter: Dan Brown
Here's a weird behavior where {{RDD.zip}} or {{DataFrame.withColumn}} after a
{{repartition}} produces "misaligned" data, meaning the values in a row's
columns no longer come from the same original record, as if a zip had shuffled
the collections before zipping them. It's difficult to reproduce because it's
nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in
one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1
(bin-without-hadoop), and 1.5.0 (bin-without-hadoop).
Here's the most similar issue I was able to find. It appears not to have been
repro'd before being closed optimistically, and it smells like it may have had
the same underlying cause, which was never fixed:
- https://issues.apache.org/jira/browse/SPARK-9131
Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying
to build it ourselves when we ran into this problem. Let me put in my vote for
reopening the issue and supporting {{DataFrame.zip}} in the standard lib.
- https://issues.apache.org/jira/browse/SPARK-7460
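For context, the hand-rolled zip we were attempting looked roughly like the
sketch below (hypothetical helper {{zip_dataframes}}, not our exact code, and
it assumes both frames have identical partitioning); it leans on {{RDD.zip}}
in the same way as the repros that follow:
{code}
from pyspark.sql.types import StructType

def zip_dataframes(sqlCtx, df1, df2):
    # RDD.zip requires both RDDs to have the same number of partitions
    # and the same number of elements in each partition.
    zipped = df1.rdd.zip(df2.rdd).map(lambda (x, y): tuple(x) + tuple(y))
    schema = StructType(df1.schema.fields + df2.schema.fields)
    return sqlCtx.createDataFrame(zipped, schema)
{code}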
h3. Brief repro
Fail: withColumn(udf) after DataFrame.repartition
{code}
# Imports (repeated under "Detailed repro" below)
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
[r for r in df.collect() if r.a != r.b][:3] # Should be []
{code}
Sample outputs (nondeterministic):
{code}
[Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
[]
[Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
[Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
{code}
Fail: RDD.zip after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3] # Should be []
{code}
Sample outputs (nondeterministic):
{code}
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
{code}
Test setup:
- local\[8]: {{MASTER=local\[8]}}
- dist\[N]: 1 driver + 1 master + N workers
{code}
"Fail" tests pass? cluster mode spark version
----------------------------------------------------
yes local[8] 1.3.0-cdh5.4.5
no dist[4] 1.3.0-cdh5.4.5
yes local[8] 1.4.1
yes dist[1] 1.4.1
no dist[2] 1.4.1
no dist[4] 1.4.1
yes local[8] 1.5.0
yes dist[1] 1.5.0
no dist[2] 1.5.0
no dist[4] 1.5.0
{code}
h3. Detailed repro
Start {{pyspark}} and run these imports:
{code}
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StructType, StructField
{code}
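Each snippet below ends with the same alignment check; for convenience it can
be wrapped in a small helper (hypothetical {{count_misaligned}}, not used
verbatim in the snippets):
{code}
def count_misaligned(df):
    # Rows where column b no longer matches column a; should always be 0.
    return len([r for r in df.collect() if r.a != r.b])
{code}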
Fail: withColumn(udf) after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(1)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
- Note: unlike {{repartition}}, {{coalesce}} narrows partitions without a full shuffle, which is consistent with the failures above appearing only after a shuffle
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.coalesce(10)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Ok: withColumn without udf
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', df.a)
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Ok: createDataFrame(RDD.map) instead of withColumn(udf)
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.map(lambda r: Row(a=r.a, b=r.a))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b]) # Should be 0
{code}
Fail: RDD.zip after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}
Fail: RDD.zip after RDD.repartition after 100 starting partitions
- Failure requires ≥3 workers (whether dist or pseudo-dist)
{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}
Ok: RDD.zip after RDD.repartition after 1 starting partition
{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
{code}
Test setup:
- local\[8]: {{MASTER=local\[8]}}
- pseudo-dist\[N]: 1 driver + 1 master + N workers; master and workers all on the same host
- dist\[N]: 1 driver + 1 master + N workers; master and workers each on a separate host
- Spark 1.3.0-cdh5.4.5 with dist\[4] didn't trip any of the {{withColumn}}
failures, but did trip the {{zip}} failures
- {{-}} indicates a configuration I didn't try
{code}
"Ok" tests pass? "Fail" tests pass? platform cluster mode spark
version
----------------------------------------------------------------
yes yes ubuntu local[8]
1.3.0-cdh5.4.5
- - ubuntu pseudo-dist[1]
1.3.0-cdh5.4.5
- - ubuntu pseudo-dist[2]
1.3.0-cdh5.4.5
yes no[zip], yes[withColumn] ubuntu dist[4]
1.3.0-cdh5.4.5
yes yes osx local[8] 1.4.1
yes yes ubuntu local[8] 1.4.1
yes yes osx pseudo-dist[1] 1.4.1
- - ubuntu pseudo-dist[1] 1.4.1
yes no osx pseudo-dist[2] 1.4.1
- - ubuntu pseudo-dist[2] 1.4.1
- - osx dist[4] 1.4.1
yes no ubuntu dist[4] 1.4.1
yes yes osx local[8] 1.5.0
yes yes ubuntu local[8] 1.5.0
yes yes osx pseudo-dist[1] 1.5.0
yes yes ubuntu pseudo-dist[1] 1.5.0
yes no osx pseudo-dist[2] 1.5.0
yes no ubuntu pseudo-dist[2] 1.5.0
- - osx dist[4] 1.5.0
yes no ubuntu dist[4] 1.5.0
{code}