ssandona opened a new issue, #7059:
URL: https://github.com/apache/hudi/issues/7059

   I'm using **Hudi 0.11.1** and noticed an interesting behavior involving 
metadata table column statistics, range pruning, and the kind of record key used.
   
   I enabled the column stats index and the bloom filter index on the metadata 
table while writing:
   
   ```
   "hoodie.metadata.index.column.stats.enable":"true",
   "hoodie.bloom.index.use.metadata":"true",
   "hoodie.metadata.index.bloom.filter.enable":"true",
   ```
   
   1. If I use a CompositeKey of columns "a,b" (2 string columns) as the record 
key, I see both the **loadColumnRangesFromMetaIndex** and 
**loadColumnRangesFromFiles** jobs being executed in the Spark Web UI. This 
behavior is observed for every upsert operation.
   2. If I use a single column "c", obtained by concatenating columns "a,b", as 
the record key, I see both the **loadColumnRangesFromMetaIndex** and 
**loadColumnRangesFromFiles** jobs being executed in the Spark Web UI only for 
the first upsert on a partition. For all subsequent upserts on the same 
partition, I only see the **loadColumnRangesFromMetaIndex** job being executed.
   
   The second observed behavior is the expected one, as key ranges should only 
be retrieved from the metadata table column_stats index and not from the files.
   
   Is there a known issue with using a CompositeKey of 2 columns as the record 
key together with the multi-modal index for range pruning while upserting?
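For reference, the two record-key setups compared above differ in the shape of the key string. The sketch below builds both for one sample row. The `field:value` comma-joined form for the composite key is an assumption about how Hudi's complex key generator renders keys, and both helper names are hypothetical; this is an illustration, not Hudi code.

```python
from typing import Dict, List


def composite_key(row: Dict[str, str], fields: List[str]) -> str:
    # Assumed composite-key shape: "field1:value1,field2:value2".
    return ",".join(f"{field}:{row[field]}" for field in fields)


def concatenated_key(row: Dict[str, str], fields: List[str]) -> str:
    # Single-column key built by plain concatenation, as in Scenario 2.
    return "".join(row[field] for field in fields)


row = {"device_id": "W01D000", "timestamp": "202001150800"}
key_fields = ["timestamp", "device_id"]

print(composite_key(row, key_fields))
print(concatenated_key(row, key_fields))
```

In the first setup the configured record key field is the string `timestamp,device_id`, which is not itself a column of the table, whereas in the second it is the real column `id`.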
   
   Looking at the code, this line seems to be the one causing the behavior:
   
   
[https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L150](https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java#L150)
   
   It seems to me that the **fileInfoList** returned by 
**loadColumnRangesFromMetaIndex** is always empty in the first scenario, so the 
code falls back to **loadColumnRangesFromFiles**.
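The suspected fallback can be sketched in a few lines of Python. This is a simplified model of the behavior described above, not the actual Hudi implementation: `load_key_ranges` and the two loader callables are hypothetical stand-ins for the logic around **loadColumnRangesFromMetaIndex** and **loadColumnRangesFromFiles**.

```python
from typing import Callable, List

Loader = Callable[[List[str]], List[str]]


def load_key_ranges(
    partitions: List[str],
    use_metadata_index: bool,
    load_from_meta_index: Loader,
    load_from_files: Loader,
) -> List[str]:
    file_info: List[str] = []
    if use_metadata_index:
        # First try the column_stats index in the metadata table.
        file_info = load_from_meta_index(partitions)
    if not file_info:
        # An empty result triggers the fallback to reading the data files.
        file_info = load_from_files(partitions)
    return file_info


calls: List[str] = []


def empty_meta_index(partitions: List[str]) -> List[str]:
    # Models Scenario 1: the metadata index returns nothing.
    calls.append("meta")
    return []


def from_files(partitions: List[str]) -> List[str]:
    calls.append("files")
    return [f"{p}/file-1" for p in partitions]


result = load_key_ranges(["year=2020/month=01"], True, empty_meta_index, from_files)
print(calls, result)
```

Under this model, an empty result from the metadata index lookup makes both jobs run on every upsert, which matches what Scenario 1 shows.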
   
   Here is some sample code to reproduce this:
   
   ## Scenario1
   
   ```
   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   RECORD_KEY="timestamp,device_id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   COW_TABLE_LOCATION_TEST_ISSUE_TABLE="s3://MYBUCKET/datasets/test_issue_table/"
   COW_TABLE_NAME="test_issue_table"
   DATABASE="mydb"
   
   hudi_options = {
       "hoodie.metadata.index.column.stats.enable":"true",
       "hoodie.bloom.index.use.metadata":"true",
       "hoodie.metadata.index.bloom.filter.enable":"true",
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE", 
       "hoodie.datasource.write.recordkey.field": RECORD_KEY, 
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
       "hoodie.datasource.write.hive_style_partitioning": "true", 
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": COW_TABLE_NAME, 
       "hoodie.datasource.hive_sync.database": DATABASE, 
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD, 
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor", 
       "hoodie.datasource.hive_sync.use_jdbc": "false", 
       "hoodie.datasource.hive_sync.mode":"hms" 
   }
   
   inputDF.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options).mode("append").save(COW_TABLE_LOCATION_TEST_ISSUE_TABLE)
   
   inputDF.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options).mode("append").save(COW_TABLE_LOCATION_TEST_ISSUE_TABLE)
   ```
   
   The second upsert looks like this in the Spark History Server:
   
   ![Screenshot 2022-10-25 at 13 52 
45](https://user-images.githubusercontent.com/5663683/197769114-33e9f082-89a4-48bb-9ddb-4537421763a4.png)
   
   As we can see, both jobs are present: **Load meta index key ranges for file 
slices** and **Obtain key ranges for file slices (range pruning=on)**, which 
correspond to the **loadColumnRangesFromMetaIndex** and 
**loadColumnRangesFromFiles** functions.
   
   ## Scenario2
   
   ```
   inputDF = spark.createDataFrame(
       [
           ("W01D000", "53.3572085", "202001150800","2020","01"),
           ("W01D001", "53.3572085", "202001150800","2020","01"),
           ("W01D002", "53.3572085", "202001150800","2020","01"),
           ("W01D003", "53.3572085", "202001150800","2020","01")
       ],
       ["device_id", "value", "timestamp","year","month"]
   )
   
   
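   # concat must be imported for the derived key column below
   from pyspark.sql.functions import concat
   
   inputDFNew = inputDF.withColumn("id", concat(inputDF.timestamp, inputDF.device_id))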
   
   RECORD_KEY="id"
   PARTITION_FIELD="year,month"
   PRECOMBINE_FIELD="timestamp"
   
   COW_TABLE_LOCATION_TEST_ISSUE_TABLE="s3://MYBUCKET/datasets/test_issue_table_2/"
   COW_TABLE_NAME="test_issue_table_2"
   DATABASE="mydb"
   
   
   hudi_options = {
       "hoodie.metadata.index.column.stats.enable":"true",
       "hoodie.bloom.index.use.metadata":"true",
       "hoodie.metadata.index.bloom.filter.enable":"true",
       "hoodie.bulkinsert.shuffle.parallelism":"100",
       "hoodie.table.name": COW_TABLE_NAME,
       "hoodie.table.type": "COPY_ON_WRITE", 
       "hoodie.datasource.write.recordkey.field": RECORD_KEY, 
       "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD, 
       "hoodie.datasource.write.hive_style_partitioning": "true", 
       "hoodie.datasource.write.precombine.field": PRECOMBINE_FIELD, 
       "hoodie.datasource.hive_sync.enable": "true",
       "hoodie.datasource.hive_sync.table": COW_TABLE_NAME, 
       "hoodie.datasource.hive_sync.database": DATABASE, 
       "hoodie.datasource.hive_sync.partition_fields": PARTITION_FIELD, 
       "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor", 
       "hoodie.datasource.hive_sync.use_jdbc": "false", 
       "hoodie.datasource.hive_sync.mode":"hms" 
   }
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options).mode("append").save(COW_TABLE_LOCATION_TEST_ISSUE_TABLE)
   
   inputDFNew.write.format("org.apache.hudi").option(
       "hoodie.datasource.write.operation", "upsert"
   ).options(**hudi_options).mode("append").save(COW_TABLE_LOCATION_TEST_ISSUE_TABLE)
   ```
   
   The second upsert looks like this in the Spark History Server:
   
   ![Screenshot 2022-10-25 at 14 01 
44](https://user-images.githubusercontent.com/5663683/197769554-e3a29cb9-513e-4017-8a9f-297c3937a244.png)
   
   As we can see, only the **Load meta index key ranges for file slices** jobs 
are present. There is no trace of **Obtain key ranges for file slices (range 
pruning=on)** jobs, so only the **loadColumnRangesFromMetaIndex** function runs.

