geserdugarov commented on issue #12155:
URL: https://github.com/apache/hudi/issues/12155#issuecomment-2434948710
The main problem is that the current filtering is really bad. I wrote some code
for a quick illustration:
```Java
package org.apache.hudi.index.bucket;
import org.apache.hudi.common.model.HoodieKey;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class TestBucketIdentifier {
public static String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
public static String DEFAULT_COLUMN_VALUE_SEPARATOR = ":";
public static String NULL_RECORDKEY_PLACEHOLDER = "__null__";
public static String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
@Test
public void testGetHashKeys() {
// for current hash, all timestamps to only one bucket
// List<String> recordKeys = Arrays.asList("2024-10-24 10:11:11.234",
"2024-10-24 10:22:12.345", "2024-10-24 10:33:13.456", "2024-10-24
10:44:14.567", "2024-10-24 10:55:15.678");
// String indexKeyFields = "ts";
// current hash, all to only one bucket
// List<String> recordKeys =
Arrays.asList("f1:101,010,f2:forceToOneBucket",
"f1:122,120,f2:forceToOneBucket", "f1:123,410,f2:forceToOneBucket");
// String indexKeyFields = "f2";
// to illustrate problem with duplication considering migration for
current hash to new one
List<String> recordKeys = Arrays.asList("f1:abc,f2:bcd,123",
"f1:hij,f2:jkl,,x", "f1:123,f2:234,567", "f1:234,f2:,,7,89");
String indexKeyFields = "f1,f2";
int numBuckets = 3;
for (String recordKey : recordKeys) {
List<String> keysNew = newGetHashKeys(recordKey, indexKeyFields);
List<String> keysOld = currentGetHashKeys(recordKey, indexKeyFields);
int bucketNew = getBucketId(keysNew, numBuckets);
int bucketOld = getBucketId(keysOld, numBuckets);
System.out.println("Key:" + recordKey + ";\tnew hash: " + keysNew +
";\tcurrent hash: " + keysOld +
";\tnew bucket = " + bucketNew + ";\tcurrent bucket = " +
bucketOld);
}
System.out.println("Finished");
}
public static int getBucketId(List<String> hashKeyFields, int numBuckets) {
return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;
}
public static List<String> newGetHashKeys(String recordKey, String
indexKeyFields) {
return Arrays.asList(newExtractRecordKeysByFields(recordKey,
Arrays.asList(indexKeyFields.split(","))));
}
public static List<String> currentGetHashKeys(String recordKey, String
indexKeyFields) {
return !recordKey.contains(":") ?
Collections.singletonList(recordKey) :
Arrays.asList(currentExtractRecordKeysByFields(recordKey,
Arrays.asList(indexKeyFields.split(","))));
}
public static String[] newExtractRecordKeysByFields(String recordKey,
List<String> fields) {
// if there is no ',' and ':', then it's a key value
if (!recordKey.contains(DEFAULT_RECORD_KEY_PARTS_SEPARATOR) ||
!recordKey.contains(DEFAULT_COLUMN_VALUE_SEPARATOR)) {
return new String[] {recordKey};
}
// complex key case
// Here we're reducing memory allocation for substrings and use index
positions,
// because for bucket index this will be called for each record, which
leads to GC overhead
int keyValueSep1;
int keyValueSep2;
int commaPosition;
String currentField;
String currentValue;
List<String> values = new ArrayList<>();
int processed = 0;
while (processed < recordKey.length()) {
// note that keyValueSeps and commaPosition are absolute
keyValueSep1 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
processed);
currentField = recordKey.substring(processed, keyValueSep1);
keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep1 + 1);
if (fields.isEmpty() || (fields.size() == 1 &&
fields.get(0).isEmpty()) || fields.contains(currentField)) {
if (keyValueSep2 < 0) {
// there is no next key value pair
currentValue = recordKey.substring(keyValueSep1 + 1);
processed = recordKey.length();
} else {
// looking for ',' in reverse order to support multiple ',' in key
values by looking for the latest ','
commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
// commaPosition could be -1 if didn't find ',', or we could find
',' from previous key-value pair ('col1:val1,...')
// also we could have the last value with ':', so need to check if
keyValueSep2 > 0
while (commaPosition < keyValueSep1 && keyValueSep2 > 0) {
// If we have key value as a timestamp with ':',
// then we continue to skip ':' until before the next ':' there
is a ',' character.
// For instance, 'col1:val1,col2:2014-10-22 13:50:42,col3:val3'
// ^ ^ ^ ^
// 1) keyValueSep1 skip skip
keyValueSep2
// ^
// commaPosition
// 2) | currentValue |
// ^
// 3) processed
keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep2 + 1);
commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
}
if (commaPosition > 0) {
currentValue = recordKey.substring(keyValueSep1 + 1,
commaPosition);
processed = commaPosition + 1;
} else {
// it could be the last value with many ':', in this case we
wouldn't find any ',' before
currentValue = recordKey.substring(keyValueSep1 + 1);
processed = recordKey.length();
}
}
// here could be any logic of conditional replacing of currentValue
if (currentValue.equals(NULL_RECORDKEY_PLACEHOLDER)) {
values.add(null);
} else if (currentValue.equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
values.add("");
} else {
values.add(currentValue);
}
} else {
if (keyValueSep2 < 0) {
processed = recordKey.length();
} else {
commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
while (commaPosition < keyValueSep1) {
// described above
keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep2 + 1);
commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
}
if (commaPosition < 0) {
// if something went wrong, and there is no ',', we should stop
here, and pass the whole recordKey,
// otherwise processed = commaPosition + 1 would lead to
infinite loop
processed = recordKey.length();
} else {
processed = commaPosition + 1;
}
}
}
}
return values.isEmpty() ? new String[] {recordKey} : values.toArray(new
String[0]);
}
public static String[] currentExtractRecordKeysByFields(String recordKey,
List<String> fields) {
String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
return Arrays.stream(fieldKV).map(kv ->
kv.split(DEFAULT_COLUMN_VALUE_SEPARATOR, 2))
.filter(kvArray -> kvArray.length == 1 || fields.isEmpty() ||
(fields.contains(kvArray[0])))
.map(kvArray -> {
if (kvArray.length == 1) {
return kvArray[0];
} else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
return null;
} else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
return "";
} else {
return kvArray[1];
}
}).toArray(String[]::new);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]