[ https://issues.apache.org/jira/browse/SPARK-45194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766198#comment-17766198 ]
Ivan Sadikov commented on SPARK-45194:
--------------------------------------

cc [~gengliang] [~cloud_fan]

> Parquet reads fail with "RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz"" due to incorrect schema inference
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45194
>                 URL: https://issues.apache.org/jira/browse/SPARK-45194
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.0, 4.0.0
>            Reporter: Ivan Sadikov
>            Priority: Major
>
> I found that Parquet reads can fail due to incorrect schema inference when two conflicting types exist across files. This is caused by the fact that schema inference only considers one file by default, and that file may contain different types than the other files.
>
> {{spark.sql.parquet.mergeSchema}} is set to {{false}} by default. This causes schema inference to pick a single file (depending on the order in which the file system lists files) and infer the schema based on that file alone. However, if the files have conflicting types, or a smaller/narrower type is selected, no error is raised during schema inference; instead, an exception is thrown during the subsequent read.
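To make the two behaviors concrete, here is a minimal, self-contained Scala sketch. This is a toy model, not Spark's actual inference code: `TsType`, `inferSchema`, and the error message are all made up for illustration, but the shape matches the description above — with merging disabled the first listed file wins, and with merging enabled the conflict surfaces at inference time.

```scala
// Toy model of the two timestamp types involved (hypothetical names,
// not Spark's internal classes).
sealed trait TsType
case object TimestampNTZ extends TsType // TIMESTAMP(MILLIS, isAdjustedToUTC = false)
case object TimestampLTZ extends TsType // TIMESTAMP(MILLIS, isAdjustedToUTC = true)

// Sketch of schema inference over the per-file types:
// - mergeSchema = false: only the first file's type is consulted, so a
//   conflict with a later file is deferred until that file is read.
// - mergeSchema = true: all files are merged, and a conflict fails here.
def inferSchema(fileTypes: Seq[TsType], mergeSchema: Boolean): TsType =
  if (!mergeSchema) {
    fileTypes.head
  } else {
    fileTypes.reduce { (a, b) =>
      if (a == b) a
      else throw new RuntimeException(s"Cannot merge $a with $b")
    }
  }

val files = Seq(TimestampNTZ, TimestampLTZ) // listing order decides the winner

println(inferSchema(files, mergeSchema = false)) // TimestampNTZ
println(inferSchema(files.reverse, mergeSchema = false)) // TimestampLTZ
```

Swapping the listing order flips the inferred type, which is exactly why renaming the files in the repro below changes whether the read succeeds.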
> In this case, the schema is inferred from the file with TIMESTAMP_NTZ, and reading the file that contains TIMESTAMP_LTZ fails:
> {code:java}
> [info] Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true))
> [info] at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.convertErrorForTimestampNTZ(ParquetVectorUpdaterFactory.java:209)
> [info] at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.validateTimestampType(ParquetVectorUpdaterFactory.java:203)
> [info] at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:121)
> [info] at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
> {code}
> Note that if the file with TIMESTAMP_LTZ is selected instead, the read succeeds.
>
> Here is the repro as a unit test that you can run against Spark master. Just add the test to ParquetIOSuite or some other test suite.
> {code:java}
> import org.apache.hadoop.conf._
> import org.apache.hadoop.fs._
> import org.apache.parquet.example.data.simple._
> import org.apache.parquet.hadoop.example._
> import org.apache.parquet.schema._
>
> // Creates a Parquet file with two simple columns: integer and timestamp.
> // Depending on the isUTC flag, the timestamp is either NTZ (false) or LTZ (true).
> private def createParquetFile(path: String, isUTC: Boolean): Unit = {
>   val schema = MessageTypeParser.parseMessageType(
>     s"""
>     message schema {
>       optional int32 a;
>       optional int64 ts (TIMESTAMP(MILLIS, $isUTC));
>     }
>     """
>   )
>   val conf = new Configuration(false)
>   conf.set("parquet.example.schema", schema.toString)
>   val writer = ExampleParquetWriter.builder(new Path(path)).withConf(conf).build()
>   for (i <- 0 until 2) {
>     val group = new SimpleGroup(schema)
>     group.add("a", 1)
>     group.add("ts", System.currentTimeMillis)
>     writer.write(group)
>   }
>   writer.close()
> }
>
> test("repro") {
>   withTempPath { dir =>
>     createParquetFile(dir + "/file-1.parquet", false) // NTZ
>     createParquetFile(dir + "/file-2.parquet", true) // LTZ
>
>     val df = spark.read.parquet(dir.getAbsolutePath)
>     df.show() // fails
>   }
> }
> {code}
> If you run the repro as is, you will get:
> {code:java}
> [info] Cause: java.lang.RuntimeException: Unable to create Parquet converter for data type "timestamp_ntz" whose Parquet type is int64 time(TIMESTAMP(MILLIS,true))
> {code}
> If you swap the file names, the read succeeds:
> {code:java}
> +---+--------------------+
> |  a|                  ts|
> +---+--------------------+
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> |  1|2023-09-17 21:59:...|
> +---+--------------------+
> {code}
> If you set {{spark.sql.parquet.mergeSchema}} to {{true}}, schema inference fails with:
> {code:java}
> [info] org.apache.spark.SparkException: [CANNOT_MERGE_SCHEMAS] Failed merging schemas:
> [info] Initial schema:
> [info] "STRUCT<a: INT, ts: TIMESTAMP_NTZ>"
> [info] Schema that cannot be merged with the initial schema:
> [info] "STRUCT<a: INT, ts: TIMESTAMP>".
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org