ganczarek opened a new issue, #8756:
URL: https://github.com/apache/hudi/issues/8756
**Describe the problem you faced**
Hudi evolves the schema of nested fields in an unsupported way (boolean -> string and timestamp -> string), writes corrupted data, alters the Hive table schema accordingly, and later fails to read the table.
**To Reproduce**
```
// Imports added for completeness; assumes a running SparkSession `spark`
// and an `s3Bucket` value in scope.
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncConfig, NonPartitionedExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SaveMode

import spark.implicits._

// Identical write options are used for both writes below.
val writeOptions = Map(
  HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
  KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
  HoodieWriteConfig.TBL_NAME.key -> "test_table",
  HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
  HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
  HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
  HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
  HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
  HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
  KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
  KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true"
)

// First write: the nested fields are boolean and timestamp.
case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)

Seq(
  ("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
  ("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(writeOptions)
  .mode(SaveMode.Overwrite)
  .save(s"s3://$s3Bucket/temp/test_db/test_table")

// Second write: the same nested fields, both evolved to string, going to a
// new partition.
case class TestClass2(bool_value: String, timestamp_value: String)

Seq(
  ("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
  ("2", "d", TestClass2("str2", LocalDateTime.now().toString))
).toDF("partition", "id", "detail")
  .write
  .format("org.apache.hudi")
  .options(writeOptions)
  .mode(SaveMode.Append)
  .save(s"s3://$s3Bucket/temp/test_db/test_table")

// Reading the table back now fails (see stacktrace below).
spark.read.format("org.apache.hudi").load(s"s3://$s3Bucket/temp/test_db/test_table").show(false)
```
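For reference, the nested `detail` struct changes type between the two writes. A quick way to see the divergence (`df1`/`df2` are illustrative names for the two DataFrames built above, not part of the original snippet):

```
// Illustrative: df1/df2 stand for the first and second DataFrame above.
df1.select("detail").printSchema()
// root
//  |-- detail: struct (nullable = true)
//  |    |-- bool_value: boolean (nullable = false)
//  |    |-- timestamp_value: timestamp (nullable = true)

df2.select("detail").printSchema()
// root
//  |-- detail: struct (nullable = true)
//  |    |-- bool_value: string (nullable = true)
//  |    |-- timestamp_value: string (nullable = true)
```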
**Expected behavior**
Hudi should fail the write when field types evolve in an unsupported way. Two things appear to go wrong here: 1) Hudi creates a new partition whose Parquet files are incompatible with the rest of the table, which corrupts it, and 2) Hudi Hive sync alters the Hive table schema into a state it cannot handle later (boolean -> string, bigint -> string).
**Environment Description**
* Hudi version : 0.12.2-amzn-0 (AWS EMR-6.10.0)
* Spark version : 3.3.1
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
I've experimented with this a bit, and the problem seems to require a combination of two things: the evolved fields being nested, and the new data going to a new partition.

When the fields are at the root level (not nested) and the table is not partitioned, a field type change from boolean to string or from timestamp to string makes Hudi fail to merge records; the commit is aborted and Hive sync never runs.

When the fields are at the root level but the table is partitioned, writing data with the changed field types to a new partition succeeds, but Hive sync then fails with the following error:
```
Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test_table_2
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:145)
    at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
    ... 95 more
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from BIGINT to string for field timestamp_value
    at org.apache.hudi.hive.util.HiveSchemaUtil.getSchemaDifference(HiveSchemaUtil.java:109)
    at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:285)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:217)
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:154)
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142)
    ... 96 more
```
When the fields are nested and the table is partitioned, however, both the write and the Hive sync succeed, and the table is left in the corrupted state described above.
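To confirm the corruption is on disk (and not just a Hive metastore mismatch), one can bypass Hudi and read each partition's files directly as Parquet; a diagnostic sketch reusing `spark` and `s3Bucket` from the repro:

```
// For the repro above, partition=1 should report detail.bool_value as boolean
// and detail.timestamp_value as timestamp, while partition=2 reports both as
// string. That physical mismatch is what the reader trips over once the
// table schema says string (note PlainLongDictionary in the trace below).
Seq("partition=1", "partition=2").foreach { p =>
  println(s"--- $p ---")
  spark.read.parquet(s"s3://$s3Bucket/temp/test_db/test_table/$p").printSchema()
}
```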
**Stacktrace**
Error message when reading from the corrupted Hive table:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 777.0 failed 4 times, most recent failure: Lost task 0.3 in stage 777.0 (TID 866) (ip-10-203-88-126.eu-west-2.compute.internal executor 16): org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet. Details:
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:731)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:393)
    ... 20 more
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1(ParquetRowConverter.scala:507)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1$adapted(ParquetRowConverter.scala:506)
    at scala.Array$.tabulate(Array.scala:418)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.setDictionary(ParquetRowConverter.scala:506)
    at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
    ... 25 more
```