This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1593f90c9b1 feat: double buffer based async write in AppendWriteFunctionWithBufferSort (#13892)
a1593f90c9b1 is described below

commit a1593f90c9b11f910bb21184909ed73b55178b9f
Author: Peter Huang <[email protected]>
AuthorDate: Mon Jan 12 12:07:49 2026 -0800

    feat: double buffer based async write in AppendWriteFunctionWithBufferSort (#13892)
---
 .../append/AppendWriteFunctionWithBufferSort.java  | 113 +++++++++++++++++---
 .../ITTestAppendWriteFunctionWithBufferSort.java   | 115 ++++++++++++++++++++-
 2 files changed, 213 insertions(+), 15 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
index 6980a9d5b4f7..ed5cd197bb07 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
@@ -39,11 +39,18 @@ import 
org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -58,8 +65,14 @@ import java.util.stream.Collectors;
  */
 @Slf4j
 public class AppendWriteFunctionWithBufferSort<T> extends 
AppendWriteFunction<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class);
+
   private final long writeBufferSize;
-  private transient BinaryInMemorySortBuffer buffer;
+  private transient BinaryInMemorySortBuffer activeBuffer;
+  private transient BinaryInMemorySortBuffer backgroundBuffer;
+  private transient ExecutorService asyncWriteExecutor;
+  private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
+  private transient AtomicBoolean isBackgroundBufferBeingProcessed;
 
   public AppendWriteFunctionWithBufferSort(Configuration config, RowType 
rowType) {
     super(config, rowType);
@@ -79,38 +92,52 @@ public class AppendWriteFunctionWithBufferSort<T> extends 
AppendWriteFunction<T>
     GeneratedNormalizedKeyComputer keyComputer = 
codeGenerator.generateNormalizedKeyComputer("SortComputer");
     GeneratedRecordComparator recordComparator = 
codeGenerator.generateRecordComparator("SortComparator");
     MemorySegmentPool memorySegmentPool = 
MemorySegmentPoolFactory.createMemorySegmentPool(config);
-    this.buffer = BufferUtils.createBuffer(rowType,
+
+    this.activeBuffer = BufferUtils.createBuffer(rowType,
             memorySegmentPool,
             
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
             
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
-    log.info("{} is initialized successfully.", getClass().getSimpleName());
+    this.backgroundBuffer = BufferUtils.createBuffer(rowType,
+            memorySegmentPool,
+            
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
+            
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+
+    this.asyncWriteExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "async-write-thread");
+      t.setDaemon(true);
+      return t;
+    });
+    this.asyncWriteTask = new AtomicReference<>(null);
+    this.isBackgroundBufferBeingProcessed = new AtomicBoolean(false);
+
+    LOG.info("{} is initialized successfully with double buffer.", 
getClass().getSimpleName());
   }
 
   @Override
   public void processElement(T value, Context ctx, Collector<RowData> out) 
throws Exception {
     RowData data = (RowData) value;
 
-    // 1.try to write data into memory pool
-    boolean success = buffer.write(data);
+    // try to write data into active buffer
+    boolean success = activeBuffer.write(data);
     if (!success) {
-      // 2. flushes the bucket if the memory pool is full
-      sortAndSend();
-      // 3. write the row again
-      success = buffer.write(data);
+      swapAndFlushAsync();
+      // write the row again to the new active buffer
+      success = activeBuffer.write(data);
       if (!success) {
         throw new HoodieException("Buffer is too small to hold a single 
record.");
       }
     }
 
-    if (buffer.size() >= writeBufferSize) {
-      sortAndSend();
+    if (activeBuffer.size() >= writeBufferSize) {
+      swapAndFlushAsync();
     }
   }
 
   @Override
   public void snapshotState() {
     try {
-      sortAndSend();
+      waitForAsyncWriteCompletion();
+      sortAndSend(activeBuffer);
     } catch (IOException e) {
       throw new HoodieIOException("Fail to sort and flush data in buffer 
during snapshot state.", e);
     }
@@ -120,20 +147,67 @@ public class AppendWriteFunctionWithBufferSort<T> extends 
AppendWriteFunction<T>
   @Override
   public void endInput() {
     try {
-      sortAndSend();
+      waitForAsyncWriteCompletion();
+      sortAndSend(activeBuffer);
     } catch (IOException e) {
       throw new HoodieIOException("Fail to sort and flush data in buffer 
during endInput.", e);
     }
     super.endInput();
   }
 
+  /**
+   * Swaps the active and background buffers and triggers async flush of the 
background buffer.
+   */
+  private void swapAndFlushAsync() throws IOException {
+    waitForAsyncWriteCompletion();
+
+    // Swap buffers
+    BinaryInMemorySortBuffer temp = activeBuffer;
+    activeBuffer = backgroundBuffer;
+    backgroundBuffer = temp;
+
+    // Start async processing of the background buffer
+    if (!backgroundBuffer.isEmpty()) {
+      isBackgroundBufferBeingProcessed.set(true);
+      CompletableFuture<Void> newTask = CompletableFuture.runAsync(() -> {
+        try {
+          sortAndSend(backgroundBuffer);
+        } catch (IOException e) {
+          LOG.error("Error during async write", e);
+          throw new RuntimeException(e);
+        } finally {
+          isBackgroundBufferBeingProcessed.set(false);
+        }
+      }, asyncWriteExecutor);
+      asyncWriteTask.set(newTask);
+    }
+  }
+
+  /**
+   * Waits for any ongoing async write operation to complete.
+   */
+  private void waitForAsyncWriteCompletion() {
+    try {
+      if (isBackgroundBufferBeingProcessed.get()) {
+        CompletableFuture<Void> currentTask = asyncWriteTask.get();
+        if (currentTask != null) {
+          currentTask.join();
+          asyncWriteTask.set(null);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error waiting for async write completion", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * For append writing, the flushing can be triggered with the following 
conditions:
    * 1. Checkpoint trigger, in which the current remaining data in buffer are 
flushed and committed.
    * 2. Binary buffer is full.
    * 3. `endInput` is called for pipelines with a bounded source.
    */
-  private void sortAndSend() throws IOException {
+  private void sortAndSend(BinaryInMemorySortBuffer buffer) throws IOException 
{
     if (buffer.isEmpty()) {
       return;
     }
@@ -153,4 +227,15 @@ public class AppendWriteFunctionWithBufferSort<T> extends 
AppendWriteFunction<T>
   private static void sort(BinaryInMemorySortBuffer dataBuffer) {
     new QuickSort().sort(dataBuffer);
   }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
+        asyncWriteExecutor.shutdown();
+      }
+    } finally {
+      super.close();
+    }
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 38ad5487762a..24ba04f21849 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -48,6 +49,7 @@ import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link AppendWriteFunctionWithBufferSort}.
@@ -125,7 +127,7 @@ public class ITTestAppendWriteFunctionWithBufferSort 
extends TestWriteBase {
     // enlarge the wirte buffer record size
     this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L);
     // use a very small buffer memory size here
-    this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.1D);
+    this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 400.1D);
 
     // Create test data that exceeds buffer size
     List<RowData> inputData = new ArrayList<>();
@@ -175,6 +177,117 @@ public class ITTestAppendWriteFunctionWithBufferSort 
extends TestWriteBase {
     assertArrayEquals(expected.toArray(), filteredResult.toArray());
   }
 
+  @Test
+  public void testMultipleCheckpoints() throws Exception {
+    List<RowData> batch1 = Arrays.asList(
+        createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
+    );
+
+    List<RowData> batch2 = Arrays.asList(
+        createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"),
+        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestHarness testHarness = TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf);
+
+    testHarness.consume(batch1).checkpoint(1);
+    testHarness.consume(batch2).checkpoint(2);
+    testHarness.endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(4, actualData.size());
+  }
+
+  @Test
+  public void testLargeDatasetWithMultipleFlushes() throws Exception {
+    this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 50L);
+
+    List<RowData> inputData = new ArrayList<>();
+    for (int i = 0; i < 500; i++) {
+      inputData.add(createRowData("uuid" + i, "Name" + (i % 10), i % 100, 
"1970-01-01 00:00:01.123", "p" + (i % 3)));
+    }
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 3);
+    assertEquals(500, actualData.size());
+  }
+
+  @Test
+  public void testSortStabilityWithDuplicateKeys() throws Exception {
+    List<RowData> inputData = Arrays.asList(
+        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
+        createRowData("uuid3", "Alice", 25, "1970-01-01 00:00:01.125", "p1"),
+        createRowData("uuid4", "Bob", 30, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(4, actualData.size());
+
+    List<String> filteredResult = actualData.stream()
+        .map(TestData::filterOutVariablesWithoutHudiMetadata)
+        .collect(Collectors.toList());
+
+    assertTrue(filteredResult.get(0).contains("Alice"));
+    assertTrue(filteredResult.get(1).contains("Alice"));
+    assertTrue(filteredResult.get(2).contains("Alice"));
+    assertTrue(filteredResult.get(3).contains("Bob"));
+  }
+
+  @Test
+  public void testDifferentPartitions() throws Exception {
+    List<RowData> inputData = Arrays.asList(
+        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Bob", 30, "1970-01-01 00:00:01.124", "p2"),
+        createRowData("uuid3", "Charlie", 35, "1970-01-01 00:00:01.125", "p3"),
+        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 3);
+    assertEquals(4, actualData.size());
+  }
+
+  @Test
+  public void testConcurrentWriteScenario() throws Exception {
+    this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L);
+
+    List<RowData> inputData = new ArrayList<>();
+    for (int i = 0; i < 200; i++) {
+      inputData.add(createRowData("uuid" + i, "Name" + (i % 5), i % 50, 
"1970-01-01 00:00:01.123", "p1"));
+    }
+
+    TestHarness testHarness = TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf);
+
+    for (int i = 0; i < inputData.size(); i += 10) {
+      List<RowData> batch = inputData.subList(i, Math.min(i + 10, 
inputData.size()));
+      testHarness.consume(batch);
+      if (i % 50 == 0) {
+        testHarness.checkpoint(i / 50 + 1);
+      }
+    }
+    testHarness.endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new 
File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(200, actualData.size());
+  }
+
   private GenericRowData createRowData(String uuid, String name, int age, 
String timestamp, String partition) {
     return GenericRowData.of(StringData.fromString(uuid), 
StringData.fromString(name),
         age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), 
StringData.fromString(partition));

Reply via email to