[
https://issues.apache.org/jira/browse/SPARK-44805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-44805.
---------------------------------
> Data lost after union using
> spark.sql.parquet.enableNestedColumnVectorizedReader=true
> -------------------------------------------------------------------------------------
>
> Key: SPARK-44805
> URL: https://issues.apache.org/jira/browse/SPARK-44805
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.1, 3.4.1
> Environment: pySpark, linux, hadoop, parquet.
> Reporter: Jakub Wozniak
> Assignee: Bruce Robbins
> Priority: Major
> Labels: correctness, pull-request-available
> Fix For: 3.4.2, 3.5.1, 3.3.4, 4.0.0
>
>
> When unioning two DataFrames read from Parquet files that contain nested
> structures (a struct with two array fields, where one holds doubles and the
> other integers), the data from the second field appears to be lost (zeros
> are returned instead).
> This happens only when the nested column vectorized reader is enabled
> (spark.sql.parquet.enableNestedColumnVectorizedReader=true).
> The following Python code reproduces the problem:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import *
>
> # PREPARING DATA
> data1 = []
> data2 = []
> for i in range(2):
>     data1.append((([1, 2, 3], [1, 1, 2]), i))
>     data2.append((([1.0, 2.0, 3.0], [1, 1]), i + 10))
>
> schema1 = StructType([
>     StructField('value', StructType([
>         StructField('f1', ArrayType(IntegerType()), True),
>         StructField('f2', ArrayType(IntegerType()), True)
>     ])),
>     StructField('id', IntegerType(), True)
> ])
>
> schema2 = StructType([
>     StructField('value', StructType([
>         StructField('f1', ArrayType(DoubleType()), True),
>         StructField('f2', ArrayType(IntegerType()), True)
>     ])),
>     StructField('id', IntegerType(), True)
> ])
>
> spark = SparkSession.builder.getOrCreate()
> data_dir = "/user/<user>/"
>
> df1 = spark.createDataFrame(data1, schema1)
> df1.write.mode('overwrite').parquet(data_dir + "data1")
> df2 = spark.createDataFrame(data2, schema2)
> df2.write.mode('overwrite').parquet(data_dir + "data2")
>
> # READING DATA
> parquet1 = spark.read.parquet(data_dir + "data1")
> parquet2 = spark.read.parquet(data_dir + "data2")
>
> # UNION
> out = parquet1.union(parquet2)
> parquet1.select("value.f2").distinct().show()
> out.select("value.f2").distinct().show()
> print(parquet1.collect())
> print(out.collect())
> {code}
> Output:
> {code:java}
> +---------+
> | f2|
> +---------+
> |[1, 1, 2]|
> +---------+
> +---------+
> | f2|
> +---------+
> |[0, 0, 0]|
> | [1, 1]|
> +---------+
> [
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=0),
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=1)
> ]
> [
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=0),
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=1),
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=10),
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=11)
> ] {code}
> Please notice that the values of field f2 are lost after the union is done.
> This happens only when the data is read from Parquet files.
> Could you please look into this?
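A possible mitigation until a fixed release is used (a sketch, untested here, assuming the configuration can be set at runtime) is to disable the nested column vectorized reader before reading the Parquet files, falling back to the non-vectorized read path:

```python
# Hypothetical workaround sketch: turn off the nested column vectorized
# reader named in this issue, so the affected Parquet read path is avoided.
# The data_dir path is illustrative, matching the reproduction above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

data_dir = "/user/<user>/"
parquet1 = spark.read.parquet(data_dir + "data1")
parquet2 = spark.read.parquet(data_dir + "data2")
out = parquet1.union(parquet2)  # f2 values should survive with the reader disabled
```

This trades away the performance benefit of vectorized reads for nested columns, so it is only sensible as a stopgap until one of the fixed versions (3.3.4, 3.4.2, 3.5.1, 4.0.0) is deployed.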
> Best regards,
> Jakub
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]