This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 33a02987b7f [HUDI-8301] Removed `DataItem` for Flink stream writing 
(#12054)
33a02987b7f is described below

commit 33a02987b7fc385253dc0e0efbf112066b8cf190
Author: Geser Dugarov <[email protected]>
AuthorDate: Tue Oct 8 08:37:54 2024 +0700

    [HUDI-8301] Removed `DataItem` for Flink stream writing (#12054)
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 74 ++++------------------
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 31 +++++----
 2 files changed, 30 insertions(+), 75 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index be6e901f171..57fb24a413c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -20,8 +20,6 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -57,7 +55,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 /**
  * Sink function to write the data to the underneath filesystem.
@@ -175,7 +172,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
   public Map<String, List<HoodieRecord>> getDataBuffer() {
     Map<String, List<HoodieRecord>> ret = new HashMap<>();
     for (Map.Entry<String, DataBucket> entry : buckets.entrySet()) {
-      ret.put(entry.getKey(), entry.getValue().writeBuffer());
+      ret.put(entry.getKey(), entry.getValue().getRecords());
     }
     return ret;
   }
@@ -215,48 +212,11 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     LOG.info("init hoodie merge with class [{}]", 
recordMerger.getClass().getName());
   }
 
-  /**
-   * Represents a data item in the buffer, this is needed to reduce the
-   * memory footprint.
-   *
-   * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem}
-   * for buffering, it then transforms back to the {@link HoodieRecord} before 
flushing.
-   */
-  private static class DataItem {
-    private final String key; // record key
-    private final String instant; // 'U' or 'I'
-    private final HoodieRecordPayload<?> data; // record payload
-    private final HoodieOperation operation; // operation
-
-    private DataItem(String key, String instant, HoodieRecordPayload<?> data, 
HoodieOperation operation) {
-      this.key = key;
-      this.instant = instant;
-      this.data = data;
-      this.operation = operation;
-    }
-
-    public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
-      return new DataItem(
-          record.getRecordKey(),
-          record.getCurrentLocation().getInstantTime(),
-          ((HoodieAvroRecord) record).getData(),
-          record.getOperation());
-    }
-
-    public HoodieRecord<?> toHoodieRecord(String partitionPath) {
-      HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
-      HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data, 
operation);
-      HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
-      record.setCurrentLocation(loc);
-      return record;
-    }
-  }
-
   /**
    * Data bucket.
    */
   protected static class DataBucket {
-    private final List<DataItem> records;
+    private final List<HoodieRecord> records;
     private final BufferSizeDetector detector;
     private final String partitionPath;
     private final String fileID;
@@ -268,14 +228,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       this.fileID = hoodieRecord.getCurrentLocation().getFileId();
     }
 
-    /**
-     * Prepare the write data buffer: patch up all the records with correct 
partition path.
-     */
-    public List<HoodieRecord> writeBuffer() {
-      // rewrite all the records with new record key
-      return records.stream()
-          .map(record -> record.toHoodieRecord(partitionPath))
-          .collect(Collectors.toList());
+    public List<HoodieRecord> getRecords() {
+      return records;
     }
 
     /**
@@ -398,11 +352,9 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
         k -> new 
DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
-    final DataItem item = DataItem.fromHoodieRecord(value);
-
-    bucket.records.add(item);
+    bucket.records.add(value);
 
-    boolean flushBucket = bucket.detector.detect(item);
+    boolean flushBucket = bucket.detector.detect(value);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     // update buffer metrics after tracing buffer size
     writeMetrics.setWriteBufferedSize(this.tracer.bufferSize);
@@ -426,8 +378,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
   }
 
   private boolean hasData() {
-    return this.buckets.size() > 0
-        && this.buckets.values().stream().anyMatch(bucket -> 
bucket.records.size() > 0);
+    return !this.buckets.isEmpty()
+        && this.buckets.values().stream().anyMatch(bucket -> 
!bucket.records.isEmpty());
   }
 
   @SuppressWarnings("unchecked, rawtypes")
@@ -440,8 +392,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       return false;
     }
 
-    List<HoodieRecord> records = bucket.writeBuffer();
-    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has 
no buffering records");
+    List<HoodieRecord> records = bucket.getRecords();
+    ValidationUtils.checkState(!records.isEmpty(), "Data bucket to flush has 
no buffering records");
     records = deduplicateRecordsIfNeeded(records);
     final List<WriteStatus> writeStatus = writeBucket(instant, bucket, 
records);
     records.clear();
@@ -467,14 +419,14 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       throw new HoodieException("No inflight instant when flushing data!");
     }
     final List<WriteStatus> writeStatus;
-    if (buckets.size() > 0) {
+    if (!buckets.isEmpty()) {
       writeStatus = new ArrayList<>();
       this.buckets.values()
           // The records are partitioned by the bucket ID and each batch sent 
to
           // the writer belongs to one bucket.
           .forEach(bucket -> {
-            List<HoodieRecord> records = bucket.writeBuffer();
-            if (records.size() > 0) {
+            List<HoodieRecord> records = bucket.getRecords();
+            if (!records.isEmpty()) {
               records = deduplicateRecordsIfNeeded(records);
               writeStatus.addAll(writeBucket(currentInstant, bucket, records));
               records.clear();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index d80b0746965..8a1278db9ab 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -53,6 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
  */
 public class TestWriteCopyOnWrite extends TestWriteBase {
 
+  // to trigger buffer flush of 3 rows, each is 576 bytes for INSERT and 624 
bytes for UPSERT
+  private static final double BATCH_SIZE_MB = 0.0016;
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testCheckpoint(boolean allowEmptyCommit) throws Exception {
@@ -232,13 +235,12 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   @Test
   public void testInsertWithMiniBatches() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch 
size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
 
     Map<String, String> expected = getMiniBatchExpected();
 
     preparePipeline(conf)
-        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 
352 bytes.
-        // so 3 records expect to trigger a mini-batch write
+        // 3 records from 5 should trigger a mini-batch write
         .consume(TestData.DATA_SET_INSERT_DUPLICATES)
         .assertDataBuffer(1, 2)
         .checkpoint(1)
@@ -257,15 +259,14 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   @Test
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch 
size
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, getBatchSize());
     conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
 
     Map<String, String> expected = new HashMap<>();
     expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
 
     preparePipeline(conf)
-        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 
352 bytes.
-        // so 3 records expect to trigger a mini-batch write
+        // 3 records from 5 should trigger a mini-batch write
         .consume(TestData.DATA_SET_INSERT_SAME_KEY)
         .assertDataBuffer(1, 2)
         .checkpoint(1)
@@ -308,13 +309,12 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     // reset the config option
     conf.setString(FlinkOptions.OPERATION, "insert");
     conf.setBoolean(FlinkOptions.INSERT_CLUSTER, true);
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes 
buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0 + getBatchSize());
 
     TestWriteMergeOnRead.TestHarness.instance()
-        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 
352 bytes.
-        // so 3 records expect to trigger a mini-batch write
-        // flush the max size bucket once at a time.
         .preparePipeline(tempFile, conf)
+        // 3 records from 5 should trigger a mini-batch write
+        // flush the max size bucket once at a time
         .consume(TestData.DATA_SET_INSERT_SAME_KEY)
         .assertDataBuffer(1, 2)
         .checkpoint(1)
@@ -357,14 +357,13 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
   @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
-    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0008); // 839 bytes 
buffer size
+    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200 + getBatchSize());
 
     Map<String, String> expected = getMiniBatchExpected();
 
     preparePipeline(conf)
-        // record (operation: 'I') is 304 bytes and record (operation: 'U') is 
352 bytes.
-        // so 3 records expect to trigger a mini-batch write
-        // flush the max size bucket once at a time.
+        // 3 records from 5 should trigger a mini-batch write
+        // flush the max size bucket once at a time
         .consume(TestData.DATA_SET_INSERT_DUPLICATES)
         .assertDataBuffer(1, 2)
         .checkpoint(1)
@@ -418,6 +417,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .end();
   }
 
+  protected double getBatchSize() {
+    return BATCH_SIZE_MB;
+  }
+
   protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // the last 2 lines are merged

Reply via email to