geserdugarov commented on code in PR #12245:
URL: https://github.com/apache/hudi/pull/12245#discussion_r1846437570
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BucketIndexBulkInsertPartitioner.java:
##########
@@ -59,12 +62,19 @@ public BucketIndexBulkInsertPartitioner(HoodieTable table, String sortString, bo
this.sortColumnNames = null;
}
this.preserveHoodieMetadata = preserveHoodieMetadata;
+    // Bulk insert into COW table with bucket index is allowed only once, otherwise AppendHandleFactory will produce MOR log files
+    this.isAppendAllowed = !table.getConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE);
}
@Override
public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
-    return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
-        Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    if (!doAppend.get(idx)) {
+      return Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
+    } else if (isAppendAllowed) {
+      return Option.of(new AppendHandleFactory());
+    } else {
+      throw new HoodieNotSupportedException("Bulk insert into COW table with bucket index is allowed only once, please use upsert operation instead");
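The control flow added in this hunk can be modeled in isolation as follows (a hedged sketch: the real factory classes are replaced with plain strings, and `doAppend`/`isAppendAllowed` are passed in as booleans rather than read from the partitioner's state):

```java
// Minimal model of the getWriteHandleFactory branch above.
// "single" stands for SingleFileHandleCreateFactory, "append" for
// AppendHandleFactory; on a COW table the append path is rejected.
public class HandleChoiceSketch {
  public static String chooseHandle(boolean doAppend, boolean isAppendAllowed) {
    if (!doAppend) {
      // First write into this bucket: create a single new base file.
      return "single";
    } else if (isAppendAllowed) {
      // MOR table: a subsequent bulk insert appends log files.
      return "append";
    } else {
      // COW table: a second bulk insert would produce MOR-style log files,
      // so it is not supported.
      throw new UnsupportedOperationException(
          "Bulk insert into COW table with bucket index is allowed only once");
    }
  }
}
```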
Review Comment:
Currently, we create some strange data structures that I don't understand.
For instance, into `fileIdPfxList` we first place file IDs that already exist:
https://github.com/apache/hudi/blob/b31c858556ce783694bfec75dc38d51e58ced993/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java#L88-L92
and then append ones that do not exist yet at the end:
https://github.com/apache/hudi/blob/b31c858556ce783694bfec75dc38d51e58ced993/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSimpleBucketBulkInsertPartitioner.java#L97-L102
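As far as I can tell, the construction can be summarized like this (a hedged sketch with made-up prefixes; the real code iterates buckets per partition and generates prefixes via FSUtils helpers):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal model of how fileIdPfxList appears to be built: prefixes of files
// that already exist go in first (doAppend = true for them), then fresh
// prefixes are appended for buckets that have no file yet (doAppend = false).
public class FileIdPfxListSketch {
  public static List<String> buildFileIdPfxList(Map<Integer, String> existingByBucket,
                                                int numBuckets,
                                                List<Boolean> doAppend) {
    List<String> fileIdPfxList = new ArrayList<>();
    // 1) Buckets that already have a file: reuse the existing file ID prefix.
    for (String existing : existingByBucket.values()) {
      fileIdPfxList.add(existing);
      doAppend.add(true);
    }
    // 2) Buckets without a file yet: generate a new prefix at the end.
    for (int bucket = 0; bucket < numBuckets; bucket++) {
      if (!existingByBucket.containsKey(bucket)) {
        fileIdPfxList.add("new-pfx-" + bucket); // placeholder for a generated prefix
        doAppend.add(false);
      }
    }
    return fileIdPfxList;
  }
}
```

The consequence is that the position of a prefix in `fileIdPfxList` no longer matches its bucket number, which is exactly the confusion described below.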
And later, in `BucketIndexBulkInsertPartitioner`, I have to decide what to do with the current record based on this data:

Here I've inserted `record1` with `bucketID = 161`. At the debug point, I want to insert `record2` with `bucketID = 138`, but the incoming `int partitionId = 1` came from Spark.
I will try to figure out how it should work properly.
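For reference, my current understanding (a hedged sketch; the real hashing lives in Hudi's `BucketIdentifier` and uses the configured index key fields, while here a plain `hashCode` stands in for it): the bucket ID is derived from the record key modulo the bucket number, whereas the Spark `partitionId` is just the index of the RDD partition the record was shuffled into, so the two are related by a mapping rather than being equal.

```java
// Hedged sketch: bucket ID from the record key vs. the Spark partition index.
public class BucketIdSketch {
  public static int bucketId(String recordKey, int numBuckets) {
    // Non-negative hash of the key, modulo the configured bucket number.
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // A partitioner must map the bucket ID onto an RDD partition index; once
  // fileIdPfxList is reordered (existing prefixes first, new ones last),
  // bucket 138 will generally NOT land in Spark partition 138.
  public static int sparkPartition(int bucketId, int numPartitions) {
    return bucketId % numPartitions;
  }
}
```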
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]