ssandona opened a new issue, #9857:
URL: https://github.com/apache/hudi/issues/9857

   I'm using **Hudi 0.13.1** with a COW table, the BLOOM index, and the metadata 
table enabled; specifically, I enable the metadata table indexes for bloom_filters, 
column_stats, and files. In addition I'm using `hoodie.bloom.index.use.metadata` 
to enable range pruning through the metadata table's column_stats.
   
   What I noticed is that with `hoodie.metadata.index.column.stats.enable=true`, 
if we specify values for `hoodie.metadata.index.column.stats.column.list`, the 
record key column is not included in the `column_stats` index. As a result, the 
column_stats index cannot be used for range pruning during upsert operations.
   
   Here are the scenarios I tested, with the related results:
   
   ## Scenario 1
   
   ```
   from pyspark.sql.functions import concat

   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   
   inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))
   
   RECORD_KEY="id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   DATABASE="default"
   
   TABLE_STATS="table_1"
   TABLE_STATS_LOCATION="s3://mybucket/datasets/table_1/"
   
   hudi_options_with_statistics = {
       "hoodie.table.name": TABLE_STATS,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.recordkey.field": RECORD_KEY,
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": TABLE_STATS,
       "hoodie.datasource.hive_sync.database": DATABASE,
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode":"hms",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.enable": "true",
       "hoodie.index.type": "BLOOM"
   }
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   ```
   
   The second upsert uses the index for range pruning. This is because the 
`hoodie.metadata.index.column.stats.column.list` property is left empty, so all 
columns are added to the column_stats index.
   
   ![Screenshot 2023-10-13 at 10 05 
53](https://github.com/apache/hudi/assets/5663683/51901aca-626e-4b93-bdde-931af9246708)
   
   ## Scenario 2
   
   ```
   from pyspark.sql.functions import concat

   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   
   inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))
   
   RECORD_KEY="id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   DATABASE="default"
   
   TABLE_STATS="table_2"
   TABLE_STATS_LOCATION="s3://mybucket/datasets/table_2/"
   
   hudi_options_with_statistics = {
       "hoodie.table.name": TABLE_STATS,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.recordkey.field": RECORD_KEY,
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": TABLE_STATS,
       "hoodie.datasource.hive_sync.database": DATABASE,
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode":"hms",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "year",
       "hoodie.metadata.enable": "true",
       "hoodie.index.type": "BLOOM"
   }
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   ```
   The second upsert tried to use the index for range pruning but ended up 
loading column ranges from files. This is because 
`hoodie.metadata.index.column.stats.column.list` contained only the value `year`, 
so only that column is added to the column_stats index.
   
   
   ![Screenshot 2023-10-13 at 10 12 
17](https://github.com/apache/hudi/assets/5663683/6dd5aa57-4020-45e8-9059-bac1afb4cab6)
   
   ## Scenario 3
   
   ```
   from pyspark.sql.functions import concat

   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   
   inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))
   
   RECORD_KEY="id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   DATABASE="default"
   
   TABLE_STATS="table_3"
   TABLE_STATS_LOCATION="s3://mybucket/datasets/table_3/"
   
   hudi_options_with_statistics = {
       "hoodie.table.name": TABLE_STATS,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.recordkey.field": RECORD_KEY,
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": TABLE_STATS,
       "hoodie.datasource.hive_sync.database": DATABASE,
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode":"hms",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "year,id",
       "hoodie.metadata.enable": "true",
       "hoodie.index.type": "BLOOM"
   }
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   ```
   
   The second upsert uses the index for range pruning. This is because the 
`hoodie.metadata.index.column.stats.column.list` property contains the `id` 
column, which is used as `hoodie.datasource.write.recordkey.field`.
   
   ![Screenshot 2023-10-13 at 10 16 
30](https://github.com/apache/hudi/assets/5663683/68016341-b1b6-4ede-b558-9f694f05b85a)
   
   ## Scenario 4
   
   ```
   from pyspark.sql.functions import concat

   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   
   inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))
   
   RECORD_KEY="id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   DATABASE="default"
   
   TABLE_STATS="table_4"
   TABLE_STATS_LOCATION="s3://mybucket/datasets/table_4/"
   
   hudi_options_with_statistics = {
       "hoodie.table.name": TABLE_STATS,
       "hoodie.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.recordkey.field": RECORD_KEY,
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD,
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": TABLE_STATS,
       "hoodie.datasource.hive_sync.database": DATABASE,
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD,
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "hoodie.datasource.hive_sync.use_jdbc": "false",
       "hoodie.datasource.hive_sync.mode":"hms",
       "hoodie.metadata.index.bloom.filter.enable": "true",
       "hoodie.bloom.index.use.metadata": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.column.stats.column.list": "year,_hoodie_record_key",
       "hoodie.metadata.enable": "true",
       "hoodie.index.type": "BLOOM"
   }
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options_with_statistics).mode("append").save(TABLE_STATS_LOCATION)
   ```
   
   The second upsert tried to use the index for range pruning but ended up 
loading column ranges from files. So it seems that adding `_hoodie_record_key` to 
`hoodie.metadata.index.column.stats.column.list` does add that column to the 
column_stats index, but it is not used during range pruning.
   
   ![Screenshot 2023-10-13 at 10 20 
37](https://github.com/apache/hudi/assets/5663683/dd615786-4f4a-4338-b422-e767e1778ee6)
   
   ## Conclusions
   
   1. In order to leverage column_stats for range pruning during upsert 
operations, we have to include the column used as the record key (i.e., the `id` 
column in my case) in the `hoodie.metadata.index.column.stats.column.list` list.
   
   This should be documented in the Hudi docs for clarity, or (better) the record 
key should always be added to column_stats, and the 
`hoodie.metadata.index.column.stats.column.list` list should only specify 
additional fields.
   
   2. When using a composite key (i.e., `RECORD_KEY="timestamp,device_id"`), I am 
never able to leverage column_stats for range pruning during upsert operations: 
neither with `"hoodie.metadata.index.column.stats.column.list": 
"year,_hoodie_record_key"` nor with 
`"hoodie.metadata.index.column.stats.column.list": "year,timestamp,device_id"`. 
In both cases the second upsert tried to use the index for range pruning but 
ended up loading column ranges from files.
   
   This reinforces point 1: the record key should always be added to 
column_stats, and the `hoodie.metadata.index.column.stats.column.list` list 
should only specify additional fields.
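   In the meantime, a possible writer-side workaround is to make sure every record key field is appended to the column list before writing. Below is a minimal sketch; the helper name `ensure_record_key_in_column_stats` is mine, not a Hudi API, and per point 2 this only appears to help for single-field record keys today:
   
   ```python
   def ensure_record_key_in_column_stats(options):
       """Return a copy of the Hudi writer options where every record key
       field also appears in hoodie.metadata.index.column.stats.column.list.
   
       If the column list is empty or absent, Hudi already indexes all
       columns, so the options are returned unchanged."""
       opts = dict(options)
       col_list = opts.get("hoodie.metadata.index.column.stats.column.list", "")
       if not col_list:
           return opts  # empty list means all columns get column_stats
       stats_cols = [c.strip() for c in col_list.split(",") if c.strip()]
       # Composite record keys are comma-separated, e.g. "timestamp,device_id"
       key_fields = opts["hoodie.datasource.write.recordkey.field"].split(",")
       for key in (k.strip() for k in key_fields):
           if key not in stats_cols:
               stats_cols.append(key)
       opts["hoodie.metadata.index.column.stats.column.list"] = ",".join(stats_cols)
       return opts
   ```
   
   Applied to the Scenario 2 options, this turns the list `"year"` into `"year,id"`, i.e. the configuration that worked in Scenario 3.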
   
   Anything I'm missing here?