parisni opened a new issue, #6343:
URL: https://github.com/apache/hudi/issues/6343
Environment: Hudi 0.11.1, Spark 3.2.1
When writing an `int` field into a column that was previously written as `bigint`, schema reconciliation fails. I would expect this type promotion to be handled with `hoodie.datasource.write.reconcile.schema` enabled. See the snippet below to reproduce the issue.
```
##
# Create a nested table
##
data = [ (("James", None, "Smith"), "OH", "M"),
(("Anna", "Rose", ""), "NY", "F"),
(("Julia", "", "Williams"), "OH", "F"),
(("Maria", "Anne", "Jones"), "NY", "M"),
(("Jen", "Mary", "Brown"), "NY", "M"),
(("Mike", "Mary", "Williams"), "OH", "M"),
]
schema = StructType(
[
StructField(
"name",
StructType(
[
StructField("firstname", StringType(), True),
StructField("middlename", StringType(), True),
StructField("lastname", StringType(), True),
]
),
),
StructField("state", StringType(), True),
StructField("gender", StringType(), True),
]
)
df = (
spark.createDataFrame(data=data, schema=schema)
.withColumn("event_id", expr("row_number() over(partition by 1 order by
1)"))
.withColumn("event_date", expr("current_date()"))
.withColumn("version", expr("current_date()"))
.withColumn("int_to_bigint", expr("cast(1 as bigint)"))
)
df.printSchema()
hudi_options = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.recordkey.field": "event_id",
"hoodie.datasource.write.partitionpath.field": "event_date",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "version",
"hoodie.upsert.shuffle.parallelism": 80,
"hoodie.insert.shuffle.parallelism": 80,
"hoodie.delete.shuffle.parallelism": 80,
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.database": "default",
"hoodie.datasource.hive_sync.table": tableName,
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.partition_fields": "event_date",
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
'hoodie.datasource.hive_sync.use_jdbc': False,
"hoodie.metadata.enable": "true",
"hoodie.cleaner.policy.failed.writes": "LAZY",
"hoodie.client.heartbeat.interval_in_ms": "240000",
"hoodie.datasource.write.reconcile.schema": "true"
}
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).show()
##
# type promotion field + drop gender
##
data = [ (("James", None, "Smith"), "OH", "M"),
(("Anna", "Rose", ""), "NY", "F"),
(("Julia", "", "Williams"), "OH", "F"),
(("Maria", "Anne", "Jones"), "NY", "M"),
(("Jen", "Mary", "Brown"), "NY", "M"),
(("Mike", "Mary", "Williams"), "OH", "M"),
]
schema = StructType(
[
StructField(
"name",
StructType(
[
StructField("firstname", StringType(), True),
StructField("middlename", StringType(), True),
StructField("lastname", StringType(), True),
]
),
),
StructField("state", StringType(), True),
StructField("gender", StringType(), True),
]
)
df = (
spark.createDataFrame(data=data, schema=schema)
.withColumn("event_id", expr("row_number() over(partition by 1 order by
1)"))
.withColumn("event_date", expr("current_date()"))
.withColumn("version", expr("current_date()"))
.withColumn("int_to_bigint", expr("cast(1 as int)"))
)
df.printSchema()
hudi_options = {
"hoodie.table.name": tableName,
"hoodie.datasource.write.recordkey.field": "event_id",
"hoodie.datasource.write.partitionpath.field": "event_date",
"hoodie.datasource.write.table.name": tableName,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.precombine.field": "version",
"hoodie.upsert.shuffle.parallelism": 80,
"hoodie.insert.shuffle.parallelism": 80,
"hoodie.delete.shuffle.parallelism": 80,
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.hive_sync.database": "default",
"hoodie.datasource.hive_sync.table": tableName,
"hoodie.datasource.hive_sync.mode": "hms",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.datasource.hive_sync.partition_fields": "event_date",
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.metadata.enable": "true",
"hoodie.datasource.write.reconcile.schema": "true"
}
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).show()
```
here is the error:
```
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 1 in block 0 in file
file:/tmp/test_hudi_nested_table/event_date=2022-08-09/ae482557-774d-4e08-96f2-203521a9a183-0_0-312-4069_20220809111419994.parquet
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
... 8 more
Caused by: java.lang.UnsupportedOperationException:
org.apache.parquet.avro.AvroConverters$FieldIntegerConverter
at
org.apache.parquet.io.api.PrimitiveConverter.addLong(PrimitiveConverter.java:105)
at
org.apache.parquet.column.impl.ColumnReaderBase$2$4.writeValue(ColumnReaderBase.java:325)
at
org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
at
org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
at
org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
at
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
... 11 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]