nsivabalan commented on code in PR #7318:
URL: https://github.com/apache/hudi/pull/7318#discussion_r1034151030


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java:
##########
@@ -269,4 +279,70 @@ private static Object checkElementNotNull(Object element, int index) {
     return Objects.requireNonNull(element, "Element is null at index " + index);
   }
 
+  public static <T> void forEachParallel(Collection<T> inputs, Consumer<T> function) {
+    forEachParallel(inputs, function, 4, 1000);
+  }
+
+  /**
+   * Executes forEach loops in parallel. The input function will run on all inputs even if exceptions are thrown during the execution of any individual input.
+   * Execution of inputs is not guaranteed to occur if the timeout is reached. Don't use this if your consumer function isn't thread safe
+   * */
+  public static <T> void forEachParallel(Collection<T> inputs, Consumer<T> function, int nThreads, long msTimeout) {
+    long timeoutTime = System.currentTimeMillis() + msTimeout;
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+
+    //Queue up all consumer functions to run, they may start at any time
+    List<Future<Boolean>> futures = inputs.stream().map(t ->
+        executorService.submit(() -> {
+          function.accept(t);
+          return true;
+        })).collect(Collectors.toList());
+
+    //List of exceptions that consumer functions throw
+    List<Throwable> exceptions = new ArrayList<>();

Review Comment:
   Can we try using `CompletableFuture` here? It might simplify a lot of this.
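
   To make the suggestion concrete, here is a minimal sketch of what a `CompletableFuture`-based version could look like. It is not the PR's implementation. The class name `ParallelForEachSketch` and the choice to return the collected failures as a list are assumptions made for illustration, since the hunk above does not show how the original method reports the exceptions it gathers.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical holder class, used only for this sketch.
class ParallelForEachSketch {

  // Runs `function` on every input using nThreads workers and returns the
  // failures observed (returning a list is an assumption of this sketch).
  static <T> List<Throwable> forEachParallel(Collection<T> inputs, Consumer<T> function,
                                             int nThreads, long msTimeout) {
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    try {
      // One future per input; a throwing consumer completes its future exceptionally.
      List<CompletableFuture<Void>> futures = inputs.stream()
          .map(t -> CompletableFuture.runAsync(() -> function.accept(t), executor))
          .collect(Collectors.toList());

      // allOf completes only once every future has completed, normally or not,
      // so one failing input does not prevent the others from running.
      CompletableFuture<Void> all =
          CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

      List<Throwable> failures = new ArrayList<>();
      try {
        all.get(msTimeout, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        failures.add(e); // some inputs may not have run
      } catch (ExecutionException e) {
        // At least one input failed; the individual causes are collected below.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        failures.add(e);
      }

      for (CompletableFuture<Void> f : futures) {
        if (f.isCompletedExceptionally()) {
          try {
            f.join();
          } catch (CompletionException e) {
            failures.add(e.getCause() != null ? e.getCause() : e);
          }
        }
      }
      return failures;
    } finally {
      // Always release the pool; this interrupts tasks still running after a timeout.
      executor.shutdownNow();
    }
  }
}
```

   With this shape the single `allOf(...).get(timeout)` call replaces the manual deadline bookkeeping, and the thread pool is shut down in one place. The consumer still has to be thread safe.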


