This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new e69710ae5 [AMORO-3545] Prevent empty tasks list during planning (#3546)
e69710ae5 is described below
commit e69710ae595652302ad4f49f3e1c38f989ed3280
Author: lrsb <[email protected]>
AuthorDate: Fri May 30 17:32:54 2025 +0200
[AMORO-3545] Prevent empty tasks list during planning (#3546)
---
.../optimizing/plan/AbstractOptimizingPlanner.java | 36 +++++++++++++---------
1 file changed, 21 insertions(+), 15 deletions(-)
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
index 0e4b1b0db..69380c2c9 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
@@ -36,9 +36,9 @@ import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -151,27 +151,33 @@ public abstract class AbstractOptimizingPlanner extends AbstractOptimizingEvalua
return cacheAndReturnTasks(Collections.emptyList());
}
- List<PartitionEvaluator> evaluators = new ArrayList<>(needOptimizingPlanMap.values());
+ LinkedList<PartitionEvaluator> evaluators = new LinkedList<>(needOptimizingPlanMap.values());
// prioritize partitions with high cost to avoid starvation
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));
double maxInputSize = maxInputSizePerThread * availableCore;
actualPartitionPlans = Lists.newArrayList();
long actualInputSize = 0;
- for (PartitionEvaluator evaluator : evaluators) {
- actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
- actualInputSize += evaluator.getCost();
- if (actualInputSize > maxInputSize) {
- break;
+ List<RewriteStageTask> plannedTasks = Lists.newArrayList();
+
+ while (plannedTasks.isEmpty() && !evaluators.isEmpty()) {
+ for (PartitionEvaluator evaluator = evaluators.poll();
+ evaluator != null;
+ evaluator = evaluators.poll()) {
+ actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
+ actualInputSize += evaluator.getCost();
+ if (actualInputSize > maxInputSize) {
+ break;
+ }
}
- }
- double avgThreadCost = actualInputSize / availableCore;
- List<RewriteStageTask> tasks = Lists.newArrayList();
- for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
- tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
+ double avgThreadCost = actualInputSize / availableCore;
+ for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
+ plannedTasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
+ }
}
- if (!tasks.isEmpty()) {
+
+ if (!plannedTasks.isEmpty()) {
if (actualPartitionPlans.stream()
.anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) {
optimizingType = OptimizingType.FULL;
@@ -187,12 +193,12 @@ public abstract class AbstractOptimizingPlanner extends AbstractOptimizingEvalua
"{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms
maxInputSize {} actualInputSize {}",
identifier,
getOptimizingType(),
- tasks.size(),
+ plannedTasks.size(),
endTime - startTime,
(endTime - startTime) / 1_000_000,
maxInputSize,
actualInputSize);
- return cacheAndReturnTasks(tasks);
+ return cacheAndReturnTasks(plannedTasks);
}
private List<RewriteStageTask> cacheAndReturnTasks(List<RewriteStageTask> tasks) {