parisni opened a new issue, #6342:
URL: https://github.com/apache/hudi/issues/6342
hudi 0.11.1
spark 3.2.1
When inserting data with missing fields,
`hoodie.datasource.write.reconcile.schema` works fine as long as only one
field is missing.
When two or more fields are missing, it fails. See the PySpark snippet below
to reproduce the issue:
```
from pyspark.sql.functions import expr
from pyspark.sql.types import StructType, StructField, StringType

# `sc` (and `spark`, used further down) are not defined in this snippet;
# presumably they are the globals provided by the pyspark shell.
sc.setLogLevel("WARN")

# Table name and base path are both derived from the logical table name.
datalaketable = "nested_table"
tableName = f"test_hudi_{datalaketable}"
basePath = f"/tmp/{tableName}"
##
# Create a nested table
##
# Each row is ((firstname, middlename, lastname), state, gender); the inner
# tuple maps onto the nested "name" struct of the schema.
data = [
    (("James", None, "Smith"), "OH", "M"),
    (("Anna", "Rose", ""), "NY", "F"),
    (("Julia", "", "Williams"), "OH", "F"),
    (("Maria", "Anne", "Jones"), "NY", "M"),
    (("Jen", "Mary", "Brown"), "NY", "M"),
    (("Mike", "Mary", "Williams"), "OH", "M"),
]
# Schema with one nested struct column ("name") and two flat string columns.
schema = StructType(
    [
        StructField(
            "name",
            StructType(
                [
                    StructField("firstname", StringType(), nullable=True),
                    StructField("middlename", StringType(), nullable=True),
                    StructField("lastname", StringType(), nullable=True),
                ]
            ),
        ),
        StructField("state", StringType(), nullable=True),
        StructField("gender", StringType(), nullable=True),
    ]
)
# Build the initial DataFrame and add the columns the Hudi options refer to:
# event_id (record key), event_date (partition path) and version (precombine).
df = (
    spark.createDataFrame(data=data, schema=schema)
    # The SQL expression must stay on one line: a plain "..." literal cannot
    # span a line break.
    .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as int)"))
)
df.printSchema()
# Write options for the initial (overwrite) commit.
# "hoodie.datasource.write.keygenerator.class" appeared twice with the same
# value in the original literal; it is kept once, in its first position, so
# the resulting dict (keys, values and order) is unchanged.
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "version",
    "hoodie.upsert.shuffle.parallelism": 80,
    "hoodie.insert.shuffle.parallelism": 80,
    "hoodie.delete.shuffle.parallelism": 80,
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "event_date",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": False,
    "hoodie.metadata.enable": "true",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.client.heartbeat.interval_in_ms": "240000",
    # The setting this report is about.
    "hoodie.datasource.write.reconcile.schema": "true",
}
# Initial commit: write in "overwrite" mode, then read the table back.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(basePath)
)
(
    spark.read.format("hudi")
    .load(basePath)
    .show()
)
##
# remove field
##
# Rows for the second write: ((firstname, middlename, lastname), state, gender).
data = [
    (("James", None, "Smith"), "OH", "M"),
    (("Anna", "Rose", ""), "NY", "F"),
    (("Julia", "", "Williams"), "OH", "F"),
    (("Maria", "Anne", "Jones"), "NY", "M"),
    (("Jen", "Mary", "Brown"), "NY", "M"),
    (("Mike", "Mary", "Williams"), "OH", "M"),
]
# Input schema for the second write; "gender" is still declared here and is
# dropped from the DataFrame afterwards.
schema = StructType(
    [
        StructField(
            "name",
            StructType(
                [
                    StructField("firstname", StringType(), nullable=True),
                    StructField("middlename", StringType(), nullable=True),
                    StructField("lastname", StringType(), nullable=True),
                ]
            ),
        ),
        StructField("state", StringType(), nullable=True),
        StructField("gender", StringType(), nullable=True),
    ]
)
# Build the DataFrame for the second write, then drop two columns so the
# incoming batch is missing two fields.
df = (
    spark.createDataFrame(data=data, schema=schema)
    # The SQL expression must stay on one line: a plain "..." literal cannot
    # span a line break.
    .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as int)"))
    .drop("gender")
    .drop("int_to_bigint")  # REMOVE THIS ONE TO MAKE IT WORK
)
df.printSchema()
# Write options for the second (append) commit.
# "hoodie.datasource.write.keygenerator.class" appeared twice with the same
# value in the original literal; it is kept once, in its first position, so
# the resulting dict (keys, values and order) is unchanged.
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "version",
    "hoodie.upsert.shuffle.parallelism": 80,
    "hoodie.insert.shuffle.parallelism": 80,
    "hoodie.delete.shuffle.parallelism": 80,
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "event_date",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.metadata.enable": "true",
    # The setting this report is about.
    "hoodie.datasource.write.reconcile.schema": "true",
}
# Second commit: write the reduced DataFrame in "append" mode, then read the
# table back.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(basePath)
)
(
    spark.read.format("hudi")
    .load(basePath)
    .show()
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]