[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krish updated SPARK-32317:
--------------------------
    Description: 
Hi,

 

We generate Parquet files partitioned by Date on a daily basis, and we 
sometimes send updates to historical data. What we noticed is that, due to a 
configuration error, the schema of the patch data is inconsistent with that of 
the earlier files.

Assume files were generated with ID and AMOUNT as fields. The historical data 
has the schema ID INT, AMOUNT DECIMAL(15,6), while the files we send as 
updates have AMOUNT DECIMAL(15,2).

 

With two different schemas in a single Date partition, when we load the data 
for that Date into Spark, the data loads, but the AMOUNT values come back 
corrupted.

 

file1.snappy.parquet
 ID: INT
 AMOUNT: DECIMAL(15,6)
 Content:
 1,19500.00
 2,198.34

file2.snappy.parquet
 ID: INT
 AMOUNT: DECIMAL(15,2)
 Content:
 1,19500.00
 3,198.34

Load these two files together:

df3 = spark.read.parquet("output/")

df3.show()  # the AMOUNT values come back corrupted here

+---+------------+
| ID|      AMOUNT|
+---+------------+
|  1|    1.950000|
|  3|    0.019834|
|  1|19500.000000|
|  2|  198.340000|
+---+------------+
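
For context, here is a minimal reproduction sketch of the scenario above. The 
session setup, the use of two sub-directories, and the paths are assumptions 
for illustration (the original report had both files sitting in a single 
output/ directory):

from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType

spark = SparkSession.builder.appName("decimal-scale-repro").getOrCreate()

schema_6 = StructType([StructField("ID", IntegerType()),
                       StructField("AMOUNT", DecimalType(15, 6))])
schema_2 = StructType([StructField("ID", IntegerType()),
                       StructField("AMOUNT", DecimalType(15, 2))])

# Write one Parquet file per schema (hypothetical paths).
spark.createDataFrame([(1, Decimal("19500.00")), (2, Decimal("198.34"))],
                      schema_6).write.mode("overwrite").parquet("output/a")
spark.createDataFrame([(1, Decimal("19500.00")), (3, Decimal("198.34"))],
                      schema_2).write.mode("overwrite").parquet("output/b")

# Reading both together reinterprets the DECIMAL(15,2) values with scale 6,
# which is the corruption shown in the table above.
spark.read.parquet("output/a", "output/b").show()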


Options Tried:

We tried specifying a schema with String for all fields, but that did not work.
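
The schema we passed declared every field as StringType, roughly as follows 
(the exact construction below is an assumption, not the original code):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("ID", StringType()),
                     StructField("AMOUNT", StringType())])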

df3 = spark.read.format("parquet").schema(schema).load("output/")

Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet column 
cannot be converted in file file*****.snappy.parquet. Column: [AMOUNT], 
Expected: string, Found: INT64"

 

I know schema merging works when it finds a few extra columns in one file, but 
the fields the files have in common need to have the same schema, so that may 
not work here.
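
For completeness, this is how schema merging was enabled (a minimal sketch; 
both forms below are standard Spark reader/config options, but the exact call 
used is an assumption):

df3 = spark.read.option("mergeSchema", "true").parquet("output/")
# or globally:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")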

 

Looking for a workaround here, or, if there is an option I have not tried, 
please point me to it.
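
One possible workaround, sketched here as an assumption rather than something 
from the original report: read each mismatched file (or batch) with its own 
schema, cast AMOUNT to a common decimal type, and union the results.

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

common = DecimalType(15, 6)
df_a = spark.read.parquet("output/file1.snappy.parquet")
df_b = spark.read.parquet("output/file2.snappy.parquet")
df3 = (df_a.withColumn("AMOUNT", col("AMOUNT").cast(common))
           .unionByName(df_b.withColumn("AMOUNT", col("AMOUNT").cast(common))))

This avoids the scale mismatch at read time, at the cost of tracking which 
files carry which schema.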

 

With schema merging, I got the error below:

An error occurred while calling o2272.parquet. :
org.apache.spark.SparkException: Failed merging schema:
root
 |-- _c0: string (nullable = true)
 |-- _c1: decimal(15,6) (nullable = true)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
    at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
    at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
    at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
    at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
    at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to merge fields '_c1' and '_c1'. Failed to merge decimal types with incompatible scala 2 and 6
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:559)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:551)
    at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:548)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.sql.types.StructType$.merge(StructType.scala:548)
    at org.apache.spark.sql.types.StructType.merge(StructType.scala:458)
    at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:97)
    ... 30 more

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32317
>                 URL: https://issues.apache.org/jira/browse/SPARK-32317
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>         Environment: It's failing in all environments that I tried.
>            Reporter: Krish
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
