This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 63e014a6c0f [HUDI-8394] Restrict multiple bulk inserts into COW with 
simple bucket and disabled Spark native row writer (#12245)
63e014a6c0f is described below

commit 63e014a6c0fd783818f380a6a84be58990abdecc
Author: Geser Dugarov <[email protected]>
AuthorDate: Wed Dec 18 09:43:19 2024 +0700

    [HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and 
disabled Spark native row writer (#12245)
---
 .../table/BucketIndexBulkInsertPartitioner.java    |  15 ++-
 .../org/apache/hudi/keygen/TestKeyGenUtils.java    |   1 -
 .../TestRDDSimpleBucketBulkInsertPartitioner.java  |  33 ++++--
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  15 +++
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 122 +++++++++++++++------
 5 files changed, 140 insertions(+), 46 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
index df50877a410..6be3dc83da0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.io.AppendHandleFactory;
 import org.apache.hudi.io.SingleFileHandleCreateFactory;
@@ -47,6 +49,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> 
implements BulkInsertP
   protected final List<String> indexKeyFields;
   protected final List<Boolean> doAppend = new ArrayList<>();
   protected final List<String> fileIdPfxList = new ArrayList<>();
+  protected boolean isAppendAllowed;
 
   public BucketIndexBulkInsertPartitioner(HoodieTable table, String 
sortString, boolean preserveHoodieMetadata) {
 
@@ -59,12 +62,20 @@ public abstract class BucketIndexBulkInsertPartitioner<T> 
implements BulkInsertP
       this.sortColumnNames = null;
     }
     this.preserveHoodieMetadata = preserveHoodieMetadata;
+    // Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner` 
are restricted, otherwise AppendHandleFactory will produce MOR log files
+    this.isAppendAllowed = 
!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
   }
 
   @Override
   public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
-    return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
-        Option.of(new 
SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), 
this.preserveHoodieMetadata));
+    if (!doAppend.get(idx)) {
+      return Option.of(new 
SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), 
this.preserveHoodieMetadata));
+    } else if (isAppendAllowed) {
+      return Option.of(new AppendHandleFactory());
+    } else {
+      throw new HoodieNotSupportedException("Multiple bulk inserts into COW 
with simple bucket and disabled Spark native row writer is not supported, "
+          + "please, use upsert operation, overwrite mode (already written 
data will be lost), or turn on Spark native row writer.");
+    }
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 5cffef26e6e..e506751a1a0 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -124,7 +124,6 @@ public class TestKeyGenUtils {
     String[] s3 = 
KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
     Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);
 
-    // keys with ':' are not supported
     String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
     Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
index 0141d0d4cec..017847eff55 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -40,13 +41,17 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.exception.ExceptionUtil.getRootCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 
 public class TestRDDSimpleBucketBulkInsertPartitioner extends 
HoodieSparkClientTestHarness {
 
@@ -86,6 +91,7 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends 
HoodieSparkClientT
     HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context, 
1);
 
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+    // we call BulkInsertInternalPartitionerFactory.get() directly, which 
behaves as if the Spark native row writer were disabled
     BulkInsertPartitioner partitioner = 
BulkInsertInternalPartitionerFactory.get(table, config);
     JavaRDD<HoodieRecord> repartitionRecords =
         (JavaRDD<HoodieRecord>) 
partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(javaRDD), 1);
@@ -104,16 +110,25 @@ public class TestRDDSimpleBucketBulkInsertPartitioner 
extends HoodieSparkClientT
       }, false).collect();
     }
 
-    // first writes, it will create new bucket files based on the records
+    // 1st write, will create new bucket files based on the records
     getHoodieWriteClient(config).startCommitWithTime("0");
-    List<WriteStatus> writeStatues = 
getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), 
"0").collect();
-    Map<String, WriteStatus> writeStatuesMap = new HashMap<>();
-    writeStatues.forEach(ws -> writeStatuesMap.put(ws.getFileId(), ws));
+    List<WriteStatus> writeStatuses = 
getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), 
"0").collect();
+    Map<String, WriteStatus> writeStatusesMap = new HashMap<>();
+    writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));
 
-    // second writes the same records, all records should be mapped to the 
same bucket files
     getHoodieWriteClient(config).startCommitWithTime("1");
-    List<WriteStatus> writeStatues2 = 
getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), 
"1").collect();
-    writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), 
writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
+    // 2nd write of the same records, all records should be mapped to the same 
bucket files for MOR,
+    // for COW with disabled Spark native row writer, 2nd bulk insert should 
fail with exception
+    try {
+      List<WriteStatus> writeStatuses2 = 
getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), 
"1").collect();
+      writeStatuses2.forEach(ws -> assertEquals(ws.getTotalRecords(), 
writeStatusesMap.get(ws.getFileId()).getTotalRecords()));
+    } catch (Exception ex) {
+      assertEquals("COPY_ON_WRITE", tableType);
+      Throwable rootExceptionCause = getRootCause(ex);
+      assertInstanceOf(HoodieNotSupportedException.class, rootExceptionCause);
+      assertLinesMatch(Collections.singletonList("Multiple bulk 
insert.*COW.*Spark native row writer.*not supported.*"),
+          Collections.singletonList(rootExceptionCause.getMessage()));
+    }
   }
 
   private static final List<Object> TABLE_TYPES = Arrays.asList(
@@ -121,10 +136,6 @@ public class TestRDDSimpleBucketBulkInsertPartitioner 
extends HoodieSparkClientT
       "MERGE_ON_READ"
   );
 
-  private static Iterable<Object> tableTypes() {
-    return TABLE_TYPES;
-  }
-
   // table type, partitionSort
   private static Iterable<Object[]> configParams() {
     List<Object[]> opts = new ArrayList<>();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b30a2adb353..77870670268 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -43,6 +43,7 @@ import java.io.File
 import java.util.TimeZone
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
+import scala.util.matching.Regex
 
 class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -230,6 +231,20 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  protected def checkExceptionMatch(sql: String)(errorMsgRegex: String): Unit 
= {
+    var hasException = false
+    try {
+      spark.sql(sql)
+    } catch {
+      case e: Throwable if getRootCause(e).getMessage.matches(errorMsgRegex) =>
+        hasException = true
+
+      case f: Throwable =>
+        fail("Exception should match pattern: " + errorMsgRegex + ", error 
message: " + getRootCause(f).getMessage, f)
+    }
+    assertResult(true)(hasException)
+  }
+
   def dropTypeLiteralPrefix(value: Any): Any = {
     value match {
       case s: String =>
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 693bdeb7360..7814d75a05e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1665,47 +1665,105 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
               Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
             )
 
-            spark.sql(
-              s"""
-                 | insert into $tableName values
-                 | (1, 'a1', 10, 1000, "2021-01-05"),
-                 | (3, "a3", 30, 3000, "2021-01-07")
-               """.stripMargin)
+            // for COW with disabled Spark native row writer, multiple bulk 
inserts are restricted
+            if (tableType != "cow" && bulkInsertAsRow != "false") {
+              spark.sql(
+                s"""
+                   | insert into $tableName values
+                   | (1, 'a1', 10, 1000, "2021-01-05"),
+                   | (3, "a3", 30, 3000, "2021-01-07")
+                 """.stripMargin)
 
-            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-              Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-              Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
-              Seq(3, "a3", 30.0, 3000, "2021-01-07")
-            )
+              checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+                Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+                Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+                Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+                Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+                Seq(3, "a3", 30.0, 3000, "2021-01-07")
+              )
 
-            // there are two files in partition(dt = '2021-01-05')
-            checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
-              Seq(2)
-            )
+              // there are two files in partition(dt = '2021-01-05')
+              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
+                Seq(2)
+              )
 
-            // would generate 6 other files in partition(dt = '2021-01-05')
-            spark.sql(
-              s"""
-                 | insert into $tableName values
-                 | (4, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (5, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (6, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (7, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (8, 'a1,1', 10, 1000, "2021-01-05"),
-                 | (10, 'a3,3', 30, 3000, "2021-01-05")
-               """.stripMargin)
-
-            checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
-              Seq(8)
-            )
+              // would generate 6 other files in partition(dt = '2021-01-05')
+              spark.sql(
+                s"""
+                   | insert into $tableName values
+                   | (4, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (5, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (6, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (7, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (8, 'a1,1', 10, 1000, "2021-01-05"),
+                   | (10, 'a3,3', 30, 3000, "2021-01-05")
+                 """.stripMargin)
+
+              checkAnswer(s"select count(distinct _hoodie_file_name) from 
$tableName where dt = '2021-01-05'")(
+                Seq(8)
+              )
+            }
           }
         }
       }
     }
   }
 
+  test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET 
and disabled Spark native row writer") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id long,
+             |  name string,
+             |  ts int,
+             |  par string
+             |) using hudi
+             | tblproperties (
+             | primaryKey = 'id,name',
+             | type = 'cow',
+             | preCombineField = 'ts',
+             | hoodie.index.type = 'BUCKET',
+             | hoodie.index.bucket.engine = 'SIMPLE',
+             | hoodie.bucket.index.num.buckets = '4',
+             | hoodie.bucket.index.hash.field = 'id,name',
+             | hoodie.datasource.write.row.writer.enable = 'false')
+             | partitioned by (par)
+             | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+
+        // Used rows with corresponding `bucketId`s if there are 4 buckets
+        //   `id,name`    `bucketId`
+        //    5,'a1,1' ->    1
+        //    6,'a6,6' ->    2
+        //    9,'a3,3' ->    1
+        // 13,'a13,13' ->    2
+        //     24,'cd' ->    0
+
+        // buckets 1 & 2 into partition 'main', bucket 1 into partition 'side'
+        spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 
'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')")
+        // bucket 1 into 'main', bucket 2 into 'side', the whole insert will 
fail because bucket 1 already exists in 'main'
+        val causeRegex = "Multiple bulk insert.*COW.*Spark native row 
writer.*not supported.*"
+        checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 
'main'), (13, 'a13,13', 1, 'side')")(causeRegex)
+        checkAnswer(spark.sql(s"select id from $tableName order by 
id").collect())(Seq(5), Seq(6), Seq(9))
+
+        // bucket 0 into 'main', no bucket into 'side', will also fail,
+        // bulk insert into a separate, not yet present bucket is also restricted if some 
other buckets are already written
+        checkExceptionMatch(s"insert into $tableName values (24, 'cd', 1, 
'main')")(causeRegex)
+        checkAnswer(spark.sql(s"select id from $tableName where par = 'main' 
order by id").collect())(Seq(5), Seq(6))
+
+        // for overwrite mode it's allowed to do multiple bulk inserts
+        spark.sql(s"insert overwrite $tableName values (9, 'a3,3', 3, 'main'), 
(13, 'a13,13', 2, 'side')")
+        // only data from the latest insert overwrite is available,
+        // because insert overwrite drops the whole table due to [HUDI-4704]
+        checkAnswer(spark.sql(s"select id from $tableName order by 
id").collect())(Seq(9), Seq(13))
+      }
+    }
+  }
+
   /**
    * This test is to make sure that bulk insert doesn't create a bunch of tiny 
files if
    * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start 
with the partition columns

Reply via email to