This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e19c815bb12 [HUDI-8403] Fixed values extraction for bucketing and
optimized `KeyGenUtils::extractRecordKeysByFields` (#12120)
e19c815bb12 is described below
commit e19c815bb125e9a758bf50c07370b91498ae35cf
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Oct 28 09:43:43 2024 +0700
[HUDI-8403] Fixed values extraction for bucketing and optimized
`KeyGenUtils::extractRecordKeysByFields` (#12120)
* [HUDI-8387] [HUDI-8403] Optimized
`KeyGenUtils::extractRecordKeysByFields`, fixed hash calculation for buckets
---
.../apache/hudi/index/bucket/BucketIdentifier.java | 7 +-
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 100 +++++++++++++++++----
.../hudi/index/bucket/TestBucketIdentifier.java | 11 +--
.../org/apache/hudi/keygen/TestKeyGenUtils.java | 21 +++--
.../apache/hudi/keygen/BuiltinKeyGenerator.java | 8 +-
.../spark/sql/hudi/dml/TestInsertTable.scala | 2 +-
6 files changed, 110 insertions(+), 39 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 475ee36c55e..ea98ecc009d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -25,7 +25,6 @@ import org.apache.hudi.keygen.KeyGenUtils;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
@@ -61,13 +60,11 @@ public class BucketIdentifier implements Serializable {
}
protected static List<String> getHashKeys(String recordKey, String
indexKeyFields) {
- return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
- getHashKeysUsingIndexFields(recordKey,
Arrays.asList(indexKeyFields.split(",")));
+ return getHashKeysUsingIndexFields(recordKey,
Arrays.asList(indexKeyFields.split(",")));
}
protected static List<String> getHashKeys(String recordKey, List<String>
indexKeyFields) {
- return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
- getHashKeysUsingIndexFields(recordKey, indexKeyFields);
+ return getHashKeysUsingIndexFields(recordKey, indexKeyFields);
}
private static List<String> getHashKeysUsingIndexFields(String recordKey,
List<String> indexKeyFields) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 85d8835875e..83cd1d4139c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -35,6 +35,7 @@ import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -48,7 +49,7 @@ public class KeyGenUtils {
protected static final String HUDI_DEFAULT_PARTITION_PATH =
PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
- public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
+ public static final String DEFAULT_COLUMN_VALUE_SEPARATOR = ":";
public static final String RECORD_KEY_GEN_PARTITION_ID_CONFIG =
"_hoodie.record.key.gen.partition.id";
public static final String RECORD_KEY_GEN_INSTANT_TIME_CONFIG =
"_hoodie.record.key.gen.instant.time";
@@ -133,20 +134,87 @@ public class KeyGenUtils {
}
public static String[] extractRecordKeysByFields(String recordKey,
List<String> fields) {
- String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
- return Arrays.stream(fieldKV).map(kv ->
kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
- .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() ||
(fields.contains(kvArray[0])))
- .map(kvArray -> {
- if (kvArray.length == 1) {
- return kvArray[0];
- } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
- return null;
- } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
- return "";
+ // if the recordKey lacks ',' or lacks ':', then it is a plain single value
+ if (!recordKey.contains(DEFAULT_RECORD_KEY_PARTS_SEPARATOR) ||
!recordKey.contains(DEFAULT_COLUMN_VALUE_SEPARATOR)) {
+ return new String[] {recordKey};
+ }
+ // complex key case
+ // Here we reduce memory allocation by working with index
positions instead of substrings,
+ // because for the bucket index this is called for every record, which
otherwise leads to GC overhead
+ int keyValueSep1;
+ int keyValueSep2;
+ int commaPosition;
+ String currentField;
+ String currentValue;
+ List<String> values = new ArrayList<>();
+ int processed = 0;
+ while (processed < recordKey.length()) {
+ // note that keyValueSeps and commaPosition are absolute
+ keyValueSep1 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
processed);
+ currentField = recordKey.substring(processed, keyValueSep1);
+ keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep1 + 1);
+ if (fields.isEmpty() || (fields.size() == 1 && fields.get(0).isEmpty())
|| fields.contains(currentField)) {
+ if (keyValueSep2 < 0) {
+ // there is no next key value pair
+ currentValue = recordKey.substring(keyValueSep1 + 1);
+ processed = recordKey.length();
+ } else {
+ // search for ',' in reverse order, taking the latest ',' so that
key values containing ',' are supported
+ commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+ // commaPosition could be -1 if didn't find ',', or we could find
',' from previous key-value pair ('col1:val1,...')
+ // also we could have the last value with ':', so need to check if
keyValueSep2 > 0
+ while (commaPosition < keyValueSep1 && keyValueSep2 > 0) {
+ // If a key value is a timestamp containing ':',
+ // then we keep skipping ':' until the segment before the next ':' contains
a ',' character.
+ // For instance, 'col1:val1,col2:2014-10-22 13:50:42,col3:val3'
+ // ^ ^ ^ ^
+ // 1) keyValueSep1 skip skip
keyValueSep2
+ // ^
+ // commaPosition
+ // 2) | currentValue |
+ // ^
+ // 3) processed
+ keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep2 + 1);
+ commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+ }
+ if (commaPosition > 0) {
+ currentValue = recordKey.substring(keyValueSep1 + 1,
commaPosition);
+ processed = commaPosition + 1;
} else {
- return kvArray[1];
+ // the last value may contain multiple ':'; in that case no
preceding ',' is found
+ currentValue = recordKey.substring(keyValueSep1 + 1);
+ processed = recordKey.length();
+ }
+ }
+ // here could be any logic of conditional replacing of currentValue
+ if (currentValue.equals(NULL_RECORDKEY_PLACEHOLDER)) {
+ values.add(null);
+ } else if (currentValue.equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+ values.add("");
+ } else {
+ values.add(currentValue);
+ }
+ } else {
+ if (keyValueSep2 < 0) {
+ processed = recordKey.length();
+ } else {
+ commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+ while (commaPosition < keyValueSep1) {
+ // described above
+ keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR,
keyValueSep2 + 1);
+ commaPosition =
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
}
- }).toArray(String[]::new);
+ if (commaPosition < 0) {
+ // if something went wrong, and there is no ',', we should stop
here, and pass the whole recordKey,
+ // otherwise processed = commaPosition + 1 would lead to infinite
loop
+ processed = recordKey.length();
+ } else {
+ processed = commaPosition + 1;
+ }
+ }
+ }
+ }
+ return values.isEmpty() ? new String[] {recordKey} : values.toArray(new
String[0]);
}
public static String getRecordKey(GenericRecord record, List<String>
recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -161,11 +229,11 @@ public class KeyGenUtils {
throw new HoodieKeyException("Record key field '" + recordKeyField +
"' does not exist in the input record");
}
if (recordKeyValue == null) {
-
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
+
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(NULL_RECORDKEY_PLACEHOLDER);
} else if (recordKeyValue.isEmpty()) {
-
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(EMPTY_RECORDKEY_PLACEHOLDER);
+
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(EMPTY_RECORDKEY_PLACEHOLDER);
} else {
-
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(recordKeyValue);
+
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(recordKeyValue);
keyIsNullEmpty = false;
}
if (i != recordKeyFields.size() - 1) {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
index e6e6e4aabe7..8f4228cde52 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
@@ -103,14 +103,12 @@ public class TestBucketIdentifier {
@Test
public void testGetHashKeys() {
BucketIdentifier identifier = new BucketIdentifier();
+ // if only one column is used for the recordKey, then no column
name is prepended to the value
List<String> keys = identifier.getHashKeys(new HoodieKey("abc",
"partition"), "");
assertEquals(1, keys.size());
assertEquals("abc", keys.get(0));
- keys = identifier.getHashKeys(new HoodieKey("f1:abc", "partition"), "f1");
- assertEquals(1, keys.size());
- assertEquals("abc", keys.get(0));
-
+ // complex keys, composite from key-value pairs
keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"),
"f2");
assertEquals(1, keys.size());
assertEquals("bcd", keys.get(0));
@@ -121,9 +119,8 @@ public class TestBucketIdentifier {
assertEquals("bcd", keys.get(1));
keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd,efg",
"partition"), "f1,f2");
- assertEquals(3, keys.size());
+ assertEquals(2, keys.size());
assertEquals("abc", keys.get(0));
- assertEquals("bcd", keys.get(1));
- assertEquals("efg", keys.get(2));
+ assertEquals("bcd,efg", keys.get(1));
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 61391232869..5cffef26e6e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -112,16 +113,18 @@ public class TestKeyGenUtils {
@Test
public void testExtractRecordKeys() {
- // test complex key form: field1:val1,field2:val2,...
- String[] s1 = KeyGenUtils.extractRecordKeys("id:1");
- Assertions.assertArrayEquals(new String[] {"1"}, s1);
+ // if only one column is used for the recordKey, then no column
name is prepended to the value
+ String[] s1 = KeyGenUtils.extractRecordKeys("2024-10-22 14:11:53.023");
+ Assertions.assertArrayEquals(new String[] {"2024-10-22 14:11:53.023"}, s1);
+ // test complex key form: field1:val1,field2:val2,...
String[] s2 = KeyGenUtils.extractRecordKeys("id:1,id:2");
Assertions.assertArrayEquals(new String[] {"1", "2"}, s2);
String[] s3 =
KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);
+ // column names containing ':' are not supported (values containing ':' are handled)
String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);
@@ -130,7 +133,7 @@ public class TestKeyGenUtils {
Assertions.assertArrayEquals(new String[] {"1"}, s5);
String[] s6 = KeyGenUtils.extractRecordKeys("id:1,id2:2,2");
- Assertions.assertArrayEquals(new String[]{"1", "2", "2"}, s6);
+ Assertions.assertArrayEquals(new String[]{"1", "2,2"}, s6);
}
@Test
@@ -142,6 +145,14 @@ public class TestKeyGenUtils {
Assertions.assertArrayEquals(new String[] {"2"}, s1);
String[] s2 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,2,id3:3",
fields);
- Assertions.assertArrayEquals(new String[] {"2", "2"}, s2);
+ Assertions.assertArrayEquals(new String[] {"2,2"}, s2);
+
+ String[] s3 =
KeyGenUtils.extractRecordKeysByFields("id1:1,1,1,id2:,2,2,,id3:3", fields);
+ Assertions.assertArrayEquals(new String[] {",2,2,"}, s3);
+
+ fields.addAll(Arrays.asList("id1", "id3", "id4"));
+ // tough case with a lot of ',' and ':'
+ String[] s4 =
KeyGenUtils.extractRecordKeysByFields("id1:1,,,id2:2024-10-22
14:11:53.023,id3:,,3,id4:::1:2::4::", fields);
+ Assertions.assertArrayEquals(new String[] {"1,,", "2024-10-22
14:11:53.023", ",,3", "::1:2::4::"}, s4);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 58350b0d494..0f7c6144d41 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -51,7 +51,7 @@ import java.util.function.Supplier;
import scala.Function1;
import static org.apache.hudi.common.util.CollectionUtils.tail;
-import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_COMPOSITE_KEY_FILED_VALUE;
+import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_COLUMN_VALUE_SEPARATOR;
import static
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
@@ -71,8 +71,6 @@ public abstract class BuiltinKeyGenerator extends
BaseKeyGenerator implements Sp
private static final Logger LOG =
LoggerFactory.getLogger(BuiltinKeyGenerator.class);
- private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
-
protected static final String FIELDS_SEP = ",";
protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 =
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
@@ -220,7 +218,7 @@ public abstract class BuiltinKeyGenerator extends
BaseKeyGenerator implements Sp
PartitionPathFormatterBase.StringBuilder<S> sb = builderFactory.get();
for (int i = 0; i < recordKeyParts.size(); ++i) {
-
sb.appendJava(fieldNames.get(i)).appendJava(DEFAULT_COMPOSITE_KEY_FILED_VALUE);
+
sb.appendJava(fieldNames.get(i)).appendJava(DEFAULT_COLUMN_VALUE_SEPARATOR);
// NOTE: If record-key part has already been a string [[toString]] will
be a no-op
sb.append(emptyKeyPartHandler.apply(converter.apply(recordKeyParts.get(i))));
@@ -248,7 +246,7 @@ public abstract class BuiltinKeyGenerator extends
BaseKeyGenerator implements Sp
if (recordKeyParts.length > 1) {
sb.appendJava(recordKeyFields.get(i));
- sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+ sb.appendJava(DEFAULT_COLUMN_VALUE_SEPARATOR);
}
sb.append(convertedKeyPart);
// This check is to validate that overall composite-key has at least one
non-null, non-empty
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index f000a7e7831..d92b10b31e7 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1705,7 +1705,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| (6, 'a1,1', 10, 1000, "2021-01-05"),
| (7, 'a1,1', 10, 1000, "2021-01-05"),
| (8, 'a1,1', 10, 1000, "2021-01-05"),
- | (9, 'a3,3', 30, 3000, "2021-01-05")
+ | (10, 'a3,3', 30, 3000, "2021-01-05")
""".stripMargin)
checkAnswer(s"select count(distinct _hoodie_file_name) from
$tableName where dt = '2021-01-05'")(