This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new e69710ae5 [AMORO-3545] Prevent empty tasks list during planning (#3546)
e69710ae5 is described below

commit e69710ae595652302ad4f49f3e1c38f989ed3280
Author: lrsb <[email protected]>
AuthorDate: Fri May 30 17:32:54 2025 +0200

    [AMORO-3545] Prevent empty tasks list during planning (#3546)
---
 .../optimizing/plan/AbstractOptimizingPlanner.java | 36 +++++++++++++---------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
index 0e4b1b0db..69380c2c9 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java
@@ -36,9 +36,9 @@ import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -151,27 +151,33 @@ public abstract class AbstractOptimizingPlanner extends AbstractOptimizingEvalua
       return cacheAndReturnTasks(Collections.emptyList());
     }
 
-    List<PartitionEvaluator> evaluators = new 
ArrayList<>(needOptimizingPlanMap.values());
+    LinkedList<PartitionEvaluator> evaluators = new 
LinkedList<>(needOptimizingPlanMap.values());
     // prioritize partitions with high cost to avoid starvation
     evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, 
Comparator.reverseOrder()));
 
     double maxInputSize = maxInputSizePerThread * availableCore;
     actualPartitionPlans = Lists.newArrayList();
     long actualInputSize = 0;
-    for (PartitionEvaluator evaluator : evaluators) {
-      actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
-      actualInputSize += evaluator.getCost();
-      if (actualInputSize > maxInputSize) {
-        break;
+    List<RewriteStageTask> plannedTasks = Lists.newArrayList();
+
+    while (plannedTasks.isEmpty() && !evaluators.isEmpty()) {
+      for (PartitionEvaluator evaluator = evaluators.poll();
+          evaluator != null;
+          evaluator = evaluators.poll()) {
+        actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
+        actualInputSize += evaluator.getCost();
+        if (actualInputSize > maxInputSize) {
+          break;
+        }
       }
-    }
 
-    double avgThreadCost = actualInputSize / availableCore;
-    List<RewriteStageTask> tasks = Lists.newArrayList();
-    for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
-      tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / 
avgThreadCost)));
+      double avgThreadCost = actualInputSize / availableCore;
+      for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
+        plannedTasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / 
avgThreadCost)));
+      }
     }
-    if (!tasks.isEmpty()) {
+
+    if (!plannedTasks.isEmpty()) {
       if (actualPartitionPlans.stream()
           .anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) {
         optimizingType = OptimizingType.FULL;
@@ -187,12 +193,12 @@ public abstract class AbstractOptimizingPlanner extends AbstractOptimizingEvalua
         "{} finish plan, type = {}, get {} tasks, cost {} ns, {} ms 
maxInputSize {} actualInputSize {}",
         identifier,
         getOptimizingType(),
-        tasks.size(),
+        plannedTasks.size(),
         endTime - startTime,
         (endTime - startTime) / 1_000_000,
         maxInputSize,
         actualInputSize);
-    return cacheAndReturnTasks(tasks);
+    return cacheAndReturnTasks(plannedTasks);
   }
 
   private List<RewriteStageTask> cacheAndReturnTasks(List<RewriteStageTask> 
tasks) {

Reply via email to