This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dd032bb1fe282be967572bdeae5e56ae4cb0e689 Author: shaoxiong.zhan <[email protected]> AuthorDate: Sat Dec 3 10:47:22 2022 +0800 [HUDI-5302] Fix: compute hash key from recordKey failed when recordKeyValue contains ',' (#7342) (cherry picked from commit 3dc76af95d8e0c64c3dedfdd28c081046b905640) --- .../apache/hudi/index/bucket/BucketIdentifier.java | 9 +- .../java/org/apache/hudi/keygen/KeyGenUtils.java | 39 +++--- .../hudi/index/bucket/TestBucketIdentifier.java | 14 +- .../org/apache/hudi/keygen/TestKeyGenUtils.java | 18 +++ .../apache/spark/sql/hudi/TestInsertTable.scala | 142 ++++++++++++++++++++- 5 files changed, 195 insertions(+), 27 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java index 48ccce1d174..35f9205a8e5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -21,14 +21,13 @@ package org.apache.hudi.index.bucket; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.keygen.KeyGenUtils; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; public class BucketIdentifier implements Serializable { // Compatible with the spark bucket name @@ -69,11 +68,7 @@ public class BucketIdentifier implements Serializable { } private static List<String> getHashKeysUsingIndexFields(String recordKey, List<String> indexKeyFields) { - Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(",")) - .map(p -> p.split(":")) - .collect(Collectors.toMap(p -> p[0], p -> p[1])); - return indexKeyFields.stream() - .map(recordKeyPairs::get).collect(Collectors.toList()); + return Arrays.asList(KeyGenUtils.extractRecordKeysByFields(recordKey, indexKeyFields)); } public static String partitionBucketIdStr(String partition, int bucketId) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index d28263574b7..5a8b2b01c5f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; public class KeyGenUtils { @@ -43,6 +44,7 @@ public class KeyGenUtils { protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ","; + public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":"; /** * Fetches record key from the GenericRecord. @@ -72,19 +74,24 @@ public class KeyGenUtils { * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator */ public static String[] extractRecordKeys(String recordKey) { - String[] fieldKV = recordKey.split(","); - return Arrays.stream(fieldKV).map(kv -> { - final String[] kvArray = kv.split(":", 2); - if (kvArray.length == 1) { - return kvArray[0]; - } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { - return null; - } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { - return ""; - } else { - return kvArray[1]; - } - }).toArray(String[]::new); + return extractRecordKeysByFields(recordKey, Collections.emptyList()); + } + + public static String[] extractRecordKeysByFields(String recordKey, List<String> fields) { + String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR); + return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2)) + .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0]))) + .map(kvArray -> { + if (kvArray.length == 1) { + return kvArray[0]; + } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { + return null; + } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { + return ""; + } else { + return kvArray[1]; + } + }).toArray(String[]::new); } public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) { @@ -93,11 +100,11 @@ public class KeyGenUtils { for (String recordKeyField : recordKeyFields) { String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled); if (recordKeyValue == null) { - recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + NULL_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); } else if (recordKeyValue.isEmpty()) { - recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + ","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + EMPTY_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); } else { - recordKey.append(recordKeyField + ":" + recordKeyValue + ","); + recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + recordKeyValue + DEFAULT_RECORD_KEY_PARTS_SEPARATOR); keyIsNullEmpty = false; } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java index 31f33890ad3..a6595713378 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java @@ -32,6 +32,8 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TestBucketIdentifier { public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": [" @@ -115,8 +117,14 @@ public class TestBucketIdentifier { Assertions.assertEquals("bcd", keys.get(0)); keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f1,f2"); - Assertions.assertEquals(2, keys.size()); - Assertions.assertEquals("abc", keys.get(0)); - Assertions.assertEquals("bcd", keys.get(1)); + assertEquals(2, keys.size()); + assertEquals("abc", keys.get(0)); + assertEquals("bcd", keys.get(1)); + + keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd,efg", "partition"), "f1,f2"); + assertEquals(3, keys.size()); + assertEquals("abc", keys.get(0)); + assertEquals("bcd", keys.get(1)); + assertEquals("efg", keys.get(2)); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java index 43f5952e492..26e232ccc53 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java @@ -21,6 +21,9 @@ package org.apache.hudi.keygen; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; + public class TestKeyGenUtils { @Test @@ -41,5 +44,20 @@ public class TestKeyGenUtils { // test simple key form: val1 String[] s5 = KeyGenUtils.extractRecordKeys("1"); Assertions.assertArrayEquals(new String[] {"1"}, s5); + + String[] s6 = KeyGenUtils.extractRecordKeys("id:1,id2:2,2"); + Assertions.assertArrayEquals(new String[]{"1", "2", "2"}, s6); + } + + @Test + public void testExtractRecordKeysWithFields() { + List<String> fields = new ArrayList<>(1); + fields.add("id2"); + + String[] s1 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,id3:3", fields); + Assertions.assertArrayEquals(new String[] {"2"}, s1); + + String[] s2 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,2,id3:3", fields); + Assertions.assertArrayEquals(new String[] {"2", "2"}, s2); } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index ced6fef72d4..2fa6b939acb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.internal.SQLConf import java.io.File @@ -267,6 +266,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { throw root } } + // Create table with dropDup is true val tableName2 = generateTableName spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true") @@ -297,6 +297,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } + test("Test Insert Into None Partitioned Table strict mode with no preCombineField") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql(s"set hoodie.sql.insert.mode=strict") + // Create none partitioned cow table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10)") + checkAnswer(s"select id, name, price from $tableName")( + Seq(1, "a1", 10.0) + ) + spark.sql(s"insert into $tableName select 2, 'a2', 12") + checkAnswer(s"select id, name, price from $tableName")( + Seq(1, "a1", 10.0), + Seq(2, "a2", 12.0) + ) + + assertThrows[HoodieDuplicateKeyException] { + try { + spark.sql(s"insert into $tableName select 1, 'a1', 10") + } catch { + case e: Exception => + var root: Throwable = e + while (root.getCause != null) { + root = root.getCause + } + throw root + } + } + + // disable this config to avoid affect other test in this class. + spark.sql(s"set hoodie.sql.insert.mode=upsert") + } + } + test("Test Insert Overwrite") { withTempDir { tmp => val tableName = generateTableName @@ -901,4 +947,98 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } + + test("Test enable hoodie.merge.allow.duplicate.on.inserts when write") { + spark.sql("set hoodie.datasource.write.operation = insert") + Seq("mor", "cow").foreach { tableType => + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + | ) using hudi + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | type = '$tableType' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName partition(dt='2021-12-25') values (1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 10, 1000, "2021-12-25") + ) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") + spark.sql(s"insert into $tableName partition(dt='2021-12-25') values (1, 'a2', 20, 1001)") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a2", 20, 1001, "2021-12-25") + ) + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true") + spark.sql(s"insert into $tableName partition(dt='2021-12-25') values (1, 'a3', 30, 1002)") + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a2", 20, 1001, "2021-12-25"), + Seq(1, "a3", 30, 1002, "2021-12-25") + ) + } + } + spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") + spark.sql("set hoodie.datasource.write.operation = upsert") + } + + test("Test Insert Into Bucket Index Table") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id,name', + | preCombineField = 'ts', + | hoodie.index.type = 'BUCKET', + | hoodie.bucket.index.hash.field = 'id,name') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + // Note: Do not write the field alias, the partition field must be placed last. + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1,1', 10, 1000, "2021-01-05") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1,1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + } + } }
