This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 74cafba6e6c6 [HUDI-9795] Use dedicated ForkJoinPool for parallel
execution in HoodieFlinkEngineContext (#13853)
74cafba6e6c6 is described below
commit 74cafba6e6c65d0ea25f84e84165f671d040df9e
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Sep 12 15:10:43 2025 +0800
[HUDI-9795] Use dedicated ForkJoinPool for parallel execution in
HoodieFlinkEngineContext (#13853)
---
.../client/common/HoodieFlinkEngineContext.java | 54 ++++++++++++++++------
1 file changed, 41 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index aee7a2d904ca..8c45c9c35f9d 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -54,6 +55,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -109,16 +112,19 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
@Override
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int
parallelism) {
- return
data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList());
+    // Apply func element-wise; the terminal collect runs inside the dedicated pool built by
+    // executeParallelStream with the requested parallelism.
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> stream.map(throwingMapWrapper(func)).collect(Collectors.toList()),
+        parallelism);
}
@Override
public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data,
SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V,
V> reduceFunc, int parallelism) {
- return
data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
- .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
- .map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+    // Map each element to a key/value pair, group the pairs by key, then fold every group's
+    // values with reduceFunc. Absent reduction results are filtered out. The whole pipeline
+    // runs inside the dedicated pool created by executeParallelStream.
+    // Uses Pair method references for consistency with reduceByKey.
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> stream.map(throwingMapToPairWrapper(mapToPairFunc))
+            .collect(Collectors.groupingBy(Pair::getKey)).values().stream()
+            .map(list -> list.stream().map(Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList()),
+        parallelism);
}
@Override
@@ -135,16 +141,21 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> List<V> reduceByKey(
List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int
parallelism) {
- return data.stream().parallel()
- .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
- .map(list -> list.stream().map(e ->
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+    // Group the incoming pairs by key and fold each group's values with reduceFunc;
+    // absent reduction results are filtered out.
+    Function<Stream<Pair<K, V>>, List<V>> groupAndReduce = stream ->
+        stream.collect(Collectors.groupingBy(Pair::getKey)).values().stream()
+            .map(list -> list.stream().map(Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    return executeParallelStream(data.parallelStream(), groupAndReduce, parallelism);
}
@Override
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I,
Stream<O>> func, int parallelism) {
- return
data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
+ return executeParallelStream(
+ data.parallelStream(),
+ stream ->
stream.flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList()),
+ parallelism);
}
@Override
@@ -154,7 +165,24 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data,
SerializablePairFunction<I, K, V> func, Integer parallelism) {
- return
data.stream().parallel().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft,
Pair::getRight));
+ return executeParallelStream(
+ data.parallelStream(),
+ stream ->
stream.map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft,
Pair::getRight)),
+ parallelism);
+ }
+
+  /**
+   * Executes a parallel stream pipeline inside a dedicated {@link ForkJoinPool} sized to the
+   * requested parallelism, instead of the JVM-wide common pool.
+   *
+   * <p>This relies on the JDK implementation behavior that a parallel stream's terminal operation
+   * invoked from within a ForkJoinPool task forks its subtasks into that same pool.
+   *
+   * @param parallelStream the parallel stream to consume
+   * @param transform      function applying the pipeline and terminal operation to the stream
+   * @param parallelism    requested pool size; values outside ForkJoinPool's accepted range
+   *                       (e.g. 0 for empty inputs) are clamped into it
+   * @return the result of {@code transform}
+   * @throws RuntimeException unchecked failures raised by {@code transform} are rethrown as-is
+   *                          (e.g. the HoodieException from the throwing* wrappers)
+   * @throws HoodieException  if the caller is interrupted while waiting, or the task fails
+   *                          with a checked exception
+   */
+  private static <E, O> O executeParallelStream(Stream<E> parallelStream, Function<Stream<E>, O> transform, int parallelism) {
+    // new ForkJoinPool(n) throws IllegalArgumentException for n <= 0 or n above the
+    // implementation limit (0x7fff in current JDKs). Callers often derive parallelism
+    // from the input size, which can be 0, so clamp instead of failing.
+    ForkJoinPool pool = new ForkJoinPool(Math.max(1, Math.min(parallelism, 0x7fff)));
+    try {
+      return pool.submit(() -> transform.apply(parallelStream)).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new HoodieException("Interrupted while executing parallel stream with dedicated ForkJoinPool.", e);
+    } catch (ExecutionException e) {
+      // Surface the task's own failure rather than burying it under ExecutionException,
+      // matching what callers saw when the stream ran on the calling thread.
+      Throwable cause = e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      }
+      if (cause instanceof Error) {
+        throw (Error) cause;
+      }
+      throw new HoodieException("Failed to execute parallel stream with dedicated ForkJoinPool.", cause);
+    } finally {
+      pool.shutdown();
+    }
}
@Override