ganczarek opened a new issue, #8756:
URL: https://github.com/apache/hudi/issues/8756

   **Describe the problem you faced**
   
   Hudi evolves the schema of nested fields in an unsupported way (bool -> string and timestamp -> string), writes corrupted data, alters the Hive table schema to match, and later fails to read the table back.
   
   **To Reproduce**
   
   ```scala
   // Imports assumed for this snippet (spark-shell, with a SparkSession `spark` in scope):
   import java.sql.Timestamp
   import java.time.{LocalDateTime, ZoneOffset}
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.{HiveSyncConfig, NonPartitionedExtractor}
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions
   import org.apache.hudi.sync.common.HoodieSyncConfig
   import org.apache.spark.sql.SaveMode
   import spark.implicits._

   case class TestClass(bool_value: Boolean, timestamp_value: Timestamp)
   Seq(
     ("1", "a", TestClass(true, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC)))),
     ("1", "b", TestClass(false, Timestamp.from(LocalDateTime.now().toInstant(ZoneOffset.UTC))))
   ).toDF("partition", "id", "detail")
     .write
     .format("org.apache.hudi")
     .options(
       Map(
         HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
         KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
         HoodieWriteConfig.TBL_NAME.key -> "test_table",
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
         HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
         HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
         KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
         KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
       )
     )
     .mode(SaveMode.Overwrite)
     .save(s"s3://$s3Bucket/temp/test_db/test_table")
   
   case class TestClass2(bool_value: String, timestamp_value: String)
   Seq(
     ("2", "c", TestClass2("str1", LocalDateTime.now().toString)),
     ("2", "d", TestClass2("str2", LocalDateTime.now().toString))
   ).toDF("partition", "id", "detail")
     .write
     .format("org.apache.hudi")
     .options(
       Map(
         HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key -> "id",
         KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key -> "id",
         HoodieWriteConfig.TBL_NAME.key -> "test_table",
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> "true",
         HiveSyncConfig.HIVE_URL.key -> "jdbc:hive2://localhost:10000",
         HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key -> "true",
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> "test_db",
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "test_table",
         KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key -> "partition",
         KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key -> "true",
       )
     )
     .mode(SaveMode.Append)
     .save(s"s3://$s3Bucket/temp/test_db/test_table")
   
   
   spark.read.format("org.apache.hudi").load(s"s3://$s3Bucket/temp/test_db/test_table").show(false)
   ```
   
   **Expected behavior**
   
   Hudi should fail the write when field types evolve in an unsupported way. Two things go wrong here: 1) Hudi creates a new partition containing invalid Parquet files, which corrupts the table, and 2) Hudi Hive sync alters the Hive table schema into a form it cannot handle later (bool -> string, bigint -> string).
   
   **Environment Description**
   
   * Hudi version : 0.12.2-amzn-0 (AWS EMR-6.10.0)
   
   * Spark version : 3.3.1
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   I've experimented with this issue a bit, and the problem appears to require a combination of two conditions: the evolving fields are nested, and the new data lands in a new partition.
   
   When the fields are at the root level (not nested) and the table is not partitioned, a type change from bool to string or from timestamp to string makes Hudi fail to merge the records: the commit is aborted and Hive sync never runs.
   
   If the fields are at the root level but the table is partitioned, and new data with the changed field types is written to a new partition, the Hudi write succeeds but Hive sync fails with the following error:
   ```
    Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test_table_2
      at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:145)
      at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
      ... 95 more
    Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Could not convert field Type from BIGINT to string for field timestamp_value
      at org.apache.hudi.hive.util.HiveSchemaUtil.getSchemaDifference(HiveSchemaUtil.java:109)
      at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:285)
      at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:217)
      at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:154)
      at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:142)
      ... 96 more
   ```
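   
   A note on the `BIGINT to string` message above: the field was declared as a timestamp, but Hudi's Avro-based Parquet writer appears to store Spark `TimestampType` values as `timestamp-micros`, i.e. a 64-bit integer of microseconds since the epoch, so Hive sync sees the column as `BIGINT` rather than a timestamp type. A minimal sketch of that representation (the `toEpochMicros` helper is mine for illustration, not a Hudi API):
   
   ```scala
   import java.sql.Timestamp
   import java.time.Instant
   
   // Epoch-microseconds value as stored for a timestamp-micros Parquet column.
   def toEpochMicros(ts: Timestamp): Long = {
     val instant = ts.toInstant
     instant.getEpochSecond * 1000000L + instant.getNano / 1000L
   }
   
   val micros = toEpochMicros(Timestamp.from(Instant.parse("2023-05-17T12:57:08Z")))
   println(micros) // 1684328228000000
   ```
   
   So the schema evolution Hive sync is asked to perform is effectively `bigint -> string`, which `HiveSchemaUtil.getSchemaDifference` rejects.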
   
   When the fields are nested and the table is partitioned, both the write and the Hive sync succeed, even though the resulting table can no longer be read (see the stacktrace below).
   
   **Stacktrace**
   
   Error message when reading from corrupted Hive table:
   ```
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 777.0 failed 4 times, most recent failure: Lost task 0.3 in stage 777.0 (TID 866) (ip-10-203-88-126.eu-west-2.compute.internal executor 16): org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet. Details:
        at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:731)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:955)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://REDACTED/temp/test_db/test_table/partition=1/a08715d6-5d14-49ed-b15d-309ba3c09252-0_0-711-793_20230517125708406.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.hasNext(RecordReaderIterator.scala:61)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:393)
        ... 20 more
    Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1(ParquetRowConverter.scala:507)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.$anonfun$setDictionary$1$adapted(ParquetRowConverter.scala:506)
        at scala.Array$.tabulate(Array.scala:418)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$ParquetStringConverter.setDictionary(ParquetRowConverter.scala:506)
        at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
        at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
        at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
        at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
        at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
        at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225)
        ... 25 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
