This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit dd032bb1fe282be967572bdeae5e56ae4cb0e689
Author: shaoxiong.zhan <[email protected]>
AuthorDate: Sat Dec 3 10:47:22 2022 +0800

    [HUDI-5302] Fix: computing the hash key from recordKey failed when
    recordKeyValue contains ',' (#7342)
    
    (cherry picked from commit 3dc76af95d8e0c64c3dedfdd28c081046b905640)
---
 .../apache/hudi/index/bucket/BucketIdentifier.java |   9 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  39 +++---
 .../hudi/index/bucket/TestBucketIdentifier.java    |  14 +-
 .../org/apache/hudi/keygen/TestKeyGenUtils.java    |  18 +++
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 142 ++++++++++++++++++++-
 5 files changed, 195 insertions(+), 27 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 48ccce1d174..35f9205a8e5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -21,14 +21,13 @@ package org.apache.hudi.index.bucket;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.keygen.KeyGenUtils;
 
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 public class BucketIdentifier implements Serializable {
   // Compatible with the spark bucket name
@@ -69,11 +68,7 @@ public class BucketIdentifier implements Serializable {
   }
 
   private static List<String> getHashKeysUsingIndexFields(String recordKey, 
List<String> indexKeyFields) {
-    Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
-        .map(p -> p.split(":"))
-        .collect(Collectors.toMap(p -> p[0], p -> p[1]));
-    return indexKeyFields.stream()
-        .map(recordKeyPairs::get).collect(Collectors.toList());
+    return Arrays.asList(KeyGenUtils.extractRecordKeysByFields(recordKey, 
indexKeyFields));
   }
 
   public static String partitionBucketIdStr(String partition, int bucketId) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index d28263574b7..5a8b2b01c5f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -33,6 +33,7 @@ import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class KeyGenUtils {
@@ -43,6 +44,7 @@ public class KeyGenUtils {
   protected static final String HUDI_DEFAULT_PARTITION_PATH = 
PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
   public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
   public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
+  public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
 
   /**
    * Fetches record key from the GenericRecord.
@@ -72,19 +74,24 @@ public class KeyGenUtils {
    * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
    */
   public static String[] extractRecordKeys(String recordKey) {
-    String[] fieldKV = recordKey.split(",");
-    return Arrays.stream(fieldKV).map(kv -> {
-      final String[] kvArray = kv.split(":", 2);
-      if (kvArray.length == 1) {
-        return kvArray[0];
-      } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
-        return null;
-      } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
-        return "";
-      } else {
-        return kvArray[1];
-      }
-    }).toArray(String[]::new);
+    return extractRecordKeysByFields(recordKey, Collections.emptyList());
+  }
+
+  public static String[] extractRecordKeysByFields(String recordKey, 
List<String> fields) {
+    String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
+    return Arrays.stream(fieldKV).map(kv -> 
kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
+            .filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || 
(fields.contains(kvArray[0])))
+            .map(kvArray -> {
+              if (kvArray.length == 1) {
+                return kvArray[0];
+              } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+                return null;
+              } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+                return "";
+              } else {
+                return kvArray[1];
+              }
+            }).toArray(String[]::new);
   }
 
   public static String getRecordKey(GenericRecord record, List<String> 
recordKeyFields, boolean consistentLogicalTimestampEnabled) {
@@ -93,11 +100,11 @@ public class KeyGenUtils {
     for (String recordKeyField : recordKeyFields) {
       String recordKeyValue = 
HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, 
consistentLogicalTimestampEnabled);
       if (recordKeyValue == null) {
-        recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + 
",");
+        recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + 
NULL_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
       } else if (recordKeyValue.isEmpty()) {
-        recordKey.append(recordKeyField + ":" + EMPTY_RECORDKEY_PLACEHOLDER + 
",");
+        recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + 
EMPTY_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
       } else {
-        recordKey.append(recordKeyField + ":" + recordKeyValue + ",");
+        recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + 
recordKeyValue + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
         keyIsNullEmpty = false;
       }
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
index 31f33890ad3..a6595713378 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
@@ -32,6 +32,8 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class TestBucketIdentifier {
 
   public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", 
\"name\":\"nested_col\",\"fields\": ["
@@ -115,8 +117,14 @@ public class TestBucketIdentifier {
     Assertions.assertEquals("bcd", keys.get(0));
 
     keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), 
"f1,f2");
-    Assertions.assertEquals(2, keys.size());
-    Assertions.assertEquals("abc", keys.get(0));
-    Assertions.assertEquals("bcd", keys.get(1));
+    assertEquals(2, keys.size());
+    assertEquals("abc", keys.get(0));
+    assertEquals("bcd", keys.get(1));
+
+    keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd,efg", 
"partition"), "f1,f2");
+    assertEquals(3, keys.size());
+    assertEquals("abc", keys.get(0));
+    assertEquals("bcd", keys.get(1));
+    assertEquals("efg", keys.get(2));
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 43f5952e492..26e232ccc53 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -21,6 +21,9 @@ package org.apache.hudi.keygen;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class TestKeyGenUtils {
 
   @Test
@@ -41,5 +44,20 @@ public class TestKeyGenUtils {
     // test simple key form: val1
     String[] s5 = KeyGenUtils.extractRecordKeys("1");
     Assertions.assertArrayEquals(new String[] {"1"}, s5);
+
+    String[] s6 = KeyGenUtils.extractRecordKeys("id:1,id2:2,2");
+    Assertions.assertArrayEquals(new String[]{"1", "2", "2"}, s6);
+  }
+
+  @Test
+  public void testExtractRecordKeysWithFields() {
+    List<String> fields = new ArrayList<>(1);
+    fields.add("id2");
+
+    String[] s1 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,id3:3", 
fields);
+    Assertions.assertArrayEquals(new String[] {"2"}, s1);
+
+    String[] s2 = KeyGenUtils.extractRecordKeysByFields("id1:1,id2:2,2,id3:3", 
fields);
+    Assertions.assertArrayEquals(new String[] {"2", "2"}, s2);
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index ced6fef72d4..2fa6b939acb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
@@ -267,6 +266,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             throw root
         }
       }
+
       // Create table with dropDup is true
       val tableName2 = generateTableName
       spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true")
@@ -297,6 +297,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Insert Into None Partitioned Table strict mode with no 
preCombineField") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(s"set hoodie.sql.insert.mode=strict")
+      // Create a non-partitioned cow table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'cow',
+           |  primaryKey = 'id'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10)")
+      checkAnswer(s"select id, name, price from $tableName")(
+        Seq(1, "a1", 10.0)
+      )
+      spark.sql(s"insert into $tableName select 2, 'a2', 12")
+      checkAnswer(s"select id, name, price from $tableName")(
+        Seq(1, "a1", 10.0),
+        Seq(2, "a2", 12.0)
+      )
+
+      assertThrows[HoodieDuplicateKeyException] {
+        try {
+          spark.sql(s"insert into $tableName select 1, 'a1', 10")
+        } catch {
+          case e: Exception =>
+            var root: Throwable = e
+            while (root.getCause != null) {
+              root = root.getCause
+            }
+            throw root
+        }
+      }
+
+      // Reset the insert mode to upsert so it does not affect other tests in this class.
+      spark.sql(s"set hoodie.sql.insert.mode=upsert")
+    }
+  }
+
   test("Test Insert Overwrite") {
     withTempDir { tmp =>
       val tableName = generateTableName
@@ -901,4 +947,98 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test enable hoodie.merge.allow.duplicate.on.inserts when write") {
+    spark.sql("set hoodie.datasource.write.operation = insert")
+    Seq("mor", "cow").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string
+             | ) using hudi
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  type = '$tableType'
+             | )
+        """.stripMargin)
+        spark.sql(s"insert into $tableName partition(dt='2021-12-25') values 
(1, 'a1', 10, 1000)")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10, 1000, "2021-12-25")
+        )
+        spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
+        spark.sql(s"insert into $tableName partition(dt='2021-12-25') values 
(1, 'a2', 20, 1001)")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a2", 20, 1001, "2021-12-25")
+        )
+        spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
+        spark.sql(s"insert into $tableName partition(dt='2021-12-25') values 
(1, 'a3', 30, 1002)")
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a2", 20, 1001, "2021-12-25"),
+          Seq(1, "a3", 30, 1002, "2021-12-25")
+        )
+      }
+    }
+    spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false")
+    spark.sql("set hoodie.datasource.write.operation = upsert")
+  }
+
+  test("Test Insert Into Bucket Index Table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           | primaryKey = 'id,name',
+           | preCombineField = 'ts',
+           | hoodie.index.type = 'BUCKET',
+           | hoodie.bucket.index.hash.field = 'id,name')
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+      // Note: Do not write the field alias; the partition field must be placed last.
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1,1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "2021-01-07")
+      )
+
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1,1', 10, 1000, "2021-01-05")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "2021-01-07")
+      )
+    }
+  }
 }

Reply via email to