danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569411072
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
}
@SuppressWarnings("unchecked")
- protected Iterator<List<WriteStatus>> handleUpsertPartition(String
instantTime, Integer partition, Iterator recordItr,
- Partitioner
partitioner) {
- UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
- BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
- BucketType btype = binfo.bucketType;
+ protected Iterator<List<WriteStatus>> handleUpsertPartition(
+ String instantTime,
+ String partitionPath,
+ String fileIdHint,
+ BucketType bucketType,
+ Iterator recordItr) {
try {
- if (btype.equals(BucketType.INSERT)) {
- return handleInsert(binfo.fileIdPrefix, recordItr);
- } else if (btype.equals(BucketType.UPDATE)) {
- return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix,
recordItr);
- } else {
- throw new HoodieUpsertException("Unknown bucketType " + btype + " for
partition :" + partition);
+ switch (bucketType) {
+ case INSERT:
+ return handleInsert(fileIdHint, recordItr);
+ case UPDATE:
+ return handleUpdate(partitionPath, fileIdHint, recordItr);
+ default:
+ throw new HoodieUpsertException("Unknown bucketType " + bucketType +
" for partition :" + partitionPath);
}
} catch (Throwable t) {
- String msg = "Error upserting bucketType " + btype + " for partition :"
+ partition;
+ String msg = "Error upserting bucketType " + bucketType + " for
partition :" + partitionPath;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
- protected Iterator<List<WriteStatus>> handleInsertPartition(String
instantTime, Integer partition, Iterator recordItr,
- Partitioner
partitioner) {
- return handleUpsertPartition(instantTime, partition, recordItr,
partitioner);
+ protected Iterator<List<WriteStatus>> handleInsertPartition(
Review comment:
Yes.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]