[ 
https://issues.apache.org/jira/browse/HUDI-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geser Dugarov updated HUDI-8403:
--------------------------------
    Status: In Progress  (was: Open)

> Wrong bucketing when timestamp is used
> --------------------------------------
>
>                 Key: HUDI-8403
>                 URL: https://issues.apache.org/jira/browse/HUDI-8403
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Geser Dugarov
>            Assignee: Geser Dugarov
>            Priority: Major
>              Labels: pull-request-available
>
> We have a test for bucket pruning to illustrate wrong behavior:
> {code:java}
> void testBucketPruningSpecialKeyDataType(boolean logicalTimestamp) throws Exception {
>   String tablePath1 = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
>   Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1, TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE);
>   final String f1 = "f_timestamp";
>   conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
>   conf1.setString(FlinkOptions.RECORD_KEY_FIELD, f1);
>   conf1.setString(FlinkOptions.PRECOMBINE_FIELD, f1);
>   conf1.removeConfig(FlinkOptions.PARTITION_PATH_FIELD);
>   conf1.setBoolean(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), logicalTimestamp);
>   // test timestamp filtering
>   TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf1);
> }{code}
> To calculate the hash for bucketing, `KeyGenUtils::extractRecordKeysByFields` is used. But this function receives the timestamp value as a string without its column name, while the column name is also passed as one of the fields for hashing. As a result, the code
> {code:java}
> String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
> return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
>     .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0])))
> ...{code}
> will filter out the timestamp and return an empty list for the hash calculation.
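> The effect can be reproduced with a standalone sketch. This is a minimal re-implementation of the split/filter logic quoted above, not Hudi's actual code; the "," and ":" separator values and the timestamp string format used below are assumptions based on Hudi's default record-key separators and the test configuration:
> {code:java}
> import java.util.Arrays;
> import java.util.List;
> import java.util.stream.Collectors;
>
> public class KeyFilterDemo {
>   // Mimics the split/filter in KeyGenUtils::extractRecordKeysByFields,
>   // assuming "," as the record-key parts separator and ":" as the
>   // composite key field/value separator.
>   public static List<String> extractByFields(String recordKey, List<String> fields) {
>     String[] fieldKV = recordKey.split(",");
>     return Arrays.stream(fieldKV)
>         .map(kv -> kv.split(":", 2))
>         .filter(kvArray -> kvArray.length == 1 || fields.isEmpty()
>             || fields.contains(kvArray[0]))
>         .map(kvArray -> kvArray[kvArray.length - 1])
>         .collect(Collectors.toList());
>   }
>
>   public static void main(String[] args) {
>     // The timestamp record key is a bare value with no column-name prefix,
>     // but its string form itself contains ':', so split(":", 2) produces
>     // two parts and the filter treats "1970-01-01 00" as a column name,
>     // which is not in the fields list -- the key is dropped entirely.
>     List<String> keys = extractByFields("1970-01-01 00:00:01.123", List.of("f_timestamp"));
>     System.out.println(keys); // prints [] -> every record hashes to one bucket
>   }
> }
> {code}
> A properly prefixed key such as {{f_timestamp:...}} would survive the filter, which is why keys of other data types are not affected.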
>  
> After writing the test data, we should see 4 buckets due to the default value of `FlinkOptions::BUCKET_INDEX_NUM_BUCKETS`.
> But only one is created, because the values used for the hash calculation are wrong:
> {code:java}
> /tmp/junit7256928206840261812/tbl1$ tree -a
> .
> ├── __HIVE_DEFAULT_PARTITION__
> │   ├── 00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet
> │   ├── 
> .00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet.crc
> │   ├── .hoodie_partition_metadata
> │   └── ..hoodie_partition_metadata.crc
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
