This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e19c815bb12 [HUDI-8403] Fixed values extraction for bucketing and 
optimized `KeyGenUtils::extractRecordKeysByFields` (#12120)
e19c815bb12 is described below

commit e19c815bb125e9a758bf50c07370b91498ae35cf
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Oct 28 09:43:43 2024 +0700

    [HUDI-8403] Fixed values extraction for bucketing and optimized 
`KeyGenUtils::extractRecordKeysByFields` (#12120)
    
    * [HUDI-8387] [HUDI-8403] Optimized 
`KeyGenUtils::extractRecordKeysByFields`, fixed hash calculation for buckets
---
 .../apache/hudi/index/bucket/BucketIdentifier.java |   7 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   | 100 +++++++++++++++++----
 .../hudi/index/bucket/TestBucketIdentifier.java    |  11 +--
 .../org/apache/hudi/keygen/TestKeyGenUtils.java    |  21 +++--
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |   8 +-
 .../spark/sql/hudi/dml/TestInsertTable.scala       |   2 +-
 6 files changed, 110 insertions(+), 39 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 475ee36c55e..ea98ecc009d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -25,7 +25,6 @@ import org.apache.hudi.keygen.KeyGenUtils;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -61,13 +60,11 @@ public class BucketIdentifier implements Serializable {
   }
 
   protected static List<String> getHashKeys(String recordKey, String 
indexKeyFields) {
-    return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
-        getHashKeysUsingIndexFields(recordKey, 
Arrays.asList(indexKeyFields.split(",")));
+    return getHashKeysUsingIndexFields(recordKey, 
Arrays.asList(indexKeyFields.split(",")));
   }
 
   protected static List<String> getHashKeys(String recordKey, List<String> 
indexKeyFields) {
-    return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
-        getHashKeysUsingIndexFields(recordKey, indexKeyFields);
+    return getHashKeysUsingIndexFields(recordKey, indexKeyFields);
   }
 
   private static List<String> getHashKeysUsingIndexFields(String recordKey, 
List<String> indexKeyFields) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 85d8835875e..83cd1d4139c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -35,6 +35,7 @@ import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -48,7 +49,7 @@ public class KeyGenUtils {
   protected static final String HUDI_DEFAULT_PARTITION_PATH = 
PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
   public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
   public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
-  public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
+  public static final String DEFAULT_COLUMN_VALUE_SEPARATOR = ":";
 
   public static final String RECORD_KEY_GEN_PARTITION_ID_CONFIG = 
"_hoodie.record.key.gen.partition.id";
   public static final String RECORD_KEY_GEN_INSTANT_TIME_CONFIG = 
"_hoodie.record.key.gen.instant.time";
@@ -133,20 +134,87 @@ public class KeyGenUtils {
   }
 
   public static String[] extractRecordKeysByFields(String recordKey, 
List<String> fields) {
-    String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
-    return Arrays.stream(fieldKV).map(kv -> 
kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
-        .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || 
(fields.contains(kvArray[0])))
-        .map(kvArray -> {
-          if (kvArray.length == 1) {
-            return kvArray[0];
-          } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
-            return null;
-          } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
-            return "";
+    // if there is no ',' or no ':', the record key is a single plain value (not 'field:value' pairs)
+    if (!recordKey.contains(DEFAULT_RECORD_KEY_PARTS_SEPARATOR) || 
!recordKey.contains(DEFAULT_COLUMN_VALUE_SEPARATOR)) {
+      return new String[] {recordKey};
+    }
+    // complex key case
+    // Here we're reducing memory allocation for substrings and use index 
positions,
+    // because for bucket index this will be called for each record, which 
leads to GC overhead
+    int keyValueSep1;
+    int keyValueSep2;
+    int commaPosition;
+    String currentField;
+    String currentValue;
+    List<String> values = new ArrayList<>();
+    int processed = 0;
+    while (processed < recordKey.length()) {
+      // note that keyValueSeps and commaPosition are absolute
+      keyValueSep1 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
processed);
+      currentField = recordKey.substring(processed, keyValueSep1);
+      keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep1 + 1);
+      if (fields.isEmpty() || (fields.size() == 1 && fields.get(0).isEmpty()) 
|| fields.contains(currentField)) {
+        if (keyValueSep2 < 0) {
+          // there is no next key value pair
+          currentValue = recordKey.substring(keyValueSep1 + 1);
+          processed = recordKey.length();
+        } else {
+          // looking for ',' in reverse order to support multiple ',' in key 
values by looking for the latest ','
+          commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+          // commaPosition could be -1 if no ',' is found, or it could point to a ',' from the previous key-value pair ('col1:val1,...')
+          // also we could have the last value with ':', so need to check if 
keyValueSep2 > 0
+          while (commaPosition < keyValueSep1 && keyValueSep2 > 0) {
+            // If we have key value as a timestamp with ':',
+            // then we continue to skip ':' until before the next ':' there is 
a ',' character.
+            // For instance, 'col1:val1,col2:2014-10-22 13:50:42,col3:val3'
+            //                              ^             ^  ^       ^
+            //   1)              keyValueSep1          skip  skip    
keyValueSep2
+            //                                                  ^
+            //                                                  commaPosition
+            //   2)                         |   currentValue    |
+            //                                                  ^
+            //   3)                                             processed
+            keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep2 + 1);
+            commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+          }
+          if (commaPosition > 0) {
+            currentValue = recordKey.substring(keyValueSep1 + 1, 
commaPosition);
+            processed = commaPosition + 1;
           } else {
-            return kvArray[1];
+            // it could be the last value with many ':', in this case we 
wouldn't find any ',' before
+            currentValue = recordKey.substring(keyValueSep1 + 1);
+            processed = recordKey.length();
+          }
+        }
+        // here could be any logic of conditional replacing of currentValue
+        if (currentValue.equals(NULL_RECORDKEY_PLACEHOLDER)) {
+          values.add(null);
+        } else if (currentValue.equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          values.add("");
+        } else {
+          values.add(currentValue);
+        }
+      } else {
+        if (keyValueSep2 < 0) {
+          processed = recordKey.length();
+        } else {
+          commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
+          while (commaPosition < keyValueSep1) {
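+            // described above; NOTE(review): unlike the loop above there is no `keyValueSep2 > 0` guard, so an unselected last value containing ':' (e.g. 'id:1,ts:13:50:42' with fields=[id]) lets keyValueSep2 wrap to -1 and this loop never terminates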
+            keyValueSep2 = recordKey.indexOf(DEFAULT_COLUMN_VALUE_SEPARATOR, 
keyValueSep2 + 1);
+            commaPosition = 
recordKey.lastIndexOf(DEFAULT_RECORD_KEY_PARTS_SEPARATOR, keyValueSep2);
           }
-        }).toArray(String[]::new);
+          if (commaPosition < 0) {
+            // if something went wrong, and there is no ',', we should stop 
here, and pass the whole recordKey,
+            // otherwise processed = commaPosition + 1 would lead to infinite 
loop
+            processed = recordKey.length();
+          } else {
+            processed = commaPosition + 1;
+          }
+        }
+      }
+    }
+    return values.isEmpty() ? new String[] {recordKey} : values.toArray(new 
String[0]);
   }
 
   public static String getRecordKey(GenericRecord record, List<String> 
recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -161,11 +229,11 @@ public class KeyGenUtils {
         throw new HoodieKeyException("Record key field '" + recordKeyField + 
"' does not exist in the input record");
       }
       if (recordKeyValue == null) {
-        
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
+        
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(NULL_RECORDKEY_PLACEHOLDER);
       } else if (recordKeyValue.isEmpty()) {
-        
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(EMPTY_RECORDKEY_PLACEHOLDER);
+        
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(EMPTY_RECORDKEY_PLACEHOLDER);
       } else {
-        
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(recordKeyValue);
+        
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(recordKeyValue);
         keyIsNullEmpty = false;
       }
       if (i != recordKeyFields.size() - 1) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
index e6e6e4aabe7..8f4228cde52 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
@@ -103,14 +103,12 @@ public class TestBucketIdentifier {
   @Test
   public void testGetHashKeys() {
     BucketIdentifier identifier = new BucketIdentifier();
+    // if the record key consists of a single column, its value is not prefixed with the column name
     List<String> keys = identifier.getHashKeys(new HoodieKey("abc", 
"partition"), "");
     assertEquals(1, keys.size());
     assertEquals("abc", keys.get(0));
 
-    keys = identifier.getHashKeys(new HoodieKey("f1:abc", "partition"), "f1");
-    assertEquals(1, keys.size());
-    assertEquals("abc", keys.get(0));
-
+    // complex keys, composite from key-value pairs
     keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), 
"f2");
     assertEquals(1, keys.size());
     assertEquals("bcd", keys.get(0));
@@ -121,9 +119,8 @@ public class TestBucketIdentifier {
     assertEquals("bcd", keys.get(1));
 
     keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd,efg", 
"partition"), "f1,f2");
-    assertEquals(3, keys.size());
+    assertEquals(2, keys.size());
     assertEquals("abc", keys.get(0));
-    assertEquals("bcd", keys.get(1));
-    assertEquals("efg", keys.get(2));
+    assertEquals("bcd,efg", keys.get(1));
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 61391232869..5cffef26e6e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -112,16 +113,18 @@ public class TestKeyGenUtils {
 
   @Test
   public void testExtractRecordKeys() {
-    // test complex key form: field1:val1,field2:val2,...
-    String[] s1 = KeyGenUtils.extractRecordKeys("id:1");
-    Assertions.assertArrayEquals(new String[] {"1"}, s1);
+    // if the record key consists of a single column, its value is not prefixed with the column name
+    String[] s1 = KeyGenUtils.extractRecordKeys("2024-10-22 14:11:53.023");
+    Assertions.assertArrayEquals(new String[] {"2024-10-22 14:11:53.023"}, s1);
 
+    // test complex key form: field1:val1,field2:val2,...
     String[] s2 = KeyGenUtils.extractRecordKeys("id:1,id:2");
     Assertions.assertArrayEquals(new String[] {"1", "2"}, s2);
 
     String[] s3 = 
KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
     Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);
 
+    // values containing ':' are supported; field names containing ':' are not
     String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
     Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);
 
@@ -130,7 +133,7 @@ public class TestKeyGenUtils {
     Assertions.assertArrayEquals(new String[] {"1"}, s5);
 
     String[] s6 = KeyGenUtils.extractRecordKeys("id:1,id2:2,2");
-    Assertions.assertArrayEquals(new String[]{"1", "2", "2"}, s6);
+    Assertions.assertArrayEquals(new String[]{"1", "2,2"}, s6);
   }
 
   @Test
@@ -142,6 +145,14 @@ public class TestKeyGenUtils {
     Assertions.assertArrayEquals(new String[] {"2"}, s1);
 
     String[] s2 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,2,id3:3", 
fields);
-    Assertions.assertArrayEquals(new String[] {"2", "2"}, s2);
+    Assertions.assertArrayEquals(new String[] {"2,2"}, s2);
+
+    String[] s3 = 
KeyGenUtils.extractRecordKeysByFields("id1:1,1,1,id2:,2,2,,id3:3", fields);
+    Assertions.assertArrayEquals(new String[] {",2,2,"}, s3);
+
+    fields.addAll(Arrays.asList("id1", "id3", "id4"));
+    // tough case with a lot of ',' and ':'
+    String[] s4 = 
KeyGenUtils.extractRecordKeysByFields("id1:1,,,id2:2024-10-22 
14:11:53.023,id3:,,3,id4:::1:2::4::", fields);
+    Assertions.assertArrayEquals(new String[] {"1,,", "2024-10-22 
14:11:53.023", ",,3", "::1:2::4::"}, s4);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 58350b0d494..0f7c6144d41 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -51,7 +51,7 @@ import java.util.function.Supplier;
 import scala.Function1;
 
 import static org.apache.hudi.common.util.CollectionUtils.tail;
-import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_COMPOSITE_KEY_FILED_VALUE;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_COLUMN_VALUE_SEPARATOR;
 import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
 import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
 import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
@@ -71,8 +71,6 @@ public abstract class BuiltinKeyGenerator extends 
BaseKeyGenerator implements Sp
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BuiltinKeyGenerator.class);
 
-  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
-
   protected static final String FIELDS_SEP = ",";
 
   protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = 
UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
@@ -220,7 +218,7 @@ public abstract class BuiltinKeyGenerator extends 
BaseKeyGenerator implements Sp
 
     PartitionPathFormatterBase.StringBuilder<S> sb = builderFactory.get();
     for (int i = 0; i < recordKeyParts.size(); ++i) {
-      
sb.appendJava(fieldNames.get(i)).appendJava(DEFAULT_COMPOSITE_KEY_FILED_VALUE);
+      
sb.appendJava(fieldNames.get(i)).appendJava(DEFAULT_COLUMN_VALUE_SEPARATOR);
       // NOTE: If record-key part has already been a string [[toString]] will 
be a no-op
       
sb.append(emptyKeyPartHandler.apply(converter.apply(recordKeyParts.get(i))));
 
@@ -248,7 +246,7 @@ public abstract class BuiltinKeyGenerator extends 
BaseKeyGenerator implements Sp
 
       if (recordKeyParts.length > 1) {
         sb.appendJava(recordKeyFields.get(i));
-        sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+        sb.appendJava(DEFAULT_COLUMN_VALUE_SEPARATOR);
       }
       sb.append(convertedKeyPart);
       // This check is to validate that overall composite-key has at least one 
non-null, non-empty
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index f000a7e7831..d92b10b31e7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1705,7 +1705,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
                  | (6, 'a1,1', 10, 1000, "2021-01-05"),
                  | (7, 'a1,1', 10, 1000, "2021-01-05"),
                  | (8, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (9, 'a3,3', 30, 3000, "2021-01-05")
+                 | (10, 'a3,3', 30, 3000, "2021-01-05")
                """.stripMargin)
 
             checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(

Reply via email to