[
https://issues.apache.org/jira/browse/SPARK-22232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472714#comment-16472714
]
Bryan Cutler commented on SPARK-22232:
--------------------------------------
I'm closing the PR for now, will reopen for Spark 3.0.0. The fix makes the
behavior consistent but might cause a breaking change for some users. It's not
necessary to put the fix in and safeguard with a config because there are
workarounds. For example, this is a workaround for the code in the description:
{code}
from pyspark.sql.types import *
from pyspark.sql import *
UnsortedRow = Row("a", "c", "b")
def toRow(i):
return UnsortedRow("1", 3.0, 2)
schema = StructType([
StructField("a", StringType(), False),
StructField("c", FloatType(), False),
StructField("b", IntegerType(), False),
])
rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
print rdd.repartition(3).toDF(schema).take(2)
# [Row(a=u'1', c=3.0, b=2), Row(a=u'1', c=3.0, b=2)]
{code}
> Row objects in pyspark created using the `Row(**kwars)` syntax do not get
> serialized/deserialized properly
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22232
> URL: https://issues.apache.org/jira/browse/SPARK-22232
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 2.2.0
> Reporter: Bago Amirbekian
> Priority: Major
>
> The fields in a Row object created from a dict (ie {{Row(**kwargs)}}) should
> be accessed by field name, not by position because {{Row.__new__}} sorts the
> fields alphabetically by name. It seems like this promise is not being
> honored when these Row objects are shuffled. I've included an example to help
> reproduce the issue.
> {code:none}
> from pyspark.sql.types import *
> from pyspark.sql import *
> def toRow(i):
> return Row(a="a", c=3.0, b=2)
> schema = StructType([
> # Putting fields in alphabetical order masks the issue
> StructField("a", StringType(), False),
> StructField("c", FloatType(), False),
> StructField("b", IntegerType(), False),
> ])
> rdd = sc.parallelize(range(10)).repartition(2).map(lambda i: toRow(i))
> # As long as we don't shuffle things work fine.
> print rdd.toDF(schema).take(2)
> # If we introduce a shuffle we have issues
> print rdd.repartition(3).toDF(schema).take(2)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]