parisni opened a new issue, #6343:
URL: https://github.com/apache/hudi/issues/6343

   hudi 0.11.1
   spark 3.2.1
   
   When writing an integer field in place of an existing bigint field, schema 
reconciliation fails. I would expect this to work with 
`hoodie.datasource.write.reconcile.schema` enabled. See the snippet below to 
reproduce the issue.
   
   ```
   ##
   # Create a nested table
   ##
   data = [ (("James", None, "Smith"), "OH", "M"),
       (("Anna", "Rose", ""), "NY", "F"),
       (("Julia", "", "Williams"), "OH", "F"),
       (("Maria", "Anne", "Jones"), "NY", "M"),
       (("Jen", "Mary", "Brown"), "NY", "M"),
       (("Mike", "Mary", "Williams"), "OH", "M"),
   ]
   
   schema = StructType(
       [
           StructField(
               "name",
               StructType(
                   [
                       StructField("firstname", StringType(), True),
                       StructField("middlename", StringType(), True),
                       StructField("lastname", StringType(), True),
                   ]
               ),
           ),
           StructField("state", StringType(), True),
           StructField("gender", StringType(), True),
       ]
   )
   df = (
       spark.createDataFrame(data=data, schema=schema)
       .withColumn("event_id", expr("row_number() over(partition by 1 order by 
1)"))
       .withColumn("event_date", expr("current_date()"))
       .withColumn("version", expr("current_date()"))
       .withColumn("int_to_bigint", expr("cast(1 as bigint)"))
   )
   
   df.printSchema()
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "event_id",
       "hoodie.datasource.write.partitionpath.field": "event_date",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "version",
       "hoodie.upsert.shuffle.parallelism": 80,
       "hoodie.insert.shuffle.parallelism": 80,
       "hoodie.delete.shuffle.parallelism": 80,
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.partition_fields": "event_date",
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
       'hoodie.datasource.hive_sync.use_jdbc': False,
       "hoodie.metadata.enable": "true",
       "hoodie.cleaner.policy.failed.writes": "LAZY",
       "hoodie.client.heartbeat.interval_in_ms": "240000",
       "hoodie.datasource.write.reconcile.schema": "true"
   }
   
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   
   
   ##
   # type promotion field + drop gender
   ##
   data = [ (("James", None, "Smith"), "OH", "M"),
       (("Anna", "Rose", ""), "NY", "F"),
       (("Julia", "", "Williams"), "OH", "F"),
       (("Maria", "Anne", "Jones"), "NY", "M"),
       (("Jen", "Mary", "Brown"), "NY", "M"),
       (("Mike", "Mary", "Williams"), "OH", "M"),
   ]
   
   schema = StructType(
       [
           StructField(
               "name",
               StructType(
                   [
                       StructField("firstname", StringType(), True),
                       StructField("middlename", StringType(), True),
                       StructField("lastname", StringType(), True),
                   ]
               ),
           ),
           StructField("state", StringType(), True),
           StructField("gender", StringType(), True),
       ]
   )
   df = (
       spark.createDataFrame(data=data, schema=schema)
       .withColumn("event_id", expr("row_number() over(partition by 1 order by 
1)"))
       .withColumn("event_date", expr("current_date()"))
       .withColumn("version", expr("current_date()"))
       .withColumn("int_to_bigint", expr("cast(1 as int)"))
   
   )
   
   df.printSchema()
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "event_id",
       "hoodie.datasource.write.partitionpath.field": "event_date",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "version",
       "hoodie.upsert.shuffle.parallelism": 80,
       "hoodie.insert.shuffle.parallelism": 80,
       "hoodie.delete.shuffle.parallelism": 80,
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.partition_fields": "event_date",
       "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.metadata.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true"
   }
   
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   ```
   
   Here is the resulting error:
   ```
   Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read 
value at 1 in block 0 in file 
file:/tmp/test_hudi_nested_table/event_date=2022-08-09/ae482557-774d-4e08-96f2-203521a9a183-0_0-312-4069_20220809111419994.parquet
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
        ... 8 more
   Caused by: java.lang.UnsupportedOperationException: 
org.apache.parquet.avro.AvroConverters$FieldIntegerConverter
        at 
org.apache.parquet.io.api.PrimitiveConverter.addLong(PrimitiveConverter.java:105)
        at 
org.apache.parquet.column.impl.ColumnReaderBase$2$4.writeValue(ColumnReaderBase.java:325)
        at 
org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
        at 
org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
        at 
org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
        ... 11 more
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to