[ https://issues.apache.org/jira/browse/HUDI-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Geser Dugarov updated HUDI-8403:
--------------------------------
Description:
The following bucket pruning test illustrates the wrong behavior:
{code:java}
void testBucketPruningSpecialKeyDataType(boolean logicalTimestamp) throws Exception {
  String tablePath1 = new Path(tempFile.getAbsolutePath(), "tbl1").toString();
  Configuration conf1 = TestConfigurations.getDefaultConf(
      tablePath1, TestConfigurations.ROW_DATA_TYPE_HOODIE_KEY_SPECIAL_DATA_TYPE);
  final String f1 = "f_timestamp";
  conf1.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
  conf1.setString(FlinkOptions.RECORD_KEY_FIELD, f1);
  conf1.setString(FlinkOptions.PRECOMBINE_FIELD, f1);
  conf1.removeConfig(FlinkOptions.PARTITION_PATH_FIELD);
  conf1.setBoolean(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), logicalTimestamp);
  // test timestamp filtering
  TestData.writeDataAsBatch(TestData.DATA_SET_INSERT_HOODIE_KEY_SPECIAL_DATA_TYPE, conf1);
}
{code}
To calculate the hash for bucketing, `KeyGenUtils::extractRecordKeysByFields` is
used. However, the record key passed to this function is the timestamp as a plain
string, without the column name, while the column name is passed as the field to
hash on.
As a result, the following code:
{code:java}
String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
return Arrays.stream(fieldKV)
    .map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
    .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0])))
    ...
{code}
filters out the timestamp and passes an empty list back for hash calculation.
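A minimal, self-contained sketch of how such a filter can drop a bare timestamp key. This is not the Hudi implementation: the separator values `,` and `:`, the sample timestamp string, and the final mapping step are assumptions made for illustration, and `KeyFilterSketch` and `extractKeys` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class KeyFilterSketch {
    // Assumed separator values; the real ones are the constants referenced in the snippet above.
    static final String PARTS_SEPARATOR = ",";
    static final String FIELD_VALUE_SEPARATOR = ":";

    // Simplified stand-in for the split-and-filter logic shown above.
    static String[] extractKeys(String recordKey, List<String> fields) {
        return Arrays.stream(recordKey.split(PARTS_SEPARATOR))
            .map(kv -> kv.split(FIELD_VALUE_SEPARATOR, 2))
            .filter(kv -> kv.length == 1 || fields.isEmpty() || fields.contains(kv[0]))
            // Assumption: keep the bare value, or the value part of a "field:value" pair.
            .map(kv -> kv.length == 1 ? kv[0] : kv[1])
            .toArray(String[]::new);
    }

    public static void main(String[] args) {
        List<String> fields = Collections.singletonList("f_timestamp");

        // A key in "field:value" form survives the filter.
        System.out.println(extractKeys("f_timestamp:42", fields).length); // prints 1

        // A hypothetical bare timestamp containing ':' is split into two parts,
        // its first part is not a known field name, and the key is dropped.
        System.out.println(extractKeys("1970-01-01T00:00:01", fields).length); // prints 0
    }
}
```

Under these assumptions, any key value that itself contains the field/value separator is treated as a `field:value` pair with an unknown field name, which would explain the empty result.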
After writing the test data, we should see 4 buckets, which is the default value of
`FlinkOptions::BUCKET_INDEX_NUM_BUCKETS`.
But only one bucket appears, because the values used for hash calculation are wrong:
{code:java}
/tmp/junit7256928206840261812/tbl1$ tree -a
.
├── __HIVE_DEFAULT_PARTITION__
│ ├── 00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet
│ ├── .00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet.crc
│ ├── .hoodie_partition_metadata
│ └── ..hoodie_partition_metadata.crc
{code}
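A short sketch of why an empty key list would put every record into the same bucket. It assumes the bucket number is derived as `(hashCode & Integer.MAX_VALUE) % numBuckets` over the list of key values; that formula and the names `BucketIdSketch` and `bucketId` are assumptions for illustration, not a copy of the Hudi code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BucketIdSketch {
    // Assumed formula: non-negative hash of the key list, modulo the bucket count.
    static int bucketId(List<String> hashKeys, int numBuckets) {
        return (hashKeys.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
        int numBuckets = 4; // default number of buckets mentioned above

        // The hash code of an empty List is defined as 1, so every record
        // whose key list was filtered down to nothing gets the same bucket.
        System.out.println(bucketId(Collections.emptyList(), numBuckets)); // prints 1

        // Non-empty key lists depend on the key value and can spread over all buckets.
        System.out.println(bucketId(Arrays.asList("some-key"), numBuckets));
    }
}
```

If the leading digits of the data file name encode the bucket number, a constant bucket of 1 would be consistent with the single `00000001-...` file in the listing above.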
> Wrong bucketing when timestamp is used
> --------------------------------------
>
> Key: HUDI-8403
> URL: https://issues.apache.org/jira/browse/HUDI-8403
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Geser Dugarov
> Assignee: Geser Dugarov
> Priority: Major