[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32317:
------------------------------------

    Assignee: Apache Spark

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32317
>                 URL: https://issues.apache.org/jira/browse/SPARK-32317
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>         Environment: Its failing in all environments that I tried.
>            Reporter: Krish
>            Assignee: Apache Spark
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-----+---------------+
> |ID|       AMOUNT|
> +-----+---------------+
> |1|        1.950000|
> |3|        0.019834|
> |1|19500.000000|
> |2|    198.340000|
> +-----+---------------+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*****.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
> org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>  at scala.Option.map(Option.scala:230) at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206) at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498) at 
> py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at 
> py4j.Gateway.invoke(Gateway.java:282) at 
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
> py4j.commands.CallCommand.execute(CallCommand.java:79) at 
> py4j.GatewayConnection.run(GatewayConnection.java:238) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.spark.SparkException: Failed to merge fields 'AMOUNT' and 
> 'AMOUNT'. Failed to merge decimal types with incompatible scala 2 and 6 at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:559) 
> at scala.Option.map(Option.scala:230) at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:551) 
> at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:548)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.types.StructType$.merge(StructType.scala:548) at 
> org.apache.spark.sql.types.StructType.merge(StructType.scala:458) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:97)
>  ... 30 more



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to