This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b619f05b0a [HUDI-8370] Removed excessive `DataBucket::preWrite` (#12104)
4b619f05b0a is described below

commit 4b619f05b0aa4d95f3e70fffdbba0c2368cdf5f1
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Oct 21 19:07:26 2024 +0700

    [HUDI-8370] Removed excessive `DataBucket::preWrite` (#12104)
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 47 +++++-----------------
 .../ConsistentBucketStreamWriteFunction.java       |  5 +--
 2 files changed, 13 insertions(+), 39 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 57fb24a413c..48dd225ecb9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -19,11 +19,8 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -218,33 +215,18 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
   protected static class DataBucket {
     private final List<HoodieRecord> records;
     private final BufferSizeDetector detector;
-    private final String partitionPath;
-    private final String fileID;
 
-    private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
+    private DataBucket(Double batchSize) {
       this.records = new ArrayList<>();
       this.detector = new BufferSizeDetector(batchSize);
-      this.partitionPath = hoodieRecord.getPartitionPath();
-      this.fileID = hoodieRecord.getCurrentLocation().getFileId();
     }
 
     public List<HoodieRecord> getRecords() {
       return records;
     }
 
-    /**
-     * Sets up before flush: patch up the first record with correct partition 
path and fileID.
-     *
-     * <p>Note: the method may modify the given records {@code records}.
-     */
-    public void preWrite(List<HoodieRecord> records) {
-      // rewrite the first record with expected fileID
-      HoodieRecord<?> first = records.get(0);
-      HoodieRecord<?> record = new HoodieAvroRecord<>(first.getKey(), 
(HoodieRecordPayload) first.getData(), first.getOperation());
-      HoodieRecordLocation newLoc = new 
HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
-      record.setCurrentLocation(newLoc);
-
-      records.set(0, record);
+    public boolean isEmpty() {
+      return records.isEmpty();
     }
 
     public void reset() {
@@ -351,7 +333,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new 
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        k -> new 
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
     bucket.records.add(value);
 
     boolean flushBucket = bucket.detector.detect(value);
@@ -392,11 +374,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
       return false;
     }
 
-    List<HoodieRecord> records = bucket.getRecords();
-    ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has 
no buffering records");
-    records = deduplicateRecordsIfNeeded(records);
-    final List<WriteStatus> writeStatus = writeBucket(instant, bucket, 
records);
-    records.clear();
+    ValidationUtils.checkState(!bucket.isEmpty(), "Data bucket to flush has no 
buffering records");
+    final List<WriteStatus> writeStatus = writeRecords(instant, 
bucket.getRecords());
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(instant) // the write instant may shift but the event 
still use the currentInstant.
@@ -425,11 +404,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
           // The records are partitioned by the bucket ID and each batch sent 
to
           // the writer belongs to one bucket.
           .forEach(bucket -> {
-            List<HoodieRecord> records = bucket.getRecords();
-            if (!records.isEmpty()) {
-              records = deduplicateRecordsIfNeeded(records);
-              writeStatus.addAll(writeBucket(currentInstant, bucket, records));
-              records.clear();
+            if (!bucket.isEmpty()) {
+              writeStatus.addAll(writeRecords(currentInstant, 
bucket.getRecords()));
               bucket.reset();
             }
           });
@@ -463,16 +439,15 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     writeMetrics.registerMetrics();
   }
 
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, 
List<HoodieRecord> records) {
-    bucket.preWrite(records);
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> 
records) {
     writeMetrics.startFileFlush();
-    List<WriteStatus> statuses = writeFunction.apply(records, instant);
+    List<WriteStatus> statuses = 
writeFunction.apply(deduplicateRecordsIfNeeded(records), instant);
     writeMetrics.endFileFlush();
     writeMetrics.increaseNumOfFilesWritten();
     return statuses;
   }
 
-  private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> 
records) {
+  protected List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord> 
records) {
     if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       return FlinkWriteHelper.newInstance()
           .deduplicateRecords(records, null, -1, 
this.writeClient.getConfig().getSchema(), 
this.writeClient.getConfig().getProps(), recordMerger);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
index 9372d70b68e..aa5405e2aea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -71,11 +71,10 @@ public class ConsistentBucketStreamWriteFunction<I> extends StreamWriteFunction<
   }
 
   @Override
-  protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, 
List<HoodieRecord> records) {
+  protected List<WriteStatus> writeRecords(String instant, List<HoodieRecord> 
records) {
     updateStrategy.initialize(this.writeClient);
-    bucket.preWrite(records);
     Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> 
recordListFgPair =
-        updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records, 
instant)));
+        
updateStrategy.handleUpdate(Collections.singletonList(Pair.of(deduplicateRecordsIfNeeded(records),
 instant)));
     return recordListFgPair.getKey().stream().flatMap(
         recordsInstantPair -> 
writeFunction.apply(recordsInstantPair.getLeft(), 
recordsInstantPair.getRight()).stream()
     ).collect(Collectors.toList());

Reply via email to