[
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-32317:
------------------------------------
Assignee: (was: Apache Spark)
> Parquet file loading with different schema (Decimal(N, P)) in files is not
> working as expected
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.0
> Environment: It's failing in all environments I tried.
> Reporter: Krish
> Priority: Major
> Labels: easyfix
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Hi,
>
> We generate Parquet files partitioned on Date on a daily basis, and we
> sometimes send updates to historical data. What we noticed is that, due to a
> configuration error, the schema of the patch data is inconsistent with the
> earlier files. Suppose the files were generated with ID and AMOUNT as fields:
> the historical data has the schema ID INT, AMOUNT DECIMAL(15,6), while the
> files we send as updates use AMOUNT DECIMAL(15,2).
>
> With two different schemas inside one Date partition, loading that Date into
> Spark succeeds, but the AMOUNT values come back corrupted.
>
> file1.snappy.parquet
>   ID: INT
>   AMOUNT: DECIMAL(15,6)
>   Content:
>     1,19500.00
>     2,198.34
>
> file2.snappy.parquet
>   ID: INT
>   AMOUNT: DECIMAL(15,2)
>   Content:
>     1,19500.00
>     3,198.34
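>
> For reference, a minimal PySpark sketch that produces two such files (the
> output path and session setup are illustrative, not our actual pipeline):
>
> from decimal import Decimal
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType
>
> spark = SparkSession.builder.getOrCreate()
>
> # file1: AMOUNT written as DECIMAL(15,6)
> schema6 = StructType([StructField("ID", IntegerType()),
>                       StructField("AMOUNT", DecimalType(15, 6))])
> spark.createDataFrame([(1, Decimal("19500.00")), (2, Decimal("198.34"))],
>                       schema6).coalesce(1).write.mode("append").parquet("output/")
>
> # file2: the same columns, but AMOUNT written as DECIMAL(15,2)
> schema2 = StructType([StructField("ID", IntegerType()),
>                       StructField("AMOUNT", DecimalType(15, 2))])
> spark.createDataFrame([(1, Decimal("19500.00")), (3, Decimal("198.34"))],
>                       schema2).coalesce(1).write.mode("append").parquet("output/")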
> Load these two files together:
> df3 = spark.read.parquet("output/")
> df3.show()  # the AMOUNT values come back wrong here:
> +---+------------+
> | ID|      AMOUNT|
> +---+------------+
> |  1|    1.950000|
> |  3|    0.019834|
> |  1|19500.000000|
> |  2|  198.340000|
> +---+------------+
> (It looks like file2's values are reinterpreted at file1's scale: 19500.00
> stored at scale 2 is the unscaled INT64 value 1950000, which read back at
> scale 6 becomes 1.950000, and 198.34 likewise becomes 0.019834.)
> Options tried:
> We tried giving a schema with all fields as String, but that didn't work:
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet
> column cannot be converted in file file*****.snappy.parquet. Column:
> [AMOUNT], Expected: string, Found: INT64"
>
> I know schema merging works when one file has a few extra columns, but the
> fields the files have in common need to have the same schema, so that might
> not work here.
>
> Looking for a workaround here, or if there is an option I haven't tried,
> please point me to it.
>
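> The merge attempt was presumably along these lines (the exact call isn't
> preserved; "mergeSchema" is Spark's standard Parquet schema-merge option):
>
> df3 = spark.read.option("mergeSchema", "true").parquet("output/")
>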
> With schema merging I got the below error:
> An error occurred while calling o2272.parquet.
> : org.apache.spark.SparkException: Failed merging schema:
> root
>  |-- ID: string (nullable = true)
>  |-- AMOUNT: decimal(15,6) (nullable = true)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>     at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>     at scala.Option.orElse(Option.scala:447)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>     at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>     at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206)
>     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:282)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Failed to merge fields 'AMOUNT'
> and 'AMOUNT'. Failed to merge decimal types with incompatible scala 2 and 6
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:559)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:551)
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:548)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.sql.types.StructType$.merge(StructType.scala:548)
>     at org.apache.spark.sql.types.StructType.merge(StructType.scala:458)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:97)
>     ... 30 more
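>
> A workaround sketch that avoids the bad merge (assuming each file is
> internally consistent and only these two decimal widths occur; the file
> paths are illustrative, since real part-file names are generated): read each
> schema-homogeneous file separately so its scale is inferred correctly, cast
> AMOUNT to a common type, then union:
>
> from pyspark.sql import functions as F
>
> df1 = spark.read.parquet("output/file1.snappy.parquet")  # DECIMAL(15,6)
> df2 = spark.read.parquet("output/file2.snappy.parquet")  # DECIMAL(15,2)
>
> # Casting DECIMAL(15,2) -> DECIMAL(15,6) rescales the logical value, unlike
> # the broken read, which reinterprets the raw INT64 at the wrong scale.
> # (Widen the precision too if integer digits can exceed 9.)
> common = "decimal(15,6)"
> df3 = (df1.withColumn("AMOUNT", F.col("AMOUNT").cast(common))
>           .unionByName(df2.withColumn("AMOUNT", F.col("AMOUNT").cast(common))))
> df3.show()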
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]