HuangZhenQiu commented on code in PR #13892:
URL: https://github.com/apache/hudi/pull/13892#discussion_r2679269316
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java:
##########
@@ -122,20 +148,64 @@ public void snapshotState() {
@Override
public void endInput() {
try {
- sortAndSend();
+ waitForAsyncWriteCompletion();
+ sortAndSend(activeBuffer);
} catch (IOException e) {
throw new HoodieIOException("Fail to sort and flush data in buffer
during endInput.", e);
}
super.endInput();
}
+ /**
+ * Swaps the active and background buffers and triggers async flush of the
background buffer.
+ */
+ private void swapAndFlushAsync() throws IOException {
+ waitForAsyncWriteCompletion();
+
+ // Swap buffers
+ BinaryInMemorySortBuffer temp = activeBuffer;
+ activeBuffer = backgroundBuffer;
+ backgroundBuffer = temp;
+
+ // Start async processing of the background buffer
+ if (!backgroundBuffer.isEmpty()) {
+ isBackgroundBufferBeingProcessed.set(true);
+ CompletableFuture<Void> newTask = CompletableFuture.runAsync(() -> {
+ try {
+ sortAndSend(backgroundBuffer);
+ } catch (IOException e) {
+ LOG.error("Error during async write", e);
+ throw new RuntimeException(e);
+ } finally {
+ isBackgroundBufferBeingProcessed.set(false);
+ }
+ }, asyncWriteExecutor);
+ asyncWriteTask.set(newTask);
+ }
+ }
+
+ /**
+ * Waits for any ongoing async write operation to complete.
+ */
+ private void waitForAsyncWriteCompletion() {
+ try {
+ CompletableFuture<Void> currentTask = asyncWriteTask.get();
+ if (currentTask != null) {
Review Comment:
I see. Change initial value to null
this.asyncWriteTask = new AtomicReference<>(null);
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]