David Lyness created SPARK-26200:
------------------------------------
Summary: Column values are incorrectly transposed when a field in
a PySpark Row requires serialization
Key: SPARK-26200
URL: https://issues.apache.org/jira/browse/SPARK-26200
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 2.4.0
Environment: {noformat}
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Scala version 2.11.12, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_144
Branch
Compiled by user on 2018-10-29T06:22:05Z{noformat}
The same issue is observed when PySpark is run on both macOS 10.13.6 and CentOS
7, so this appears to be a cross-platform issue.
Reporter: David Lyness
h2. Description of issue
Whenever a field in a PySpark {{Row}} requires serialization (such as a
{{DateType}} or {{TimestampType}}), the DataFrame generated by the code below
assigns column values *in alphabetical order of field name*, rather than
assigning each value to the column it was named for.
h3. Code to reproduce:
{code:python}
import datetime
from pyspark.sql import Row
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DateType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("date_column", DateType()),
    StructField("my_b_column", StringType()),
    StructField("my_a_column", StringType()),
])

spark.createDataFrame([Row(
    date_column=datetime.date.today(),
    my_b_column="my_b_value",
    my_a_column="my_a_value"
)], schema).show()
{code}
h3. Expected result:
{noformat}
+-----------+-----------+-----------+
|date_column|my_b_column|my_a_column|
+-----------+-----------+-----------+
| 2018-11-28| my_b_value| my_a_value|
+-----------+-----------+-----------+{noformat}
h3. Actual result:
{noformat}
+-----------+-----------+-----------+
|date_column|my_b_column|my_a_column|
+-----------+-----------+-----------+
| 2018-11-28| my_a_value| my_b_value|
+-----------+-----------+-----------+{noformat}
(Note that {{my_a_value}} and {{my_b_value}} are transposed.)
h2. Analysis of issue
Reviewing [the relevant code on
GitHub|https://github.com/apache/spark/blame/master/python/pyspark/sql/types.py#L593-L622],
there are two relevant conditional blocks in {{StructType.toInternal}}:
{code:python}
if self._needSerializeAnyField:
    # Block 1: does not work correctly for a Row
    ...
else:
    # Block 2: works correctly
    ...
{code}
When built from keyword arguments, a {{Row}} is a tuple whose values are stored
in alphabetical order of field name, and it also supports looking values up by
field name. Block 2 contains a branch specifically for such a {{Row}}, which
looks each value up by name in schema order:
{code:python}
# Block 2 (excerpt from StructType.toInternal)
elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
    return tuple(obj[n] for n in self.names)
{code}
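The Block 2 branch above can be modelled in plain Python without importing PySpark. In this sketch, {{KwargsRow}} is a hypothetical stand-in for a Spark 2.4 {{Row}} built from keyword arguments (values sorted by field name, names kept in {{__fields__}}), and {{block2_to_internal}} is a hypothetical simplification of the branch; neither is PySpark's actual code.

{code:python}
import datetime
from typing import Any, Sequence, Tuple


class KwargsRow(tuple):
    """Hypothetical model of a PySpark 2.4 Row built from keyword arguments."""

    def __new__(cls, **kwargs: Any) -> "KwargsRow":
        names = sorted(kwargs)  # positional storage follows alphabetical field order
        row = super().__new__(cls, [kwargs[n] for n in names])
        row.__fields__ = names
        row.__from_dict__ = True
        return row

    def __getitem__(self, item):
        # Integer/slice access is positional; string access is by field name.
        if isinstance(item, (int, slice)):
            return super().__getitem__(item)
        return super().__getitem__(self.__fields__.index(item))


def block2_to_internal(obj: Sequence[Any], names: Sequence[str]) -> Tuple[Any, ...]:
    """Simplified Block 2: a keyword-built row is reordered by schema field name."""
    if isinstance(obj, KwargsRow) and getattr(obj, "__from_dict__", False):
        return tuple(obj[n] for n in names)
    return tuple(obj)


schema_names = ["date_column", "my_b_column", "my_a_column"]
row = KwargsRow(
    date_column=datetime.date(2018, 11, 28),
    my_b_column="my_b_value",
    my_a_column="my_a_value",
)

print(tuple(row))                             # alphabetical: ..., 'my_a_value', 'my_b_value'
print(block2_to_internal(row, schema_names))  # schema order: ..., 'my_b_value', 'my_a_value'
{code}

Because the lookup is by name, the alphabetical storage order never leaks into the result on this path.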
Block 1 has no such branch, so a {{Row}} falls through to the generic tuple/list branch instead:
{code:python}
# Block 1 (excerpt from StructType.toInternal)
elif isinstance(obj, (tuple, list)):
    return tuple(f.toInternal(v) if c else v
                 for f, v, c in zip(self.fields, obj, self._needConversion))
{code}
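The {{zip}} call pairs schema fields with values by position, but {{obj}} (the
{{Row}}) stores its values in alphabetical order of field name, which differs
from the schema's field order. So we end up with these (schema field, {{Row}}
value, needs conversion) triples: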
{noformat}
(date_column, date value, True),
(my_b_column, my_a_value, False),
(my_a_column, my_b_value, False){noformat}
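The positional pairing can be reproduced in plain Python. This is a simplified model, not PySpark code: {{SCHEMA}}, {{date_to_internal}}, {{block1_to_internal}} and {{block1_by_name}} are hypothetical names, the keyword-built {{Row}} is modelled as a tuple of values sorted by field name, and {{block1_by_name}} is only one possible alternative, not the upstream fix.

{code:python}
import datetime
from typing import Any, Dict, List, Sequence, Tuple

# Hypothetical model of the repro schema: (field name, needs conversion), in declared order.
SCHEMA: List[Tuple[str, bool]] = [
    ("date_column", True),   # DateType needs conversion
    ("my_b_column", False),  # StringType does not
    ("my_a_column", False),
]


def date_to_internal(d: datetime.date) -> int:
    # Models DateType's internal form: days since 1970-01-01.
    return d.toordinal() - datetime.date(1970, 1, 1).toordinal()


def block1_to_internal(values: Sequence[Any]) -> Tuple[Any, ...]:
    # Simplified Block 1 tuple/list branch: fields and values are paired by position.
    return tuple(
        date_to_internal(v) if needs_conv else v
        for (_, needs_conv), v in zip(SCHEMA, values)
    )


def block1_by_name(record: Dict[str, Any]) -> Tuple[Any, ...]:
    # Hypothetical alternative: look each value up by field name instead of position.
    return tuple(
        date_to_internal(record[name]) if needs_conv else record[name]
        for name, needs_conv in SCHEMA
    )


kwargs = {
    "date_column": datetime.date(2018, 11, 28),
    "my_b_column": "my_b_value",
    "my_a_column": "my_a_value",
}
# A keyword-built Row stores its values in alphabetical order of field name.
row_values = tuple(kwargs[name] for name in sorted(kwargs))

# Print the (schema field, Row value, needs conversion) triples that zip() produces.
for (name, needs_conv), value in zip(SCHEMA, row_values):
    print(name, value, needs_conv)

print(block1_to_internal(row_values)[1:])  # transposed: my_a_value lands in my_b_column
print(block1_by_name(kwargs)[1:])          # schema order restored
{code}

The date column survives only because it sorts first both alphabetically and in the schema; any field whose alphabetical and declared positions differ is misassigned.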
h2. Workarounds
Correct behaviour is observed if you use a Python {{list}} or {{dict}} instead
of PySpark's {{Row}} object:
{code:python}
# Using a list works
spark.createDataFrame([[
    datetime.date.today(),
    "my_b_value",
    "my_a_value"
]], schema)

# Using a dict also works
spark.createDataFrame([{
    "date_column": datetime.date.today(),
    "my_b_column": "my_b_value",
    "my_a_column": "my_a_value"
}], schema)
{code}
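If data already exists as keyword-built {{Row}} objects, the list workaround can be applied mechanically by rebuilding each row as a list in schema order. The helper below is a hypothetical sketch that accepts any mapping; with PySpark, the mappings would presumably come from {{Row.asDict()}} and the field names from {{StructType.names}}, but that combination is untested here.

{code:python}
from typing import Any, Iterable, List, Mapping, Sequence


def to_schema_ordered_lists(
    records: Iterable[Mapping[str, Any]], field_names: Sequence[str]
) -> List[List[Any]]:
    # Rebuild each named record as a list whose positions follow field_names.
    # record[name] raises KeyError for a missing field instead of silently
    # inserting None, so a misspelt column name surfaces early.
    return [[record[name] for name in field_names] for record in records]


record = {"date_column": "2018-11-28", "my_b_column": "my_b_value", "my_a_column": "my_a_value"}
print(to_schema_ordered_lists([record], ["date_column", "my_b_column", "my_a_column"]))
# -> [['2018-11-28', 'my_b_value', 'my_a_value']]
{code}

A possible PySpark call would be {{spark.createDataFrame(to_schema_ordered_lists((r.asDict() for r in rows), schema.names), schema)}}, where {{rows}} is a hypothetical list of keyword-built {{Row}} objects.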
Correct behaviour is also observed if you have no fields that require
serialization; in this example, changing {{date_column}} to {{StringType}}
avoids the correctness issue.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)