parisni opened a new issue, #6342:
URL: https://github.com/apache/hudi/issues/6342

   hudi 0.11.1
   spark 3.2.1
   
   When inserting data with missing fields, 
`hoodie.datasource.write.reconcile.schema` works fine as long as only one 
field is missing. When two or more fields are missing, it fails. See the 
pyspark snippet below to reproduce the issue.
   
   ```
   sc.setLogLevel("WARN")
   from pyspark.sql.types import StructType, StructField, StringType
   
   datalaketable = "nested_table"
   tableName = "test_hudi_{datalaketable}".format(datalaketable=datalaketable)
   basePath = "/tmp/{tableName}".format(tableName=tableName)
   from pyspark.sql.functions import expr
   
   ##
   # Create a nested table
   ##
   data = [ (("James", None, "Smith"), "OH", "M"),
       (("Anna", "Rose", ""), "NY", "F"),
       (("Julia", "", "Williams"), "OH", "F"),
       (("Maria", "Anne", "Jones"), "NY", "M"),
       (("Jen", "Mary", "Brown"), "NY", "M"),
       (("Mike", "Mary", "Williams"), "OH", "M"),
   ]
   
   schema = StructType(
       [
           StructField(
               "name",
               StructType(
                   [
                       StructField("firstname", StringType(), True),
                       StructField("middlename", StringType(), True),
                       StructField("lastname", StringType(), True),
                   ]
               ),
           ),
           StructField("state", StringType(), True),
           StructField("gender", StringType(), True),
       ]
   )
   df = (
       spark.createDataFrame(data=data, schema=schema)
       .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)"))
       .withColumn("event_date", expr("current_date()"))
       .withColumn("version", expr("current_date()"))
       .withColumn("int_to_bigint", expr("cast(1 as int)"))
   )
   
   df.printSchema()
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "event_id",
       "hoodie.datasource.write.partitionpath.field": "event_date",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "version",
       "hoodie.upsert.shuffle.parallelism": 80,
       "hoodie.insert.shuffle.parallelism": 80,
       "hoodie.delete.shuffle.parallelism": 80,
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.partition_fields": "event_date",
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       'hoodie.datasource.hive_sync.use_jdbc': False,
       "hoodie.metadata.enable": "true",
       "hoodie.cleaner.policy.failed.writes": "LAZY",
       "hoodie.client.heartbeat.interval_in_ms": "240000",
       "hoodie.datasource.write.reconcile.schema": "true"
   }
   
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   
   
   
   ##
   # remove field
   ##
   data = [
       (("James", None, "Smith"), "OH", "M"),
       (("Anna", "Rose", ""), "NY", "F"),
       (("Julia", "", "Williams"), "OH", "F"),
       (("Maria", "Anne", "Jones"), "NY", "M"),
       (("Jen", "Mary", "Brown"), "NY", "M"),
       (("Mike", "Mary", "Williams"), "OH", "M"),
   ]
   
   schema = StructType(
       [
           StructField(
               "name",
               StructType(
                   [
                       StructField("firstname", StringType(), True),
                       StructField("middlename", StringType(), True),
                       StructField("lastname", StringType(), True),
                   ]
               ),
           ),
           StructField("state", StringType(), True),
           StructField("gender", StringType(), True),
       ]
   )
   df = (
       spark.createDataFrame(data=data, schema=schema)
       .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)"))
       .withColumn("event_date", expr("current_date()"))
       .withColumn("version", expr("current_date()"))
       .withColumn("int_to_bigint", expr("cast(1 as int)"))
       .drop("gender") 
       .drop("int_to_bigint")  # REMOVE THIS ONE TO MAKE IT WORK
   )
   
   df.printSchema()
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "event_id",
       "hoodie.datasource.write.partitionpath.field": "event_date",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "version",
       "hoodie.upsert.shuffle.parallelism": 80,
       "hoodie.insert.shuffle.parallelism": 80,
       "hoodie.delete.shuffle.parallelism": 80,
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.partition_fields": "event_date",
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.metadata.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true"
   }
   
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   spark.read.format("hudi").load(basePath).show()
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to