[ 
https://issues.apache.org/jira/browse/SPARK-44805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-44805:
----------------------------------
    Labels: correctness  (was: )

> Data lost after union using 
> spark.sql.parquet.enableNestedColumnVectorizedReader=true
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44805
>                 URL: https://issues.apache.org/jira/browse/SPARK-44805
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.1
>         Environment: PySpark, Linux, Hadoop, Parquet.
>            Reporter: Jakub Wozniak
>            Priority: Major
>              Labels: correctness
>
> When two DataFrames read from Parquet are unioned and both contain a nested
> struct with two array fields (f1 is array<int> in one dataset and array<double>
> in the other; f2 is array<int> in both), the f2 data from the first DataFrame
> is lost: its values are replaced with zeros.
> This seems to happen only when the nested vectorized reader is enabled
> (spark.sql.parquet.enableNestedColumnVectorizedReader=true).
> The following Python code reproduces the problem:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructField, StructType
> # PREPARING DATA
> data1 = []
> data2 = []
> for i in range(2):
>     data1.append((([1, 2, 3], [1, 1, 2]), i))
>     data2.append((([1.0, 2.0, 3.0], [1, 1]), i + 10))
> schema1 = StructType([
>         StructField('value', StructType([
>              StructField('f1', ArrayType(IntegerType()), True),
>              StructField('f2', ArrayType(IntegerType()), True)             
>              ])),
>          StructField('id', IntegerType(), True)
> ])
> schema2 = StructType([
>         StructField('value', StructType([
>              StructField('f1', ArrayType(DoubleType()), True),
>              StructField('f2', ArrayType(IntegerType()), True)             
>              ])),
>          StructField('id', IntegerType(), True)
> ])
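> # Enable the nested vectorized reader explicitly; the problem is only seen with it on.
> spark = (SparkSession.builder
>          .config("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
>          .getOrCreate())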
> data_dir = "/user/<user>/"
> df1 = spark.createDataFrame(data1, schema1)
> df1.write.mode('overwrite').parquet(data_dir + "data1") 
> df2 = spark.createDataFrame(data2, schema2)
> df2.write.mode('overwrite').parquet(data_dir + "data2") 
> # READING DATA
> parquet1 = spark.read.parquet(data_dir + "data1")
> parquet2 = spark.read.parquet(data_dir + "data2")
> # UNION
> out = parquet1.union(parquet2)
> parquet1.select("value.f2").distinct().show()
> out.select("value.f2").distinct().show()
> print(parquet1.collect())
> print(out.collect())
> {code}
> Output: 
> {code:none}
> +---------+
> |       f2|
> +---------+
> |[1, 1, 2]|
> +---------+
> +---------+
> |       f2|
> +---------+
> |[0, 0, 0]|
> |   [1, 1]|
> +---------+
> [
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=0), 
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=1)
> ]
> [
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=0), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=1), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=10), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=11)
> ]
> {code}
> Note that the f2 values of the rows from the first DataFrame ([1, 1, 2])
> become [0, 0, 0] after the union.
> This happens only when the data is read from Parquet files.
> Could you please look into this? 
> Best regards,
> Jakub
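
The loss shown in the reported output can also be checked programmatically rather than by eye. The following is a minimal sketch in plain Python, using only the (f2, id) values copied from the output in the report; the helper name `changed_f2` is hypothetical and not part of Spark.

```python
# Rows copied from the output in the report, reduced to (f2, id) pairs.
before_union = [([1, 1, 2], 0), ([1, 1, 2], 1)]
after_union = [
    ([0, 0, 0], 0),
    ([0, 0, 0], 1),
    ([1, 1], 10),
    ([1, 1], 11),
]


def changed_f2(before, after):
    """Return {id: (f2_before, f2_after)} for ids whose f2 differs after the union."""
    after_by_id = {row_id: f2 for f2, row_id in after}
    return {
        row_id: (f2, after_by_id[row_id])
        for f2, row_id in before
        if row_id in after_by_id and after_by_id[row_id] != f2
    }


print(changed_f2(before_union, after_union))
# {0: ([1, 1, 2], [0, 0, 0]), 1: ([1, 1, 2], [0, 0, 0])}
```

Matching on id works here because the reproduction gives the two datasets disjoint ids (0, 1 and 10, 11). An empty result would mean the union preserved f2 for every row of the first DataFrame.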


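Since the report observes the problem only with the nested vectorized reader enabled, one possible stopgap is to switch it off around the affected read. The sketch below assumes an existing SparkSession is passed in as `spark`; the function name and the `"false"` fallback passed to `conf.get` are illustrative assumptions, and whether this avoids the data loss has not been verified here.

```python
NESTED_READER_KEY = "spark.sql.parquet.enableNestedColumnVectorizedReader"


def collect_union_without_nested_reader(spark, path1, path2):
    """Collect the union of two Parquet datasets with the nested reader disabled.

    `spark` is an existing SparkSession; the previous setting is restored afterwards.
    """
    previous = spark.conf.get(NESTED_READER_KEY, "false")
    spark.conf.set(NESTED_READER_KEY, "false")
    try:
        # collect() is called inside the try block because Spark evaluates lazily:
        # the Parquet files are only read when an action is triggered.
        return spark.read.parquet(path1).union(spark.read.parquet(path2)).collect()
    finally:
        spark.conf.set(NESTED_READER_KEY, previous)
```

With the reproduction above, this would be called as `collect_union_without_nested_reader(spark, data_dir + "data1", data_dir + "data2")`. Disabling the reader gives up its performance benefit for nested columns, so this is a workaround for affected queries, not a fix.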

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
