This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 63e014a6c0f [HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and disabled Spark native row writer (#12245)
63e014a6c0f is described below
commit 63e014a6c0fd783818f380a6a84be58990abdecc
Author: Geser Dugarov <[email protected]>
AuthorDate: Wed Dec 18 09:43:19 2024 +0700
[HUDI-8394] Restrict multiple bulk inserts into COW with simple bucket and
disabled Spark native row writer (#12245)
---
.../table/BucketIndexBulkInsertPartitioner.java | 15 ++-
.../org/apache/hudi/keygen/TestKeyGenUtils.java | 1 -
.../TestRDDSimpleBucketBulkInsertPartitioner.java | 33 ++++--
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 15 +++
.../spark/sql/hudi/dml/TestInsertTable.scala | 122 +++++++++++++++------
5 files changed, 140 insertions(+), 46 deletions(-)
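For context, a minimal Spark SQL sketch (Scala) of the scenario this commit now rejects. It mirrors the new test added to TestInsertTable.scala; the SparkSession settings, table name, location, and values are illustrative assumptions, not part of this commit.

    import org.apache.spark.sql.SparkSession

    object BulkInsertRestrictionSketch {
      def main(args: Array[String]): Unit = {
        // Assumes the Hudi Spark bundle is on the classpath.
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("cow-bucket-bulk-insert-restriction")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
          .getOrCreate()

        // Route inserts through bulk_insert, as the new test does via withSQLConf.
        spark.sql("set hoodie.datasource.write.operation = bulk_insert")

        spark.sql(
          """create table cow_bucket_tbl (id long, name string, ts int, par string) using hudi
            | tblproperties (
            |  primaryKey = 'id,name',
            |  type = 'cow',
            |  preCombineField = 'ts',
            |  hoodie.index.type = 'BUCKET',
            |  hoodie.index.bucket.engine = 'SIMPLE',
            |  hoodie.bucket.index.num.buckets = '4',
            |  hoodie.bucket.index.hash.field = 'id,name',
            |  hoodie.datasource.write.row.writer.enable = 'false')
            | partitioned by (par)
            | location '/tmp/cow_bucket_tbl'""".stripMargin)

        // First bulk insert succeeds and creates the bucket base files.
        spark.sql("insert into cow_bucket_tbl values (5, 'a1', 1, 'main')")

        // A second bulk insert now fails with HoodieNotSupportedException instead of
        // appending MOR-style log files to a COW table.
        spark.sql("insert into cow_bucket_tbl values (9, 'a3', 2, 'main')")

        spark.stop()
      }
    }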
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
index df50877a410..6be3dc83da0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java
@@ -19,7 +19,9 @@
package org.apache.hudi.table;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
@@ -47,6 +49,7 @@ public abstract class BucketIndexBulkInsertPartitioner<T> implements BulkInsertP
protected final List<String> indexKeyFields;
protected final List<Boolean> doAppend = new ArrayList<>();
protected final List<String> fileIdPfxList = new ArrayList<>();
+ protected boolean isAppendAllowed;
public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {
@@ -59,12 +62,20 @@ public abstract class BucketIndexBulkInsertPartitioner<T> implements BulkInsertP
this.sortColumnNames = null;
}
this.preserveHoodieMetadata = preserveHoodieMetadata;
+ // Multiple bulk inserts into COW using `BucketIndexBulkInsertPartitioner` are restricted, otherwise AppendHandleFactory would produce MOR log files
+ this.isAppendAllowed = !table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
}
@Override
public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
- return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
- Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+ if (!doAppend.get(idx)) {
+ return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+ } else if (isAppendAllowed) {
+ return Option.of(new AppendHandleFactory());
+ } else {
+ throw new HoodieNotSupportedException("Multiple bulk inserts into COW with simple bucket and disabled Spark native row writer are not supported, "
+ + "please use upsert operation, overwrite mode (already written data will be lost), or turn on Spark native row writer.");
+ }
}
@Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
index 5cffef26e6e..e506751a1a0 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java
@@ -124,7 +124,6 @@ public class TestKeyGenUtils {
String[] s3 = KeyGenUtils.extractRecordKeys("id:1,id2:__null__,id3:__empty__");
Assertions.assertArrayEquals(new String[] {"1", null, ""}, s3);
- // keys with ':' are not supported
String[] s4 = KeyGenUtils.extractRecordKeys("id:ab:cd,id2:ef");
Assertions.assertArrayEquals(new String[] {"ab:cd", "ef"}, s4);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
index 0141d0d4cec..017847eff55 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
@@ -40,13 +41,17 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.exception.ExceptionUtil.getRootCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientTestHarness {
@@ -86,6 +91,7 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientT
HoodieJavaRDD<HoodieRecord> javaRDD = HoodieJavaRDD.of(records, context, 1);
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
+ // we call BulkInsertInternalPartitionerFactory.get() directly, which behaves as if the Spark native row writer is disabled
BulkInsertPartitioner partitioner = BulkInsertInternalPartitionerFactory.get(table, config);
JavaRDD<HoodieRecord> repartitionRecords = (JavaRDD<HoodieRecord>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(javaRDD), 1);
@@ -104,16 +110,25 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientT
}, false).collect();
}
- // first writes, it will create new bucket files based on the records
+ // 1st write, will create new bucket files based on the records
getHoodieWriteClient(config).startCommitWithTime("0");
- List<WriteStatus> writeStatues = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
- Map<String, WriteStatus> writeStatuesMap = new HashMap<>();
- writeStatues.forEach(ws -> writeStatuesMap.put(ws.getFileId(), ws));
+ List<WriteStatus> writeStatuses = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "0").collect();
+ Map<String, WriteStatus> writeStatusesMap = new HashMap<>();
+ writeStatuses.forEach(ws -> writeStatusesMap.put(ws.getFileId(), ws));
- // second writes the same records, all records should be mapped to the same bucket files
getHoodieWriteClient(config).startCommitWithTime("1");
- List<WriteStatus> writeStatues2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
- writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
+ // 2nd write of the same records, all records should be mapped to the same bucket files for MOR,
+ // for COW with disabled Spark native row writer, 2nd bulk insert should fail with exception
+ try {
+ List<WriteStatus> writeStatuses2 = getHoodieWriteClient(config).bulkInsert(HoodieJavaRDD.getJavaRDD(javaRDD), "1").collect();
+ writeStatuses2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatusesMap.get(ws.getFileId()).getTotalRecords()));
+ } catch (Exception ex) {
+ assertEquals("COPY_ON_WRITE", tableType);
+ Throwable rootExceptionCause = getRootCause(ex);
+ assertInstanceOf(HoodieNotSupportedException.class, rootExceptionCause);
+ assertLinesMatch(Collections.singletonList("Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"),
+ Collections.singletonList(rootExceptionCause.getMessage()));
+ }
}
private static final List<Object> TABLE_TYPES = Arrays.asList(
@@ -121,10 +136,6 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieSparkClientT
"MERGE_ON_READ"
);
- private static Iterable<Object> tableTypes() {
- return TABLE_TYPES;
- }
-
// table type, partitionSort
private static Iterable<Object[]> configParams() {
List<Object[]> opts = new ArrayList<>();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b30a2adb353..77870670268 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -43,6 +43,7 @@ import java.io.File
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
+import scala.util.matching.Regex
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -230,6 +231,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
assertResult(true)(hasException)
}
+ protected def checkExceptionMatch(sql: String)(errorMsgRegex: String): Unit = {
+ var hasException = false
+ try {
+ spark.sql(sql)
+ } catch {
+ case e: Throwable if getRootCause(e).getMessage.matches(errorMsgRegex) =>
+ hasException = true
+
+ case f: Throwable =>
+ fail("Exception should match pattern: " + errorMsgRegex + ", error message: " + getRootCause(f).getMessage, f)
+ }
+ assertResult(true)(hasException)
+ }
+
def dropTypeLiteralPrefix(value: Any): Any = {
value match {
case s: String =>
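As an aside, a hedged usage sketch of the new checkExceptionMatch helper (the suite class and table name below are hypothetical; the regex is the one used by the new test in TestInsertTable.scala):

    class BulkInsertRestrictionExampleSuite extends HoodieSparkSqlTestBase {
      test("second bulk insert into a COW simple-bucket table is rejected") {
        // checkExceptionMatch runs the SQL, takes the root cause of whatever is thrown,
        // and asserts that its message matches the given regex in full.
        val causeRegex = "Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"
        checkExceptionMatch("insert into hypothetical_cow_bucket_tbl values (24, 'cd', 1, 'main')")(causeRegex)
      }
    }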
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 693bdeb7360..7814d75a05e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -1665,47 +1665,105 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', 10, 1000, "2021-01-05"),
- | (3, "a3", 30, 3000, "2021-01-07")
- """.stripMargin)
+ // for COW with disabled Spark native row writer, multiple bulk inserts are restricted
+ if (tableType != "cow" && bulkInsertAsRow != "false") {
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (3, "a3", 30, 3000, "2021-01-07")
+ """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
- // there are two files in partition(dt = '2021-01-05')
- checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
- Seq(2)
- )
+ // there are two files in partition(dt = '2021-01-05')
+ checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+ Seq(2)
+ )
- // would generate 6 other files in partition(dt = '2021-01-05')
- spark.sql(
- s"""
- | insert into $tableName values
- | (4, 'a1,1', 10, 1000, "2021-01-05"),
- | (5, 'a1,1', 10, 1000, "2021-01-05"),
- | (6, 'a1,1', 10, 1000, "2021-01-05"),
- | (7, 'a1,1', 10, 1000, "2021-01-05"),
- | (8, 'a1,1', 10, 1000, "2021-01-05"),
- | (10, 'a3,3', 30, 3000, "2021-01-05")
- """.stripMargin)
-
- checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
- Seq(8)
- )
+ // would generate 6 other files in partition(dt = '2021-01-05')
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (4, 'a1,1', 10, 1000, "2021-01-05"),
+ | (5, 'a1,1', 10, 1000, "2021-01-05"),
+ | (6, 'a1,1', 10, 1000, "2021-01-05"),
+ | (7, 'a1,1', 10, 1000, "2021-01-05"),
+ | (8, 'a1,1', 10, 1000, "2021-01-05"),
+ | (10, 'a3,3', 30, 3000, "2021-01-05")
+ """.stripMargin)
+
+ checkAnswer(s"select count(distinct _hoodie_file_name) from $tableName where dt = '2021-01-05'")(
+ Seq(8)
+ )
+ }
}
}
}
}
}
+ test("Test not supported multiple BULK INSERTs into COW with SIMPLE BUCKET
and disabled Spark native row writer") {
+ withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id long,
+ | name string,
+ | ts int,
+ | par string
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.index.bucket.engine = 'SIMPLE',
+ | hoodie.bucket.index.num.buckets = '4',
+ | hoodie.bucket.index.hash.field = 'id,name',
+ | hoodie.datasource.write.row.writer.enable = 'false')
+ | partitioned by (par)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ // Used rows with corresponding `bucketId`s if there are 4 buckets
+ // `id,name` `bucketId`
+ // 5,'a1,1' -> 1
+ // 6,'a6,6' -> 2
+ // 9,'a3,3' -> 1
+ // 13,'a13,13' -> 2
+ // 24,'cd' -> 0
+
+ // buckets 1 & 2 into partition 'main', bucket 1 into partition 'side'
+ spark.sql(s"insert into $tableName values (5, 'a1,1', 1, 'main'), (6, 'a6,6', 1, 'main'), (9, 'a3,3', 1, 'side')")
+ // bucket 1 into 'main', bucket 2 into 'side', the whole insert will fail due to the existing bucket 1 in 'main'
+ val causeRegex = "Multiple bulk insert.*COW.*Spark native row writer.*not supported.*"
+ checkExceptionMatch(s"insert into $tableName values (9, 'a3,3', 2, 'main'), (13, 'a13,13', 1, 'side')")(causeRegex)
+ checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(5), Seq(6), Seq(9))
+
+ // bucket 0 into 'main', no bucket into 'side', will also fail:
+ // bulk insert into a separate, not yet written bucket is also restricted if some other buckets have already been written
+ checkExceptionMatch(s"insert into $tableName values (24, 'cd', 1, 'main')")(causeRegex)
+ checkAnswer(spark.sql(s"select id from $tableName where par = 'main' order by id").collect())(Seq(5), Seq(6))
+
+ // for overwrite mode it's allowed to do multiple bulk inserts
+ spark.sql(s"insert overwrite $tableName values (9, 'a3,3', 3, 'main'), (13, 'a13,13', 2, 'side')")
+ // only data from the latest insert overwrite is available,
+ // because insert overwrite drops the whole table due to [HUDI-4704]
+ checkAnswer(spark.sql(s"select id from $tableName order by id").collect())(Seq(9), Seq(13))
+ }
+ }
+ }
+
/**
* This test is to make sure that bulk insert doesn't create a bunch of tiny files if
* hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns