[ 
https://issues.apache.org/jira/browse/HUDI-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geser Dugarov updated HUDI-8403:
--------------------------------
    Description: 
We have a test for bucket pruning to illustrate wrong behavior:
{code:java}
void testBucketPruningSpecialKeyDataType(boolean logicalTimestamp) throws Exception {
  String tablePath1 = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
  Configuration conf1 = TestConfigurations.getDefaultConf(tablePath1, TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE);
  final String f1 = "f_timestamp";
  conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
  conf1.setString(FlinkOptions.RECORD_KEY_FIELD, f1);
  conf1.setString(FlinkOptions.PRECOMBINE_FIELD, f1);
  conf1.removeConfig(FlinkOptions.PARTITION_PATH_FIELD);
  conf1.setBoolean(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), logicalTimestamp);

  // test timestamp filtering
  TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf1);
}{code}
To calculate the hash for bucketing, `KeyGenUtils::extractRecordKeysByFields` is used. But the record key reaches this function as a plain timestamp string without a column-name prefix, while the column name is also passed in as the field to hash on.

As a result:
{code:java}
String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
    .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0])))
...{code}
will filter out the timestamp and return an empty list for the hash calculation.
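To make the failure mode concrete, here is a standalone sketch of that filter, assuming the separators are `,` and `:` (standing in for `DEFAULT_RECORD_KEY_PARTS_SEPARATOR` and `DEFAULT_COMPOSITE_KEY_FILED_VALUE`); the class and method names are hypothetical, not Hudi code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class KeyFilterSketch {
  // Hypothetical reproduction of the filtering logic above, not actual Hudi code.
  // "," and ":" stand in for DEFAULT_RECORD_KEY_PARTS_SEPARATOR and
  // DEFAULT_COMPOSITE_KEY_FILED_VALUE.
  static List<String> extractValues(String recordKey, List<String> fields) {
    String[] fieldKV = recordKey.split(",");
    return Arrays.stream(fieldKV)
        .map(kv -> kv.split(":", 2))
        .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || fields.contains(kvArray[0]))
        .map(kvArray -> kvArray[kvArray.length - 1])
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // A composite "field:value" key survives the filter as expected:
    System.out.println(extractValues("f_timestamp:1001", List.of("f_timestamp"))); // [1001]
    // But a raw timestamp contains ":" itself, so split(":", 2) yields
    // ["2024-10-22 09", "56:10.822"]; "2024-10-22 09" is not a known field,
    // the entry is dropped, and the hash gets an empty value list:
    System.out.println(extractValues("2024-10-22 09:56:10.822", List.of("f_timestamp"))); // []
  }
}
```

All three disjuncts of the filter fail for the timestamp: the split produces two parts, `fields` is non-empty, and the bogus "field name" carved out of the timestamp is not in `fields`.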

 

After writing the test data, we should see 4 buckets, matching the default value of `FlinkOptions::BUCKET_INDEX_NUM_BUCKETS`.
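For intuition, here is a minimal sketch of why an empty value list collapses everything into one bucket; the modulo-hash scheme below is illustrative only, not Hudi's exact bucket-index implementation:

```java
import java.util.List;

public class BucketSketch {
  // Illustrative only: Hudi's actual bucket hash differs, but the mechanism
  // is the same idea: bucket id = hash(key values) mod number of buckets.
  static int bucketId(List<String> keyValues, int numBuckets) {
    return (keyValues.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    int numBuckets = 4; // matches the default of FlinkOptions.BUCKET_INDEX_NUM_BUCKETS
    // Distinct key values spread across buckets...
    System.out.println(bucketId(List.of("a"), numBuckets));
    System.out.println(bucketId(List.of("b"), numBuckets));
    // ...but if every record yields the same empty value list, every record
    // hashes identically and lands in a single bucket.
    System.out.println(bucketId(List.of(), numBuckets));
  }
}
```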

But I see only one, because the values used for hash calculation are wrong:
{code:java}
/tmp/junit7256928206840261812/tbl1$ tree -a
.
├── __HIVE_DEFAULT_PARTITION__
│   ├── 00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet
│   ├── .00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet.crc
│   ├── .hoodie_partition_metadata
│   └── ..hoodie_partition_metadata.crc
{code}
 


> Wrong bucketing when timestamp is used
> --------------------------------------
>
>                 Key: HUDI-8403
>                 URL: https://issues.apache.org/jira/browse/HUDI-8403
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Geser Dugarov
>            Assignee: Geser Dugarov
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
