[ https://issues.apache.org/jira/browse/SPARK-45194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766198#comment-17766198 ]

Ivan Sadikov commented on SPARK-45194:
--------------------------------------

cc [~gengliang] [~cloud_fan]

> Parquet reads fail with "RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz"" due to incorrect schema inference
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45194
>                 URL: https://issues.apache.org/jira/browse/SPARK-45194
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0, 4.0.0
>            Reporter: Ivan Sadikov
>            Priority: Major
>
> I found that Parquet reads can fail due to incorrect schema inference when two conflicting types exist across files. This is caused by the fact that, by default, schema inference only considers one file, which could contain different types than the other files.
> {{spark.sql.parquet.mergeSchema}} is set to {{false}} by default. This causes schema inference to pick a single file (depending on the order in which the file system returns files) and infer the schema from that file alone. However, if the files have conflicting types, or a smaller/narrower type is selected, instead of failing during schema inference, an exception is thrown during the subsequent read.
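> For reference, merging can also be requested for a single read without changing the session default. A minimal sketch (the directory path is a placeholder):
> {code:java}
> // With mergeSchema=true, Spark reads the footers of all part-files and
> // merges their schemas instead of inferring from a single file.
> val df = spark.read.option("mergeSchema", "true").parquet("/path/to/dir")
> {code}
> As shown at the end of this report, with merging enabled this case fails fast during schema inference instead of during the read.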
> In this case, we infer schema based on the file with TIMESTAMP_NTZ and fail 
> to read the file that contains TIMESTAMP_LTZ:
> {code:java}
> [info]   Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true))
> [info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.convertErrorForTimestampNTZ(ParquetVectorUpdaterFactory.java:209)
> [info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.validateTimestampType(ParquetVectorUpdaterFactory.java:203)
> [info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:121)
> [info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175){code}
> Note that if the file with TIMESTAMP_LTZ is selected, the read succeeds.
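> A possible workaround, then, is to pin the schema explicitly so that footer-based inference is skipped. A minimal sketch (column names taken from the repro below; TIMESTAMP is the LTZ type):
> {code:java}
> // Supplying the schema up front avoids picking it from an arbitrary file.
> val df = spark.read.schema("a INT, ts TIMESTAMP").parquet("/path/to/dir")
> {code}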
>  
> Here is the repro as a unit test that you can run in Spark master. Just add 
> the test to ParquetIOSuite or some other test suite.
> {code:java}
> import org.apache.hadoop.conf._
> import org.apache.hadoop.fs._
> import org.apache.parquet.example.data.simple._
> import org.apache.parquet.hadoop.example._
> import org.apache.parquet.schema._
>
> // Creates a Parquet file with two simple columns: integer and timestamp.
> // Depending on the isUTC flag, the timestamp is either NTZ (false) or LTZ (true).
> private def createParquetFile(path: String, isUTC: Boolean): Unit = {
>   val schema = MessageTypeParser.parseMessageType(
>     s"""
>     message schema {
>       optional int32 a;
>       optional int64 ts (TIMESTAMP(MILLIS, $isUTC));
>     }
>     """
>   )
>   val conf = new Configuration(false)
>   conf.set("parquet.example.schema", schema.toString)
>   val writer = ExampleParquetWriter.builder(new Path(path)).withConf(conf).build()
>   for (i <- 0 until 2) {
>     val group = new SimpleGroup(schema)
>     group.add("a", 1)
>     group.add("ts", System.currentTimeMillis)
>     writer.write(group)
>   }
>   writer.close()
> }
>
> test("repro") {
>   withTempPath { dir =>
>     createParquetFile(dir + "/file-1.parquet", false) // NTZ
>     createParquetFile(dir + "/file-2.parquet", true)  // LTZ
>     val df = spark.read.parquet(dir.getAbsolutePath)
>     df.show() // fails
>   }
> } {code}
> If you run the repro as is, you will get: 
> {code:java}
> [info]   Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true)) {code}
> If you swap the file names so that the file with TIMESTAMP_LTZ is picked for schema inference, the read succeeds.
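> For example, a sketch of the swap in the repro above (assuming the file system lists the files lexicographically by name):
> {code:java}
> createParquetFile(dir + "/file-1.parquet", true)  // LTZ, picked for inference
> createParquetFile(dir + "/file-2.parquet", false) // NTZ
> {code}
> The output is: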
> {code:java}
> +---+--------------------+
> |  a|                  ts|
> +---+--------------------+
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> +---+--------------------+ {code}
> If you set {{spark.sql.parquet.mergeSchema}} to {{true}}, schema inference fails with:
> {code:java}
> [info]   org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas:
> [info] Initial schema:
> [info] "STRUCT<a: INT, ts: TIMESTAMP_NTZ>"
> [info] Schema that cannot be merged with the initial schema:
> [info] "STRUCT<a: INT, ts: TIMESTAMP>". {code}


