xiarixiaoyao commented on a change in pull request #4178:
URL: https://github.com/apache/hudi/pull/4178#discussion_r765412301
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########
@@ -95,8 +95,7 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table,
HoodieEngineContext
.map(inputGroup -> runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
- instantTime))
- .map(CompletableFuture::join);
+ instantTime)).collect(Collectors.toList()).stream().map(CompletableFuture::join);
Review comment:
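@alexeykudinkin, here is my explanation:
1) The original code uses a single stream pipeline. Streams are lazy and push
one element at a time through every stage, so calling join in the same
pipeline blocks on each future before the next one is submitted. The
clustering groups therefore run one by one.
2) The new code uses two stream pipelines. collect(Collectors.toList()) is a
terminal operation, so all futures are submitted before the second pipeline
starts joining them. The groups are no longer forced to run one by one.
We can verify this with the following code: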
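import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class JoinTest {
  public static void main(String[] args) {
    Integer[] test = new Integer[] {0, 2, 3};
    long time = System.currentTimeMillis();
    // First pipeline: submit every task before joining any of them.
    List<CompletableFuture<Integer>> ls =
        Arrays.stream(test).map(f -> CompletableFuture.supplyAsync(() -> {
          System.out.println(Thread.currentThread().getName());
          try {
            Thread.sleep(10000);
          } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
          }
          return f;
        })).collect(Collectors.toList());
    // Second pipeline: wait for the results.
    ls.stream().map(CompletableFuture::join).collect(Collectors.toList());
    System.out.println(String.format("cost time: %s",
        (System.currentTimeMillis() - time) / 1000));
  }
}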
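The same point can be checked without relying on timing or on how many
threads the pool has, by recording the order in which futures are submitted
and joined. This is only a sketch for illustration: StreamJoinOrder,
singlePipeline and twoPipelines are made-up names, not Hudi code, and the
tasks return immediately instead of running a clustering group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class StreamJoinOrder {

  // Single pipeline: each element passes through both map stages before the
  // next element is pulled, so every future is joined right after it is created.
  static List<String> singlePipeline(List<Integer> inputs) {
    List<String> events = new ArrayList<>();
    inputs.stream()
        .map(i -> {
          events.add("submit " + i);
          return CompletableFuture.supplyAsync(() -> i);
        })
        .map(f -> {
          Integer v = f.join();
          events.add("join " + v);
          return v;
        })
        .collect(Collectors.toList());
    return events;
  }

  // Two pipelines: collect() is terminal, so all futures are created and
  // started before the second stream begins joining them.
  static List<String> twoPipelines(List<Integer> inputs) {
    List<String> events = new ArrayList<>();
    List<CompletableFuture<Integer>> futures = inputs.stream()
        .map(i -> {
          events.add("submit " + i);
          return CompletableFuture.supplyAsync(() -> i);
        })
        .collect(Collectors.toList());
    futures.stream()
        .map(f -> {
          Integer v = f.join();
          events.add("join " + v);
          return v;
        })
        .collect(Collectors.toList());
    return events;
  }

  public static void main(String[] args) {
    List<Integer> inputs = Arrays.asList(0, 2, 3);
    System.out.println("single pipeline: " + singlePipeline(inputs));
    System.out.println("two pipelines:   " + twoPipelines(inputs));
  }
}
```

With the inputs 0, 2, 3 the single pipeline records submit 0, join 0,
submit 2, join 2, submit 3, join 3, while the two-pipeline version records
submit 0, submit 2, submit 3, join 0, join 2, join 3.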
You are an expert in this field, and I appreciate your patient guidance. If
you still think there is a problem, I will modify the code. Thanks again.