KnightChess commented on code in PR #12245:
URL: https://github.com/apache/hudi/pull/12245#discussion_r1840225139
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java:
##########
@@ -59,12 +62,19 @@ public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, bo
this.sortColumnNames = null;
}
this.preserveHoodieMetadata = preserveHoodieMetadata;
+    // Bulk insert into COW table with bucket index is allowed only once, otherwise AppendHandleFactory will produce MOR log files
+    this.isAppendAllowed = !table.getConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
}
@Override
public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
-    return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
-        Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    if (!doAppend.get(idx)) {
+      return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    } else if (isAppendAllowed) {
+      return Option.of(new AppendHandleFactory());
+    } else {
+      throw new HoodieNotSupportedException("Bulk insert into COW table with bucket index is allowed only once, please use upsert operation instead");
Review Comment:
Could a user still rewrite the table with bulk_insert more than once via insert overwrite? Also, I think users may legitimately need to bulk_insert into a table multiple times, since bulk_insert is more efficient than the other write operations.
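To make the reviewed guard concrete, here is a minimal self-contained sketch (not the real Hudi classes; `Partitioner`, `TableType`, and the returned factory names are simplified stand-ins) of the dispatch logic the diff introduces: a fresh bucket always gets a single-file create handle, while appending to an existing bucket is only allowed for MOR tables, because the append path would write log files that a COW table cannot have.

```java
import java.util.BitSet;

public class BucketGuardSketch {
  // Stand-in for HoodieTableType.
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  static final class Partitioner {
    private final BitSet doAppend;         // buckets whose file group already exists
    private final boolean isAppendAllowed; // false for COW tables, as in the PR

    Partitioner(TableType type, BitSet doAppend) {
      this.doAppend = doAppend;
      this.isAppendAllowed = type != TableType.COPY_ON_WRITE;
    }

    // Mirrors the shape of getWriteHandleFactory: returns the handle kind or throws.
    String writeHandleFactory(int idx) {
      if (!doAppend.get(idx)) {
        return "SingleFileHandleCreateFactory"; // fresh bucket: always allowed
      } else if (isAppendAllowed) {
        return "AppendHandleFactory";           // MOR: log-file append is fine
      } else {
        // COW: a second bulk_insert into an existing bucket is rejected
        throw new UnsupportedOperationException(
            "Bulk insert into COW table with bucket index is allowed only once");
      }
    }
  }

  public static void main(String[] args) {
    BitSet existing = new BitSet();
    existing.set(1); // bucket 1 already has a file group

    Partitioner cow = new Partitioner(TableType.COPY_ON_WRITE, existing);
    System.out.println(cow.writeHandleFactory(0)); // fresh bucket, allowed
    try {
      cow.writeHandleFactory(1);                   // existing bucket on COW
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }

    Partitioner mor = new Partitioner(TableType.MERGE_ON_READ, existing);
    System.out.println(mor.writeHandleFactory(1)); // MOR append, allowed
  }
}
```

The sketch shows why the reviewer's question matters: the guard is keyed only on table type and bucket occupancy, so any second bulk_insert into an occupied bucket of a COW table is rejected, regardless of whether the user intended an overwrite.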
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -1709,6 +1709,45 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
+ test("Test not supported double Bulk Insert Into Bucket Index COW Table") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.index.bucket.engine = 'SIMPLE',
+ | hoodie.bucket.index.num.buckets = '2',
+ | hoodie.bucket.index.hash.field = 'id,name',
+ | hoodie.datasource.write.row.writer.enable = 'false')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (5, 'a1,1', 10, 1000, "2021-01-05")
+ """.stripMargin)
+ checkExceptionContain(
+ s"""
+ | insert into $tableName values
Review Comment:
We should also add coverage for other operations, such as insert overwrite.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]