This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1593f90c9b1 feat: double buffer based async write in AppendWriteFunctionWithBufferSort (#13892)
a1593f90c9b1 is described below
commit a1593f90c9b11f910bb21184909ed73b55178b9f
Author: Peter Huang <[email protected]>
AuthorDate: Mon Jan 12 12:07:49 2026 -0800
feat: double buffer based async write in AppendWriteFunctionWithBufferSort (#13892)
---
.../append/AppendWriteFunctionWithBufferSort.java | 113 +++++++++++++++++---
.../ITTestAppendWriteFunctionWithBufferSort.java | 115 ++++++++++++++++++++-
2 files changed, 213 insertions(+), 15 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
index 6980a9d5b4f7..ed5cd197bb07 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
@@ -39,11 +39,18 @@ import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
@@ -58,8 +65,14 @@ import java.util.stream.Collectors;
*/
@Slf4j
public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class);
+
private final long writeBufferSize;
- private transient BinaryInMemorySortBuffer buffer;
+ private transient BinaryInMemorySortBuffer activeBuffer;
+ private transient BinaryInMemorySortBuffer backgroundBuffer;
+ private transient ExecutorService asyncWriteExecutor;
+ private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
+ private transient AtomicBoolean isBackgroundBufferBeingProcessed;
public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) {
super(config, rowType);
@@ -79,38 +92,52 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
GeneratedNormalizedKeyComputer keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer");
GeneratedRecordComparator recordComparator = codeGenerator.generateRecordComparator("SortComparator");
MemorySegmentPool memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config);
- this.buffer = BufferUtils.createBuffer(rowType,
+
+ this.activeBuffer = BufferUtils.createBuffer(rowType,
memorySegmentPool,
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
- log.info("{} is initialized successfully.", getClass().getSimpleName());
+ this.backgroundBuffer = BufferUtils.createBuffer(rowType,
+ memorySegmentPool,
+ keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
+ recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+
+ this.asyncWriteExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "async-write-thread");
+ t.setDaemon(true);
+ return t;
+ });
+ this.asyncWriteTask = new AtomicReference<>(null);
+ this.isBackgroundBufferBeingProcessed = new AtomicBoolean(false);
+
+ LOG.info("{} is initialized successfully with double buffer.", getClass().getSimpleName());
}
@Override
public void processElement(T value, Context ctx, Collector<RowData> out) throws Exception {
RowData data = (RowData) value;
- // 1.try to write data into memory pool
- boolean success = buffer.write(data);
+ // try to write data into active buffer
+ boolean success = activeBuffer.write(data);
if (!success) {
- // 2. flushes the bucket if the memory pool is full
- sortAndSend();
- // 3. write the row again
- success = buffer.write(data);
+ swapAndFlushAsync();
+ // write the row again to the new active buffer
+ success = activeBuffer.write(data);
if (!success) {
throw new HoodieException("Buffer is too small to hold a single
record.");
}
}
- if (buffer.size() >= writeBufferSize) {
- sortAndSend();
+ if (activeBuffer.size() >= writeBufferSize) {
+ swapAndFlushAsync();
}
}
@Override
public void snapshotState() {
try {
- sortAndSend();
+ waitForAsyncWriteCompletion();
+ sortAndSend(activeBuffer);
} catch (IOException e) {
throw new HoodieIOException("Fail to sort and flush data in buffer
during snapshot state.", e);
}
@@ -120,20 +147,67 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
@Override
public void endInput() {
try {
- sortAndSend();
+ waitForAsyncWriteCompletion();
+ sortAndSend(activeBuffer);
} catch (IOException e) {
throw new HoodieIOException("Fail to sort and flush data in buffer
during endInput.", e);
}
super.endInput();
}
+ /**
+ * Swaps the active and background buffers and triggers async flush of the background buffer.
+ */
+ private void swapAndFlushAsync() throws IOException {
+ waitForAsyncWriteCompletion();
+
+ // Swap buffers
+ BinaryInMemorySortBuffer temp = activeBuffer;
+ activeBuffer = backgroundBuffer;
+ backgroundBuffer = temp;
+
+ // Start async processing of the background buffer
+ if (!backgroundBuffer.isEmpty()) {
+ isBackgroundBufferBeingProcessed.set(true);
+ CompletableFuture<Void> newTask = CompletableFuture.runAsync(() -> {
+ try {
+ sortAndSend(backgroundBuffer);
+ } catch (IOException e) {
+ LOG.error("Error during async write", e);
+ throw new RuntimeException(e);
+ } finally {
+ isBackgroundBufferBeingProcessed.set(false);
+ }
+ }, asyncWriteExecutor);
+ asyncWriteTask.set(newTask);
+ }
+ }
+
+ /**
+ * Waits for any ongoing async write operation to complete.
+ */
+ private void waitForAsyncWriteCompletion() {
+ try {
+ if (isBackgroundBufferBeingProcessed.get()) {
+ CompletableFuture<Void> currentTask = asyncWriteTask.get();
+ if (currentTask != null) {
+ currentTask.join();
+ asyncWriteTask.set(null);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error waiting for async write completion", e);
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* For append writing, the flushing can be triggered with the following conditions:
* 1. Checkpoint trigger, in which the current remaining data in buffer are flushed and committed.
* 2. Binary buffer is full.
* 3. `endInput` is called for pipelines with a bounded source.
*/
- private void sortAndSend() throws IOException {
+ private void sortAndSend(BinaryInMemorySortBuffer buffer) throws IOException {
if (buffer.isEmpty()) {
return;
}
@@ -153,4 +227,15 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
private static void sort(BinaryInMemorySortBuffer dataBuffer) {
new QuickSort().sort(dataBuffer);
}
+
+ @Override
+ public void close() throws Exception {
+ try {
+ if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
+ asyncWriteExecutor.shutdown();
+ }
+ } finally {
+ super.close();
+ }
+ }
}
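
[Editor's note] For readers skimming the patch, below is a minimal, self-contained sketch of the double-buffer hand-off this change introduces: writes land in an active buffer, a full buffer is swapped with a background buffer and flushed on a single daemon thread, and the checkpoint/endInput path waits for the in-flight flush before draining what remains. This is illustrative only and not part of the commit; the class name, List<String> buffers, and flush() below are stand-ins for BinaryInMemorySortBuffer and sortAndSend().

// Illustrative sketch only (not part of this commit).
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DoubleBufferSketch {
  private final int capacity;
  private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "async-flush-thread");
    t.setDaemon(true);
    return t;
  });
  private List<String> activeBuffer = new ArrayList<>();
  private List<String> backgroundBuffer = new ArrayList<>();
  private CompletableFuture<Void> pendingFlush = CompletableFuture.completedFuture(null);

  public DoubleBufferSketch(int capacity) {
    this.capacity = capacity;
  }

  // Writes go to the active buffer; a full buffer triggers a swap plus an async flush.
  public void write(String record) {
    if (activeBuffer.size() >= capacity) {
      swapAndFlushAsync();
    }
    activeBuffer.add(record);
  }

  // Wait for the previous flush so at most one is in flight, swap the buffers,
  // then drain the now-background buffer on the flush thread.
  private void swapAndFlushAsync() {
    pendingFlush.join();
    List<String> tmp = activeBuffer;
    activeBuffer = backgroundBuffer;
    backgroundBuffer = tmp;
    List<String> toFlush = backgroundBuffer;
    pendingFlush = CompletableFuture.runAsync(() -> flush(toFlush), flushExecutor);
  }

  // Checkpoint/endInput path: join the in-flight flush, then drain the active buffer synchronously.
  public void finishPendingWrites() {
    pendingFlush.join();
    flush(activeBuffer);
  }

  private void flush(List<String> buffer) {
    // Stand-in for sorting the buffer and handing the records to the writer.
    System.out.println("flushing " + buffer.size() + " records");
    buffer.clear();
  }

  public static void main(String[] args) {
    DoubleBufferSketch sketch = new DoubleBufferSketch(4);
    for (int i = 0; i < 10; i++) {
      sketch.write("record-" + i);
    }
    sketch.finishPendingWrites();
  }
}

Joining on the previous flush before swapping keeps at most one background flush in flight, which is the same invariant waitForAsyncWriteCompletion() enforces in the patch above.
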
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 38ad5487762a..24ba04f21849 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -48,6 +49,7 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link AppendWriteFunctionWithBufferSort}.
@@ -125,7 +127,7 @@ public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
// enlarge the write buffer record size
this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 10000L);
// use a very small buffer memory size here
- this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.1D);
+ this.conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 400.1D);
// Create test data that exceeds buffer size
List<RowData> inputData = new ArrayList<>();
@@ -175,6 +177,117 @@ public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
assertArrayEquals(expected.toArray(), filteredResult.toArray());
}
+ @Test
+ public void testMultipleCheckpoints() throws Exception {
+ List<RowData> batch1 = Arrays.asList(
+ createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"),
+ createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
+ );
+
+ List<RowData> batch2 = Arrays.asList(
+ createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"),
+ createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+ );
+
+ TestHarness testHarness = TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf);
+
+ testHarness.consume(batch1).checkpoint(1);
+ testHarness.consume(batch2).checkpoint(2);
+ testHarness.endInput();
+
+ List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+ assertEquals(4, actualData.size());
+ }
+
+ @Test
+ public void testLargeDatasetWithMultipleFlushes() throws Exception {
+ this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 50L);
+
+ List<RowData> inputData = new ArrayList<>();
+ for (int i = 0; i < 500; i++) {
+ inputData.add(createRowData("uuid" + i, "Name" + (i % 10), i % 100,
"1970-01-01 00:00:01.123", "p" + (i % 3)));
+ }
+
+ TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf)
+ .consume(inputData)
+ .endInput();
+
+ List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
+ assertEquals(500, actualData.size());
+ }
+
+ @Test
+ public void testSortStabilityWithDuplicateKeys() throws Exception {
+ List<RowData> inputData = Arrays.asList(
+ createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+ createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
+ createRowData("uuid3", "Alice", 25, "1970-01-01 00:00:01.125", "p1"),
+ createRowData("uuid4", "Bob", 30, "1970-01-01 00:00:01.126", "p1")
+ );
+
+ TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf)
+ .consume(inputData)
+ .endInput();
+
+ List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+ assertEquals(4, actualData.size());
+
+ List<String> filteredResult = actualData.stream()
+ .map(TestData::filterOutVariablesWithoutHudiMetadata)
+ .collect(Collectors.toList());
+
+ assertTrue(filteredResult.get(0).contains("Alice"));
+ assertTrue(filteredResult.get(1).contains("Alice"));
+ assertTrue(filteredResult.get(2).contains("Alice"));
+ assertTrue(filteredResult.get(3).contains("Bob"));
+ }
+
+ @Test
+ public void testDifferentPartitions() throws Exception {
+ List<RowData> inputData = Arrays.asList(
+ createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+ createRowData("uuid2", "Bob", 30, "1970-01-01 00:00:01.124", "p2"),
+ createRowData("uuid3", "Charlie", 35, "1970-01-01 00:00:01.125", "p3"),
+ createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+ );
+
+ TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf)
+ .consume(inputData)
+ .endInput();
+
+ List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
+ assertEquals(4, actualData.size());
+ }
+
+ @Test
+ public void testConcurrentWriteScenario() throws Exception {
+ this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L);
+
+ List<RowData> inputData = new ArrayList<>();
+ for (int i = 0; i < 200; i++) {
+ inputData.add(createRowData("uuid" + i, "Name" + (i % 5), i % 50,
"1970-01-01 00:00:01.123", "p1"));
+ }
+
+ TestHarness testHarness = TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf);
+
+ for (int i = 0; i < inputData.size(); i += 10) {
+ List<RowData> batch = inputData.subList(i, Math.min(i + 10, inputData.size()));
+ testHarness.consume(batch);
+ if (i % 50 == 0) {
+ testHarness.checkpoint(i / 50 + 1);
+ }
+ }
+ testHarness.endInput();
+
+ List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+ assertEquals(200, actualData.size());
+ }
+
private GenericRowData createRowData(String uuid, String name, int age, String timestamp, String partition) {
return GenericRowData.of(StringData.fromString(uuid), StringData.fromString(name),
age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), StringData.fromString(partition));