This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 09aa8bc7dbdd5390b3d1095d480ae822e658f6d5 Author: shaoxiong.zhan <[email protected]> AuthorDate: Sat Dec 3 10:47:22 2022 +0800 [HUDI-5302] Fix: compute hash key from recordKey failed when recordKeyValue contains ',' (#7342) --- .../apache/hudi/index/bucket/BucketIdentifier.java | 9 +--- .../java/org/apache/hudi/keygen/KeyGenUtils.java | 39 +++++++++------- .../hudi/index/bucket/TestBucketIdentifier.java | 27 ++++++----- .../org/apache/hudi/keygen/TestKeyGenUtils.java | 18 ++++++++ .../apache/spark/sql/hudi/TestInsertTable.scala | 52 +++++++++++++++++++++- 5 files changed, 111 insertions(+), 34 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java index 48ccce1d174..35f9205a8e5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -21,14 +21,13 @@ package org.apache.hudi.index.bucket; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.keygen.KeyGenUtils; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; public class BucketIdentifier implements Serializable { // Compatible with the spark bucket name @@ -69,11 +68,7 @@ public class BucketIdentifier implements Serializable { } private static List<String> getHashKeysUsingIndexFields(String recordKey, List<String> indexKeyFields) { - Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(",")) - .map(p -> p.split(":")) - .collect(Collectors.toMap(p -> p[0], p -> p[1])); - return indexKeyFields.stream() - 
.map(recordKeyPairs::get).collect(Collectors.toList()); + return Arrays.asList(KeyGenUtils.extractRecordKeysByFields(recordKey, indexKeyFields)); } public static String partitionBucketIdStr(String partition, int bucketId) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index d28263574b7..5a8b2b01c5f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class KeyGenUtils { @@ -43,6 +44,7 @@ public class KeyGenUtils { protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ","; + public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":"; /** * Fetches record key from the GenericRecord. 
@@ -72,19 +74,24 @@ public class KeyGenUtils { * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator */ public static String[] extractRecordKeys(String recordKey) { - String[] fieldKV = recordKey.split(","); - return Arrays.stream(fieldKV).map(kv -> { - final String[] kvArray = kv.split(":", 2); - if (kvArray.length == 1) { - return kvArray[0]; - } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { - return null; - } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { - return ""; - } else { - return kvArray[1]; - } - }).toArray(String[]::new); + return extractRecordKeysByFields(recordKey, Collections.emptyList()); + } + + public static String[] extractRecordKeysByFields(String recordKey, List<String> fields) { + String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR); + return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2)) + .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0]))) + .map(kvArray -> { + if (kvArray.length == 1) { + return kvArray[0]; + } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { + return null; + } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { + return ""; + } else { + return kvArray[1]; + } + }).toArray(String[]::new); } public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) { @@ -93,11 +100,11 @@ public class KeyGenUtils { for (String recordKeyField : recordKeyFields) { String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled); if (recordKeyValue == null) { - recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + NULL_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); } else if (recordKeyValue.isEmpty()) { - recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + 
","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + EMPTY_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); } else { - recordKey.append(recordKeyField + ":" + recordKeyValue + ","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + recordKeyValue + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); keyIsNullEmpty = false; } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java index 31f33890ad3..e6e6e4aabe7 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java @@ -26,12 +26,13 @@ import org.apache.hudi.keygen.KeyGenUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TestBucketIdentifier { public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": [" @@ -103,20 +104,26 @@ public class TestBucketIdentifier { public void testGetHashKeys() { BucketIdentifier identifier = new BucketIdentifier(); List<String> keys = identifier.getHashKeys(new HoodieKey("abc", "partition"), ""); - Assertions.assertEquals(1, keys.size()); - Assertions.assertEquals("abc", keys.get(0)); + assertEquals(1, keys.size()); + assertEquals("abc", keys.get(0)); keys = identifier.getHashKeys(new HoodieKey("f1:abc", "partition"), "f1"); - Assertions.assertEquals(1, keys.size()); - Assertions.assertEquals("abc", keys.get(0)); + assertEquals(1, keys.size()); + assertEquals("abc", keys.get(0)); keys = 
identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f2"); - Assertions.assertEquals(1, keys.size()); - Assertions.assertEquals("bcd", keys.get(0)); + assertEquals(1, keys.size()); + assertEquals("bcd", keys.get(0)); keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f1,f2"); - Assertions.assertEquals(2, keys.size()); - Assertions.assertEquals("abc", keys.get(0)); - Assertions.assertEquals("bcd", keys.get(1)); + assertEquals(2, keys.size()); + assertEquals("abc", keys.get(0)); + assertEquals("bcd", keys.get(1)); + + keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd,efg", "partition"), "f1,f2"); + assertEquals(3, keys.size()); + assertEquals("abc", keys.get(0)); + assertEquals("bcd", keys.get(1)); + assertEquals("efg", keys.get(2)); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java index 43f5952e492..26e232ccc53 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java @@ -21,6 +21,9 @@ package org.apache.hudi.keygen; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; + public class TestKeyGenUtils { @Test @@ -41,5 +44,20 @@ public class TestKeyGenUtils { // test simple key form: val1 String[] s5 = KeyGenUtils.extractRecordKeys("1"); Assertions.assertArrayEquals(new String[] {"1"}, s5); + + String[] s6 = KeyGenUtils.extractRecordKeys("id:1,id2:2,2"); + Assertions.assertArrayEquals(new String[]{"1", "2", "2"}, s6); + } + + @Test + public void testExtractRecordKeysWithFields() { + List<String> fields = new ArrayList<>(1); + fields.add("id2"); + + String[] s1 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,id3:3", fields); + Assertions.assertArrayEquals(new 
String[] {"2"}, s1); + + String[] s2 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,2,id3:3", fields); + Assertions.assertArrayEquals(new String[] {"2", "2"}, s2); } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index c5e1abbf09a..2fa6b939acb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.internal.SQLConf import java.io.File @@ -991,4 +990,55 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") spark.sql("set hoodie.datasource.write.operation = upsert") } + + test("Test Insert Into Bucket Index Table") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. 
+ spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + } + } }
