ssandona opened a new issue, #9857:
URL: https://github.com/apache/hudi/issues/9857
I'm using **Hudi 0.13.1** with a COW table, BLOOM index, and the metadata table
enabled, specifically with the metadata indexes for bloom_filters, column_stats,
and files. In addition, I'm setting `hoodie.bloom.index.use.metadata=true` to enable
range pruning through the metadata table's column_stats.
What I noticed is that with `hoodie.metadata.index.column.stats.enable=true`,
if we specify values for `hoodie.metadata.index.column.stats.column.list`, the
record key column is not included in the `column_stats` index. As a result, the
column_stats index cannot be used for range pruning during upsert operations.
Below are the scenarios I tested, with the related results:
## Scenario 1
```python
from pyspark.sql.functions import concat

inputDF = spark.createDataFrame(
    [
        ("W01D000", "53.3572085", "202001150800", "2020", "01"),
        ("W01D001", "53.3572085", "202001150800", "2020", "01"),
        ("W01D002", "53.3572085", "202001150800", "2020", "01"),
        ("W01D003", "53.3572085", "202001150800", "2020", "01"),
    ],
    ["device_id", "value", "timestamp", "year", "month"],
)
inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))

RECORD_KEY = "id"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
DATABASE = "default"
TABLE_STATS = "table_1"
TABLE_STATS_LOCATION = "s3://mybucket/datasets/table_1/"

hudi_options_with_statistics = {
    "hoodie.table.name": TABLE_STATS,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": RECORD_KEY,
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": TABLE_STATS,
    "hoodie.datasource.hive_sync.database": DATABASE,
    "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "BLOOM",
}

# First upsert creates the table; the second upsert exercises the index lookup.
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
```
The second upsert uses the index for range pruning. This is because the
`hoodie.metadata.index.column.stats.column.list` property is left empty, so
all columns (including the record key column `id`) are added to column_stats.
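To check which columns actually made it into the index, one option (an assumption on my side: the metadata table is itself a Hudi MOR table under `.hoodie/metadata`, and column-stats records are type 3 with a `ColumnStatsMetadata` payload) is to read its `column_stats` records directly:

```python
# Sketch: inspect the column_stats records of the metadata table.
# Assumes the metadata table lives under <table location>/.hoodie/metadata.

def metadata_table_path(table_location):
    # Normalize the trailing slash and append the conventional metadata path.
    return table_location.rstrip("/") + "/.hoodie/metadata"

TABLE_STATS_LOCATION = "s3://mybucket/datasets/table_1/"

# With a live SparkSession (not runnable here without a cluster):
# stats = (
#     spark.read.format("org.apache.hudi")
#     .load(metadata_table_path(TABLE_STATS_LOCATION))
#     .filter("type = 3")  # assumption: record type 3 = column stats payloads
# )
# stats.select("ColumnStatsMetadata.columnName").distinct().show()
```

In Scenario 1 I would expect every data column, `id` included, to show up in the distinct column names.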

## Scenario 2
```python
from pyspark.sql.functions import concat

inputDF = spark.createDataFrame(
    [
        ("W01D000", "53.3572085", "202001150800", "2020", "01"),
        ("W01D001", "53.3572085", "202001150800", "2020", "01"),
        ("W01D002", "53.3572085", "202001150800", "2020", "01"),
        ("W01D003", "53.3572085", "202001150800", "2020", "01"),
    ],
    ["device_id", "value", "timestamp", "year", "month"],
)
inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))

RECORD_KEY = "id"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
DATABASE = "default"
TABLE_STATS = "table_2"
TABLE_STATS_LOCATION = "s3://mybucket/datasets/table_2/"

hudi_options_with_statistics = {
    "hoodie.table.name": TABLE_STATS,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": RECORD_KEY,
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": TABLE_STATS,
    "hoodie.datasource.hive_sync.database": DATABASE,
    "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    # Restrict column_stats to a single column; the record key is NOT listed.
    "hoodie.metadata.index.column.stats.column.list": "year",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "BLOOM",
}

# First upsert creates the table; the second upsert exercises the index lookup.
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
```
The second upsert tried to use the index for range pruning but fell back to
loading column ranges from the data files. This is because
`hoodie.metadata.index.column.stats.column.list` contained only `year`, so
only that column is added to column_stats and the record key column is missing.
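My reading of this behavior (a simplified model of my own, not Hudi's actual implementation) is that the bloom index can only use column_stats for key-range pruning when the record key column itself is covered by the index; otherwise it has to fall back to reading ranges from the file footers:

```python
# Simplified model of the fallback I'm observing (NOT Hudi's actual code).

def key_range_source(indexed_columns, record_key_column):
    """Decide where min/max record-key ranges for pruning come from."""
    if not indexed_columns:
        # Empty column list: all columns are indexed, record key included.
        return "column_stats"
    if record_key_column in indexed_columns:
        return "column_stats"
    # Record key not covered by the index: fall back to the data files.
    return "file_ranges"

print(key_range_source(set(), "id"))           # Scenario 1: pruned via index
print(key_range_source({"year"}, "id"))        # Scenario 2: falls back to files
print(key_range_source({"year", "id"}, "id"))  # Scenario 3: pruned via index
```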

## Scenario 3
```python
from pyspark.sql.functions import concat

inputDF = spark.createDataFrame(
    [
        ("W01D000", "53.3572085", "202001150800", "2020", "01"),
        ("W01D001", "53.3572085", "202001150800", "2020", "01"),
        ("W01D002", "53.3572085", "202001150800", "2020", "01"),
        ("W01D003", "53.3572085", "202001150800", "2020", "01"),
    ],
    ["device_id", "value", "timestamp", "year", "month"],
)
inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))

RECORD_KEY = "id"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
DATABASE = "default"
TABLE_STATS = "table_3"
TABLE_STATS_LOCATION = "s3://mybucket/datasets/table_3/"

hudi_options_with_statistics = {
    "hoodie.table.name": TABLE_STATS,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": RECORD_KEY,
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": TABLE_STATS,
    "hoodie.datasource.hive_sync.database": DATABASE,
    "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    # The record key column `id` is explicitly included this time.
    "hoodie.metadata.index.column.stats.column.list": "year,id",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "BLOOM",
}

# First upsert creates the table; the second upsert exercises the index lookup.
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
```
The second upsert uses the index for range pruning. This is because the
`hoodie.metadata.index.column.stats.column.list` property contains the `id`
column, which is the column used as `hoodie.datasource.write.recordkey.field`.

## Scenario 4
```python
from pyspark.sql.functions import concat

inputDF = spark.createDataFrame(
    [
        ("W01D000", "53.3572085", "202001150800", "2020", "01"),
        ("W01D001", "53.3572085", "202001150800", "2020", "01"),
        ("W01D002", "53.3572085", "202001150800", "2020", "01"),
        ("W01D003", "53.3572085", "202001150800", "2020", "01"),
    ],
    ["device_id", "value", "timestamp", "year", "month"],
)
inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))

RECORD_KEY = "id"
PARTITION_FIELD = "year,month"
PRECOMBINE_FIELD = "timestamp"
DATABASE = "default"
TABLE_STATS = "table_4"
TABLE_STATS_LOCATION = "s3://mybucket/datasets/table_4/"

hudi_options_with_statistics = {
    "hoodie.table.name": TABLE_STATS,
    "hoodie.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": RECORD_KEY,
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.table": TABLE_STATS,
    "hoodie.datasource.hive_sync.database": DATABASE,
    "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.bloom.index.use.metadata": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    # The Hudi meta column _hoodie_record_key is listed instead of `id`.
    "hoodie.metadata.index.column.stats.column.list": "year,_hoodie_record_key",
    "hoodie.metadata.enable": "true",
    "hoodie.index.type": "BLOOM",
}

# First upsert creates the table; the second upsert exercises the index lookup.
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
inputDFNew.write.format("org.apache.hudi").option(
    "hoodie.datasource.write.operation", "upsert"
).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
```
The second upsert tried to use the index for range pruning but fell back to
loading column ranges from the data files. So it seems that adding
`_hoodie_record_key` to `hoodie.metadata.index.column.stats.column.list` does
add that column to column_stats, but it is not used during range pruning.

## Conclusions
1. To leverage column_stats for range pruning during upsert operations, we
have to include the column used as the record key (the `id` column in my case)
in the `hoodie.metadata.index.column.stats.column.list` list. This should be
added to the Hudi docs for clarity, or (better) the record key should always be
added to column_stats, with `hoodie.metadata.index.column.stats.column.list`
specifying only additional fields.
2. When using a composite key (e.g., `RECORD_KEY="timestamp,device_id"`), I am
never able to leverage column_stats for range pruning during upsert operations,
neither with `"hoodie.metadata.index.column.stats.column.list": "year,_hoodie_record_key"`
nor with `"hoodie.metadata.index.column.stats.column.list": "year,timestamp,device_id"`.
In both cases the second upsert tried to use the index for range pruning but
fell back to loading column ranges from the data files. This reinforces point 1:
the record key should always be added to column_stats, and the
`hoodie.metadata.index.column.stats.column.list` list should specify only
additional fields.
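In the meantime, a small helper of my own (a sketch, not a Hudi API) can make sure the record key field(s) are always appended to the column list before writing, though per point 2 above this does not seem to help yet for composite keys:

```python
def with_record_key_in_column_stats(options):
    """Return a copy of the Hudi options where the record key field(s)
    are guaranteed to appear in the column_stats column list."""
    opts = dict(options)
    record_keys = opts["hoodie.datasource.write.recordkey.field"].split(",")
    # Parse the existing list, dropping empty entries.
    listed = [c for c in opts.get(
        "hoodie.metadata.index.column.stats.column.list", "").split(",") if c]
    for key in record_keys:
        if key not in listed:
            listed.append(key)
    opts["hoodie.metadata.index.column.stats.column.list"] = ",".join(listed)
    return opts

opts = with_record_key_in_column_stats({
    "hoodie.datasource.write.recordkey.field": "timestamp,device_id",
    "hoodie.metadata.index.column.stats.column.list": "year",
})
print(opts["hoodie.metadata.index.column.stats.column.list"])  # year,timestamp,device_id
```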
Anything I'm missing here?