geserdugarov commented on code in PR #12104:
URL: https://github.com/apache/hudi/pull/12104#discussion_r1802380438


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -463,16 +439,15 @@ private void registerMetrics() {
     writeMetrics.registerMetrics();
   }
 
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
-    bucket.preWrite(records);
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> records) {
     writeMetrics.startFileFlush();
-    List<WriteStatus> statuses = writeFunction.apply(records, instant);
+    List<WriteStatus> statuses = writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
Review Comment:
   It's because all records in the bucket will have an identical location, which follows from how we decide which `bucketID` to use when buffering each record.
   
   For each incoming record in `StreamWriteFunction` we call `bufferRecord(value)`, and the first step in `bufferRecord(value)` is to call `getBucketID(value)`:
   
https://github.com/apache/hudi/blob/47e5dddf7e64f79728398233314faf72c02e5283/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java#L333-L336
   which means the bucket ID is derived from the record's current location, so every record in the same bucket (with the same `bucketID`) has the same `fileId` and `partitionPath`.
   
   As a result, all records in the bucket share the same current location, so we don't need to rewrite it for the first record in the bucket before flushing, even after deduplication.
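   The invariant described above can be illustrated with a minimal, self-contained sketch. Note that `Location`, `Record`, and `getBucketId` here are hypothetical simplified stand-ins for Hudi's actual types, not the real `StreamWriteFunction` code: the bucket key is built from the record's `partitionPath` and `fileId`, so any two records that land in the same bucket necessarily carry the same current location.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class BucketSketch {
     // Hypothetical stand-in for a record's current location.
     record Location(String partitionPath, String fileId) {}
     // Hypothetical stand-in for a HoodieRecord.
     record Record(Location location, String key) {}

     // Mirrors the idea of getBucketID(value): key = partitionPath + fileId.
     static String getBucketId(Record r) {
       return r.location().partitionPath() + "_" + r.location().fileId();
     }

     public static void main(String[] args) {
       Map<String, List<Record>> buckets = new HashMap<>();
       List<Record> incoming = List.of(
           new Record(new Location("2024/01/01", "f-1"), "k1"),
           new Record(new Location("2024/01/01", "f-1"), "k2"),
           new Record(new Location("2024/01/02", "f-2"), "k3"));
       for (Record r : incoming) {
         buckets.computeIfAbsent(getBucketId(r), k -> new ArrayList<>()).add(r);
       }
       // Because the bucket key is derived from the location, every record
       // in one bucket shares the same location; there is nothing to
       // rewrite before flush, even after deduplication.
       for (List<Record> bucket : buckets.values()) {
         Location first = bucket.get(0).location();
         for (Record r : bucket) {
           assert r.location().equals(first);
         }
       }
       System.out.println(buckets.size());
     }
   }
   ```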
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
