cshuo commented on code in PR #13892:
URL: https://github.com/apache/hudi/pull/13892#discussion_r2469385472
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java:
##########
@@ -78,41 +89,56 @@ public void open(Configuration parameters) throws Exception
{
List<String> sortKeyList = Arrays.stream(sortKeys.split(",")).map(key ->
key.trim()).collect(Collectors.toList());
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType,
sortKeyList.toArray(new String[0]));
SortCodeGenerator codeGenerator =
sortOperatorGen.createSortCodeGenerator();
- GeneratedNormalizedKeyComputer keyComputer =
codeGenerator.generateNormalizedKeyComputer("SortComputer");
- GeneratedRecordComparator recordComparator =
codeGenerator.generateRecordComparator("SortComparator");
+ this.keyComputer =
codeGenerator.generateNormalizedKeyComputer("SortComputer");
+ this.recordComparator =
codeGenerator.generateRecordComparator("SortComparator");
MemorySegmentPool memorySegmentPool =
MemorySegmentPoolFactory.createMemorySegmentPool(config);
- this.buffer = BufferUtils.createBuffer(rowType,
+
+ this.activeBuffer = BufferUtils.createBuffer(rowType,
memorySegmentPool,
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
- LOG.info("{} is initialized successfully.", getClass().getSimpleName());
+ this.backgroundBuffer = BufferUtils.createBuffer(rowType,
+ MemorySegmentPoolFactory.createMemorySegmentPool(config),
+ keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
+ recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+
+ this.asyncWriteExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "async-write-thread");
+ t.setDaemon(true);
+ return t;
+ });
+ this.asyncWriteTask = new AtomicReference<>(CompletableFuture.completedFuture(null));
+ this.isBackgroundBufferBeingProcessed = new AtomicBoolean(false);
Review Comment:
I guess `isBackgroundBufferBeingProcessed` could be used for an early return in
`waitForAsyncWriteCompletion`?
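A minimal sketch of the early return suggested here, not the PR's actual code. The class `AsyncWriteTracker` and the methods `submit` and `isInFlight` are hypothetical stand-ins; only the field names and `waitForAsyncWriteCompletion` come from the diff. One caveat under these assumptions: because the flag is cleared when the task completes, an early return also skips rethrowing a failure from a task that has already finished, so real code would need to surface such errors elsewhere.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the async bookkeeping in the write function.
class AsyncWriteTracker {
  private final AtomicReference<CompletableFuture<Void>> asyncWriteTask =
      new AtomicReference<>(CompletableFuture.completedFuture(null));
  private final AtomicBoolean isBackgroundBufferBeingProcessed = new AtomicBoolean(false);

  // Registers an in-flight write; the flag is cleared when the task completes,
  // whether it succeeds or fails.
  void submit(CompletableFuture<Void> task) {
    isBackgroundBufferBeingProcessed.set(true);
    asyncWriteTask.set(
        task.whenComplete((v, err) -> isBackgroundBufferBeingProcessed.set(false)));
  }

  boolean isInFlight() {
    return isBackgroundBufferBeingProcessed.get();
  }

  // Returns false without touching the future when nothing is in flight,
  // otherwise blocks until the current task completes and returns true.
  boolean waitForAsyncWriteCompletion() {
    if (!isBackgroundBufferBeingProcessed.get()) {
      return false; // early return: no background buffer is being processed
    }
    asyncWriteTask.get().join();
    return true;
  }
}
```

The boolean return value exists only to make the two paths observable in this sketch; the method in the PR may well return `void`.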
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java:
##########
@@ -61,7 +66,13 @@
public class AppendWriteFunctionWithBufferSort<T> extends
AppendWriteFunction<T> {
private static final Logger LOG =
LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class);
private final long writeBufferSize;
- private transient BinaryInMemorySortBuffer buffer;
+ private transient BinaryInMemorySortBuffer activeBuffer;
+ private transient BinaryInMemorySortBuffer backgroundBuffer;
+ private transient ExecutorService asyncWriteExecutor;
+ private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
+ private transient AtomicBoolean isBackgroundBufferBeingProcessed;
+ private transient GeneratedNormalizedKeyComputer keyComputer;
+ private transient GeneratedRecordComparator recordComparator;
Review Comment:
`recordComparator` can be kept as a local variable.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java:
##########
@@ -61,7 +66,13 @@
public class AppendWriteFunctionWithBufferSort<T> extends
AppendWriteFunction<T> {
private static final Logger LOG =
LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class);
private final long writeBufferSize;
- private transient BinaryInMemorySortBuffer buffer;
+ private transient BinaryInMemorySortBuffer activeBuffer;
+ private transient BinaryInMemorySortBuffer backgroundBuffer;
+ private transient ExecutorService asyncWriteExecutor;
+ private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
+ private transient AtomicBoolean isBackgroundBufferBeingProcessed;
+ private transient GeneratedNormalizedKeyComputer keyComputer;
Review Comment:
`keyComputer` can be kept as a local variable.
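A rough sketch of the local-variable suggestion, assuming the generated objects are needed only while the two buffers are built in `open()`. Everything here is a hypothetical stand-in: `LocalFactorySketch`, `SortBuffer`, and the `Supplier` take the place of the Flink and Hudi types, which are not reproduced.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in: the comparator factory lives only inside open().
class LocalFactorySketch {
  // Simplified replacement for BinaryInMemorySortBuffer.
  static final class SortBuffer {
    private final List<String> rows = new ArrayList<>();
    private final Comparator<String> comparator;

    SortBuffer(Comparator<String> comparator) {
      this.comparator = comparator;
    }

    void add(String row) {
      rows.add(row);
    }

    List<String> sorted() {
      List<String> copy = new ArrayList<>(rows);
      copy.sort(comparator);
      return copy;
    }
  }

  private SortBuffer activeBuffer;
  private SortBuffer backgroundBuffer;

  void open() {
    // Local variable instead of a field: nothing outside open() reads it.
    Supplier<Comparator<String>> recordComparator = () -> Comparator.<String>naturalOrder();
    // Each buffer gets its own comparator instance from the same factory.
    activeBuffer = new SortBuffer(recordComparator.get());
    backgroundBuffer = new SortBuffer(recordComparator.get());
  }

  SortBuffer activeBuffer() {
    return activeBuffer;
  }

  SortBuffer backgroundBuffer() {
    return backgroundBuffer;
  }
}
```

If a later change needs to rebuild a buffer outside `open()`, a field would be justified again.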
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java:
##########
@@ -155,4 +225,16 @@ private void sortAndSend() throws IOException {
private static void sort(BinaryInMemorySortBuffer dataBuffer) {
new QuickSort().sort(dataBuffer);
}
+
+ @Override
+ public void close() throws Exception {
+ try {
+ waitForAsyncWriteCompletion();
Review Comment:
It seems unnecessary to wait for async task completion here. There are
basically two cases when `close()` is called:
1. The job finished normally, in which case `endInput()` should already have been
called, and `waitForAsyncWriteCompletion` is called there to ensure the data is flushed.
2. The job finished with an exception, in which case the buffered data does not
need to be flushed, and waiting would only block the close procedure.
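A minimal sketch of the split described in the two cases above, assuming `endInput()` is the only place that must block. `SketchWriter`, `flushAsync`, and `isShutdown` are hypothetical names used for illustration; the executor setup mirrors the one in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the write function's lifecycle methods.
class SketchWriter implements AutoCloseable {
  private final ExecutorService asyncWriteExecutor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "async-write-thread");
    t.setDaemon(true);
    return t;
  });
  private volatile CompletableFuture<Void> asyncWriteTask =
      CompletableFuture.completedFuture(null);

  // Hands one write to the background thread.
  void flushAsync(Runnable write) {
    asyncWriteTask = CompletableFuture.runAsync(write, asyncWriteExecutor);
  }

  // Case 1, normal completion: block until the pending write is flushed.
  void endInput() {
    asyncWriteTask.join();
  }

  // Case 2 (and after case 1): release the thread without waiting.
  // shutdownNow() interrupts a running task instead of blocking on it.
  @Override
  public void close() {
    asyncWriteExecutor.shutdownNow();
  }

  boolean isShutdown() {
    return asyncWriteExecutor.isShutdown();
  }
}
```

On the normal path the caller invokes `endInput()` and then `close()`, so nothing is lost; on the failure path `close()` returns promptly and the unflushed buffer is discarded.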