[ https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-32317:
------------------------------------

    Assignee: Apache Spark

> Parquet file loading with different schema(Decimal(N, P)) in files is not
> working as expected
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32317
>                 URL: https://issues.apache.org/jira/browse/SPARK-32317
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>         Environment: It's failing in all environments I tried.
>            Reporter: Krish
>            Assignee: Apache Spark
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>
> We generate parquet files partitioned by Date on a daily basis, and we
> sometimes send updates to historical data. We noticed that, due to a
> configuration error, the patch data's schema is inconsistent with the
> earlier files. Assume the files were generated with ID and AMOUNT as
> fields: the historical data has the schema ID INT, AMOUNT DECIMAL(15,6),
> while the files we send as updates have AMOUNT DECIMAL(15,2).
>
> With two different schemas in a single Date partition, loading that Date
> into Spark succeeds, but the AMOUNT values come back corrupted.
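[Editorial sketch, not part of the original report.] The corruption pattern below is consistent with how Parquet encodes DECIMAL: each value is stored as an unscaled integer, and the scale comes from the schema the reader applies. If values written at scale 2 are decoded with scale 6, every amount is divided by 10^4 — which is exactly what the report shows. A minimal plain-Python illustration (no Spark required; `decode` is a hypothetical helper for this sketch):

```python
from decimal import Decimal

def decode(unscaled: int, scale: int) -> Decimal:
    # Parquet DECIMAL = unscaled integer + a scale taken from the schema
    return Decimal(unscaled).scaleb(-scale)

# 19500.00 written at scale 2 is stored as the unscaled integer 1950000
print(decode(1950000, 2))  # 19500.00  (decoded with the writer's scale)
print(decode(1950000, 6))  # 1.950000  (decoded with the wrong scale 6)
print(decode(19834, 6))    # 0.019834  (198.34 at scale 2, misread the same way)
```

The outputs 1.950000 and 0.019834 match the corrupted rows in the report, which suggests the reader is applying one file's scale to the other file's raw values instead of failing fast.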
>
> file1.snappy.parquet
>   ID: INT
>   AMOUNT: DECIMAL(15,6)
>   Content:
>   1,19500.00
>   2,198.34
>
> file2.snappy.parquet
>   ID: INT
>   AMOUNT: DECIMAL(15,2)
>   Content:
>   1,19500.00
>   3,198.34
>
> Load these two files together:
>
>   df3 = spark.read.parquet("output/")
>   df3.show()  # we can see the amount getting manipulated here
>   +---+------------+
>   | ID|      AMOUNT|
>   +---+------------+
>   |  1|    1.950000|
>   |  3|    0.019834|
>   |  1|19500.000000|
>   |  2|  198.340000|
>   +---+------------+
>
> Options tried:
>
> We tried giving the schema as String for all fields, but that didn't work:
>
>   df3 = spark.read.format("parquet").schema(schema).load("output/")
>
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet
> column cannot be converted in file file*****.snappy.parquet. Column:
> [AMOUNT], Expected: string, Found: INT64"
>
> I know schema merging works when one file has a few extra columns, but the
> fields the files have in common need to have the same schema, so that
> might not work here.
>
> Looking for a workaround here, or if there is an option I haven't tried
> you can point me to it.
>
> With schema merging I got the error below:
>
> An error occurred while calling o2272.parquet.
> :
> org.apache.spark.SparkException: Failed merging schema:
> root
>  |-- ID: string (nullable = true)
>  |-- AMOUNT: decimal(15,6) (nullable = true)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>     at org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>     at scala.Option.orElse(Option.scala:447)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>     at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>     at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>     at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206)
>     at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:282)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Failed to merge fields 'AMOUNT' and 'AMOUNT'. Failed to merge decimal types with incompatible scala 2 and 6
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:559)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:551)
>     at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:548)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at org.apache.spark.sql.types.StructType$.merge(StructType.scala:548)
>     at org.apache.spark.sql.types.StructType.merge(StructType.scala:458)
>     at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:97)
>     ...
> 30 more

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
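[Editorial sketch, not part of the original report.] One workaround for the mismatch reported above is to normalize both files to the wider scale before combining them. In PySpark that would mean reading each file separately, casting AMOUNT with `.cast(DecimalType(15, 6))`, then combining with `unionByName`; the rescaling step itself is shown here in plain Python with `decimal.Decimal` so it runs without Spark (`to_scale` is a hypothetical helper for this sketch):

```python
from decimal import Decimal

def to_scale(value: Decimal, scale: int) -> Decimal:
    # Fix the number of fraction digits, like casting to DECIMAL(15, scale)
    return value.quantize(Decimal(1).scaleb(-scale))

# AMOUNT values from file2 (written at scale 2), normalized to the
# historical scale 6 before the two files are combined
file2_amounts = [Decimal("19500.00"), Decimal("198.34")]
normalized = [to_scale(v, 6) for v in file2_amounts]
print(normalized)  # [Decimal('19500.000000'), Decimal('198.340000')]
```

Casting up to the larger scale is lossless here (scale 2 values fit exactly in scale 6), which is why widening, rather than merging in place, sidesteps the "incompatible scale" merge failure.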