[ https://issues.apache.org/jira/browse/SPARK-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14877372#comment-14877372 ]
Apache Spark commented on SPARK-10685:
--------------------------------------
User 'rxin' has created a pull request for this issue:
https://github.com/apache/spark/pull/8835
> Misaligned data with RDD.zip and DataFrame.withColumn after repartition
> -----------------------------------------------------------------------
>
> Key: SPARK-10685
> URL: https://issues.apache.org/jira/browse/SPARK-10685
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 1.3.0, 1.4.1, 1.5.0
> Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
> - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
> Reporter: Dan Brown
> Priority: Blocker
>
> Here's a weird behavior where {{RDD.zip}} or {{DataFrame.withColumn}} after a
> {{repartition}} produces "misaligned" data, meaning different column values
> in the same row aren't matched, as if a zip shuffled the collections before
> zipping them. It's difficult to reproduce because it's nondeterministic,
> doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was
> able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop),
> and 1.5.0 (bin-without-hadoop).
> Here's the most similar issue I was able to find. It appears it was never
> repro'd before being closed optimistically, and it smells like the same
> underlying cause, never fixed:
> - https://issues.apache.org/jira/browse/SPARK-9131
> Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying
> to build it ourselves when we ran into this problem. Let me put in my vote
> for reopening the issue and supporting {{DataFrame.zip}} in the standard lib.
> - https://issues.apache.org/jira/browse/SPARK-7460
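> For context, here's roughly the kind of helper we were building; this is a
> sketch of our own code (names made up), not a Spark API, and like {{RDD.zip}}
> it assumes both frames have the same number of partitions and the same number
> of rows per partition (exactly the alignment the bug below breaks):
> {code}
> from pyspark.sql.types import StructType
>
> def zip_dataframes(sqlCtx, df1, df2):
>     # Pair rows of the two frames positionally via RDD.zip, then
>     # rebuild a single DataFrame with the concatenated schema.
>     zipped = df1.rdd.zip(df2.rdd).map(lambda (r1, r2): tuple(r1) + tuple(r2))
>     schema = StructType(df1.schema.fields + df2.schema.fields)
>     return sqlCtx.createDataFrame(zipped, schema)
> {code}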
> h3. Brief repro
> Fail: withColumn(udf) after DataFrame.repartition
> {code}
> from pyspark.sql import Row
> from pyspark.sql.functions import udf
> from pyspark.sql.types import IntegerType
>
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> [r for r in df.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> [Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
> [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
> []
> [Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
> [Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
> [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
> {code}
> Fail: RDD.zip after DataFrame.repartition
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> [r for r in rdd.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> []
> [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
> []
> []
> [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
> []
> {code}
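> A diagnostic sketch (hypothetical; not part of the test matrix below): since
> {{zip}} always pairs partition i of one side with partition i of the other,
> tagging each value with its partition id shows that any mismatch means the
> same partition produced different contents on the two evaluations:
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> # Tag each value with the id of the partition it landed in.
> tagged = df.rdd.mapPartitionsWithIndex(lambda i, rows: ((i, r.a) for r in rows))
> # zip pairs partition i with partition i element by element, so a mismatch
> # here means that partition's contents changed between evaluations.
> tagged.zip(tagged.map(lambda t: t)).filter(lambda (x, y): x[1] != y[1]).take(3)
> {code}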
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - dist\[N]: 1 driver + 1 master + N workers
> {code}
> "Fail" tests pass?   cluster mode   spark version
> -------------------------------------------------
> yes                  local[8]       1.3.0-cdh5.4.5
> no                   dist[4]        1.3.0-cdh5.4.5
> yes                  local[8]       1.4.1
> yes                  dist[1]        1.4.1
> no                   dist[2]        1.4.1
> no                   dist[4]        1.4.1
> yes                  local[8]       1.5.0
> yes                  dist[1]        1.5.0
> no                   dist[2]        1.5.0
> no                   dist[4]        1.5.0
> {code}
> h3. Detailed repro
> Start {{pyspark}} and run these imports:
> {code}
> from pyspark.sql import Row
> from pyspark.sql.functions import udf
> from pyspark.sql.types import IntegerType, StructType, StructField
> {code}
> Fail: withColumn(udf) after DataFrame.repartition
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.repartition(1)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.coalesce(10)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn without udf
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', df.a)
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: createDataFrame(RDD.map) instead of withColumn(udf)
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> rdd = df.map(lambda r: Row(a=r.a, b=r.a))
> df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
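> Since the {{RDD.map}} + {{createDataFrame}} route above is the one that
> passes, a small helper can stand in for {{withColumn(udf)}}; this is our own
> workaround (name and signature made up), not a Spark API:
> {code}
> def with_column_workaround(sqlCtx, df, name, f, data_type):
>     # Compute the new column on the RDD side and rebuild the DataFrame,
>     # since that path doesn't misalign in the repros above.
>     rdd = df.map(lambda r: tuple(r) + (f(r),))
>     schema = StructType(df.schema.fields + [StructField(name, data_type)])
>     return sqlCtx.createDataFrame(rdd, schema)
>
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = with_column_workaround(sqlCtx, df, 'b', lambda r: r.a, IntegerType())
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}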
> Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: RDD.zip after DataFrame.repartition
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
> Fail: RDD.zip after RDD.repartition after 100 starting partitions
> - Failure requires ≥3 workers (whether dist or pseudo-dist)
> {code}
> rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
> rdd = rdd.repartition(100)
> rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
> Ok: RDD.zip after RDD.repartition after 1 starting partition
> {code}
> rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
> rdd = rdd.repartition(100)
> rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
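> One untested mitigation, based on our assumption that the two sides of the
> zip re-evaluate the nondeterministic repartition independently: materialize
> the shuffled RDD once before zipping. Caching is not a hard guarantee
> (evicted blocks get recomputed), so treat this as a sketch:
> {code}
> rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
> rdd = rdd.repartition(100).cache()
> rdd.count()  # force materialization before either side of the zip runs
> rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
> len([d for d in rdd.collect() if d.a != d.b]) # hopefully 0
> {code}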
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - pseudo-dist\[N]: 1 driver + 1 master + N workers; master and workers all on the same OS
> - dist\[N]: 1 driver + 1 master + N workers; master and workers all on separate OS's
> - Spark 1.3.0-cdh5.4.5 with dist\[4] didn't trip any of the {{withColumn}} failures, but did trip the {{zip}} failures
> - {{-}} indicates a configuration I didn't try
> {code}
> "Ok" tests pass?   "Fail" tests pass?         platform   cluster mode     spark version
> ----------------------------------------------------------------------------------------
> yes                yes                        ubuntu     local[8]         1.3.0-cdh5.4.5
> -                  -                          ubuntu     pseudo-dist[1]   1.3.0-cdh5.4.5
> -                  -                          ubuntu     pseudo-dist[2]   1.3.0-cdh5.4.5
> yes                no[zip], yes[withColumn]   ubuntu     dist[4]          1.3.0-cdh5.4.5
> yes                yes                        osx        local[8]         1.4.1
> yes                yes                        ubuntu     local[8]         1.4.1
> yes                yes                        osx        pseudo-dist[1]   1.4.1
> -                  -                          ubuntu     pseudo-dist[1]   1.4.1
> yes                no                         osx        pseudo-dist[2]   1.4.1
> -                  -                          ubuntu     pseudo-dist[2]   1.4.1
> -                  -                          osx        dist[4]          1.4.1
> yes                no                         ubuntu     dist[4]          1.4.1
> yes                yes                        osx        local[8]         1.5.0
> yes                yes                        ubuntu     local[8]         1.5.0
> yes                yes                        osx        pseudo-dist[1]   1.5.0
> yes                yes                        ubuntu     pseudo-dist[1]   1.5.0
> yes                no                         osx        pseudo-dist[2]   1.5.0
> yes                no                         ubuntu     pseudo-dist[2]   1.5.0
> -                  -                          osx        dist[4]          1.5.0
> yes                no                         ubuntu     dist[4]          1.5.0
> {code}