danny0405 commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r569411072



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner 
partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, 
recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + 
" for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" 
+ partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String 
instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner 
partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
+  protected Iterator<List<WriteStatus>> handleInsertPartition(

Review comment:
       Yes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to