ssandona opened a new issue, #9870:
URL: https://github.com/apache/hudi/issues/9870
I am seeing duplicate data after upsert operations. This seems to be related to
range pruning not using column statistics correctly: disabling
`hoodie.bloom.index.use.metadata` before doing the upserts resolves the issue.
I'm using **Hudi 0.13.1**.
I am not able to reproduce this with a small dataset, but with my dataset the
issue is persistent and reproducible.
Here is the observed behavior:
1. Initial dataset with Partition1 (bulk inserted): 229874354 records
2. Perform an Upsert operation using a dataframe containing 9640013 records,
2223979 updates related to partition 1 and 7416034 inserts related to partition
2
- I expected to end up with 229874354 records in partition 1 and 7416034
records in partition 2; instead I ended up with 232098333 records in partition 1
(229874354 + 2223979) and 7416034 in partition 2.
- This means the 2223979 updates were treated as inserts.
Here is my upsert code:
```
COW_TABLE_NAME = "hudi_drones_cow_table_opt"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
COW_TABLE_LOCATION = "s3://mybucket/datasets/hudi_drones_cow_table_opt/"

hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height",
}

# Create upsert_df
(upsert_df.write.format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_options_opt)
    .mode("append")
    .save(COW_TABLE_LOCATION))
```
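A quick way to confirm and count the duplicates is to group by record key and partition path and look for keys appearing more than once. A minimal sketch (the helper name and sample keys are illustrative; the commented-out Spark aggregation shows the shape of the same check on the real table):

```python
from collections import Counter

def count_duplicate_keys(keys):
    """Return {key: occurrences} for every record key seen more than once."""
    counts = Counter(keys)
    return {k: n for k, n in counts.items() if n > 1}

# On the real table, the equivalent Spark check would look like:
#   df = spark.read.format("org.apache.hudi").load(COW_TABLE_LOCATION)
#   (df.groupBy("id", "year", "month").count()
#      .filter("count > 1").count())  # number of duplicated record keys
```

After a clean upsert this count should be zero; any key with a count above 1 is an update that was written as a fresh insert.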
To understand what was causing the issue, I tried multiple option combinations.
## OptionA: column stats in meta table + bloom filter in meta table
```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height",
}
```
This ended up with duplicates.
## OptionB: column stats in meta table + NO bloom filter in meta table
```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "false",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.column.stats.column.list": "id,timestamp,current_height",
}
```
This ended up with duplicates.
## OptionC: column stats for all columns in meta table + NO bloom filter in meta table
```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "false",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
}
```
This ended up with duplicates.
## OptionD: NO column stats in meta table + bloom filter in meta table
```
hudi_options_opt = {
    "hoodie.table.name": COW_TABLE_NAME,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.index.type": "BLOOM",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "false",
}
```
This ended up with no duplicates; everything is OK. However, from the execution
in the Spark UI it is not clear whether the bloom filter from the metadata table
was actually used.
[Here](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L134C1-L134C1) is the code where `findMatchingFilesForRecordKeys` is invoked.
I can see that [this point](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L175) in the code is reached.
However, I do not see any stage for the computation of `keyLookupResultRDD`. Specifically, I would have expected to see a stage related to [this code](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L152), since [this check](https://github.com/apache/hudi/blob/release-0.13.1/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java#L101) should pass with the above configurations.
Here are the stages in the Spark UI for Option D:

## Summary
It seems range pruning is not working properly with column statistics, as
upsert operations end up with duplicate data.
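To make the suspected failure mode concrete: the bloom index first prunes candidate files by comparing each incoming record key against per-file min/max key ranges derived from column stats; if the range recorded for the file that actually holds the key is wrong, the key matches no candidate file, the record is tagged as a new insert, and a duplicate is written. A minimal pure-Python model of that pruning step (file names and ranges are made up for illustration, not Hudi's internals):

```python
def candidate_files(key, file_ranges):
    """Range pruning: a file is a candidate for `key` only if
    min_key <= key <= max_key according to its recorded stats."""
    return [f for f, (lo, hi) in file_ranges.items() if lo <= key <= hi]

# Correct stats: the incoming update key "id-150" maps to file-1,
# so it would be tagged as an update.
good_stats = {"file-1": ("id-100", "id-199"), "file-2": ("id-200", "id-299")}

# Wrong/stale stats for file-1: the same key matches no file at all,
# is tagged as an insert, and becomes a duplicate.
bad_stats = {"file-1": ("id-160", "id-199"), "file-2": ("id-200", "id-299")}
```

Under this model, a single bad min or max for one file is enough to turn every update routed to that file into a duplicate, which matches the per-partition counts above.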
Is there an easy way to find out whether the issue is caused by wrongly
calculated statistics or by the code not using them properly? It would be great
if there were an easy way to print out, for each file, which column statistics
are present in the metadata table.
These 2 other issues may be related:
- https://github.com/apache/hudi/issues/9857
- https://github.com/apache/hudi/issues/9271