This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2288c079833 [HUDI-8782] BulkInsertWriterHelper parallel close (#12518)
2288c079833 is described below

commit 2288c07983322017bfb63fa22c4e998a25dca2c5
Author: fhan <[email protected]>
AuthorDate: Tue Jan 7 15:42:16 2025 +0800

    [HUDI-8782] BulkInsertWriterHelper parallel close (#12518)
    
    * parallel close draft
    * update awaitTermination to 10 minutes in close
    * deal with empty handles in close
    * hard code close to 10 max parallelism
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     | 34 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 9b9b85ba6e3..dd2368b1ff5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
 import org.apache.hudi.table.HoodieTable;
@@ -45,6 +46,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.FutureUtils.allOf;
 
 /**
  * Helper class for bulk insert used by Flink.
@@ -164,9 +172,29 @@ public class BulkInsertWriterHelper {
   }
 
   public void close() throws IOException {
-    for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
-      LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
-      writeStatusList.add(closeWriteHandle(rowCreateHandle));
+    if (handles.isEmpty()) {
+      return;
+    }
+    int handsSize = Math.min(handles.size(), 10);
+    ExecutorService executorService = Executors.newFixedThreadPool(handsSize);
+    allOf(handles.values().stream()
+        .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> {
+          try {
+            LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
+            return rowCreateHandle.close();
+          } catch (IOException e) {
+            throw new HoodieIOException("IOE during rowCreateHandle.close()", e);
+          }
+        }, executorService))
+        .collect(Collectors.toList())
+    ).whenComplete((result, throwable) -> {
+      writeStatusList.addAll(result);
+    }).join();
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(10, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
     }
     handles.clear();
     handle = null;

Reply via email to