garyli1019 commented on a change in pull request #5093:
URL: https://github.com/apache/hudi/pull/5093#discussion_r832204078



##########
File path: 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
##########
@@ -108,25 +116,30 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
   @Override
   public void snapshotState() {
     super.snapshotState();
-    this.bucketIndex.putAll(this.incBucketIndex);
     this.incBucketIndex.clear();
   }
 
   @Override
   public void processElement(I i, ProcessFunction<I, Object>.Context context, 
Collector<Object> collector) throws Exception {
     HoodieRecord<?> record = (HoodieRecord<?>) i;
     final HoodieKey hoodieKey = record.getKey();
+    final String partition = hoodieKey.getPartitionPath();
     final HoodieRecordLocation location;
 
+    bootstrapIndexIfNeed(partition);
+    Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, 
indexKeyFields, this.bucketNum);
     final String partitionBucketId = 
BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
 
-    if (bucketIndex.containsKey(partitionBucketId)) {
-      location = new HoodieRecordLocation("U", 
bucketIndex.get(partitionBucketId));
+    if (incBucketIndex.contains(partitionBucketId)) {

Review comment:
       Nice catch, this fixes a bug.

##########
File path: 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
##########
@@ -66,16 +68,21 @@
   private String indexKeyFields;
 
   /**
-   * BucketID to file group mapping.
+   * BucketID should be load in this task.
    */
-  private HashMap<String, String> bucketIndex;
+  private Set<Integer> bucketToLoad;
+
+  /**
+   * BucketID to file group mapping in each partition.
+   */
+  private Map<String, Map<Integer, String>> bucketIndex;

Review comment:
       Please add a comment explaining what the keys and values of this map represent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to