[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17160370#comment-17160370
 ] 

JinxinTang edited comment on SPARK-32317 at 7/18/20, 8:51 AM:
--------------------------------------------------------------

Please try `spark.read.option("mergeSchema", true).parquet` if you need to 
check the schema. It fails with the following exception when the decimal types 
are not compatible:
{code:java}
scala> spark.sql("select cast(19500.00 as decimal(15,2)) as b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.sql("select cast(19500.00 as decimal(15,6)) as b").write.mode("append").parquet("file:///tmp/schema")

scala> spark.read.option("mergeSchema", true).parquet("file:///tmp/schema").show()
org.apache.spark.SparkException: Failed merging schema:
root
 |-- b: decimal(15,6) (nullable = false)

  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:493)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:163)
  at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:193)
  at scala.Option.orElse(Option.scala:447)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:190)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:401)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:737)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:715)
  ... 47 elided
Caused by: org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. Failed to merge decimal types with incompatible scala 2 and 6
  at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
{code}
Test version: spark-3.0.0-bin-hadoop2.7
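
As a side note on the altered amounts in the quoted report below, here is a minimal sketch of how a scale mismatch alone could produce those numbers. It assumes that AMOUNT is stored as an unscaled integer (the quoted error mentions INT64) and that the reader applies scale 6 to a file that was written with scale 2. This is plain Python, not Spark code, and `reinterpret_scale` is a hypothetical helper introduced only for illustration.

```python
from decimal import Decimal


def reinterpret_scale(value: str, written_scale: int, read_scale: int) -> Decimal:
    """Encode `value` as an unscaled integer at `written_scale`,
    then decode that same integer at `read_scale`.

    Assumption: this mirrors a reader that trusts one scale for all files.
    """
    # 19500.00 at scale 2 becomes the unscaled integer 1950000
    unscaled = int(Decimal(value).scaleb(written_scale))
    # the same integer decoded at scale 6 becomes 1.950000
    return Decimal(unscaled).scaleb(-read_scale)


for amount in ("19500.00", "198.34"):
    print(amount, "->", reinterpret_scale(amount, written_scale=2, read_scale=6))
```

Under these assumptions, 19500.00 and 198.34 come back as 1.950000 and 0.019834, which matches the two altered rows in the report. Values written and read with the same scale are unchanged.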


was (Author: jinxintang):
Please try `spark.read.option("mergeSchema", true).parquet` if you need to 
check the schema. It fails with the following exception when the decimal types 
are not compatible: `org.apache.spark.SparkException: Failed to merge fields 
'b' and 'b'. Failed to merge decimal types with incompatible scala 6 and 2`.

Test environment: spark-3.0.0-bin-hadoop2.7

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32317
>                 URL: https://issues.apache.org/jira/browse/SPARK-32317
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>         Environment: It's failing in all environments that I tried.
>            Reporter: Krish
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate Parquet files partitioned by Date on a daily basis, and we 
> sometimes send updates to historical data. We noticed that, due to a 
> configuration error, the schema of the patch data is inconsistent with the 
> earlier files.
> Assume the files were generated with a schema that has the fields ID and 
> Amount. The historical data has the schema ID INT, AMOUNT DECIMAL(15,6), 
> while the files we send as updates use DECIMAL(15,2) for AMOUNT. 
>  
> When a Date partition contains two different schemas and we load the data 
> for that Date into Spark, the load succeeds but the amounts are altered.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files together:
> df3 = spark.read.parquet("output/")
> df3.show()  # the amounts from file2 are altered here
> +---+------------+
> | ID|      AMOUNT|
> +---+------------+
> |  1|    1.950000|
> |  3|    0.019834|
> |  1|19500.000000|
> |  2|  198.340000|
> +---+------------+
>  
> Options Tried:
> We tried specifying a schema with String for all fields, but that didn't work:
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*****.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know schema merging works if it finds a few extra columns in one file, but 
> the fields that are in common need to have the same schema. That might not 
> work here.
>  
> I am looking for a workaround here. If there is an option that I haven't 
> tried, please point me to it.
>  
> With schema merging I got the error below:
> An error occurred while calling o2272.parquet.
> : org.apache.spark.SparkException: Failed merging schema:
> root
>  |-- ID: string (nullable = true)
>  |-- AMOUNT: decimal(15,6) (nullable = true)
>  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447)
>  at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>  at scala.Option.map(Option.scala:230)
>  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206)
>  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at py4j.Gateway.invoke(Gateway.java:282)
>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at py4j.GatewayConnection.run(GatewayConnection.java:238)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Failed to merge fields 'AMOUNT' and 'AMOUNT'. Failed to merge decimal types with incompatible scala 2 and 6
>  at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:559)
>  at scala.Option.map(Option.scala:230)
>  at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:551)
>  at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:548)
>  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>  at org.apache.spark.sql.types.StructType$.merge(StructType.scala:548)
>  at org.apache.spark.sql.types.StructType.merge(StructType.scala:458)
>  at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:97)
>  ... 30 more


