zhoujinsong commented on code in PR #3669:
URL: https://github.com/apache/amoro/pull/3669#discussion_r2250613965


##########
amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java:
##########
@@ -124,6 +125,8 @@ public DefaultOptimizingService(
         
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
     this.pollingTimeout =
         
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
+    this.enableOverQuota =

Review Comment:
   Maybe rename the variable to `breakQuotaLimit`.



##########
amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java:
##########
@@ -89,26 +100,33 @@ public static TableOptimizingInfo buildTableOptimizeInfo(
     return tableOptimizeInfo;
   }
 
-  private static double calculateQuotaOccupy(
+  public static long calculateQuotaOccupy(
       List<OptimizingTaskMeta> processTasks,
       List<TaskRuntime.TaskQuota> quotas,
       long startTime,
       long endTime) {
-    double finishedOccupy = 0;
-    if (quotas != null) {
-      finishedOccupy = quotas.stream().mapToDouble(q -> 
q.getQuotaTime(startTime)).sum();
-    }
-    double runningOccupy = 0;
-    if (processTasks != null) {
-      runningOccupy =
-          processTasks.stream()
-              .mapToDouble(
-                  t ->
-                      TaskRuntime.taskRunningQuotaTime(
-                          startTime, endTime, t.getStartTime(), 
t.getCostTime()))
-              .sum();
+    if (quotas == null) {
+      quotas = Collections.emptyList();
     }
-    return finishedOccupy + runningOccupy;
+    quotas.removeIf(task -> task.checkExpired(startTime));
+    long finishedTaskQuotaTime =
+        quotas.stream().mapToLong(taskQuota -> 
taskQuota.getQuotaTime(startTime)).sum();
+    long quotaOccupy =
+        processTasks == null
+            ? finishedTaskQuotaTime
+            : finishedTaskQuotaTime
+                + processTasks.stream()
+                    .filter(
+                        t ->
+                            t.getStatus() != TaskRuntime.Status.CANCELED
+                                && t.getStatus() != Status.SUCCESS
+                                && t.getStatus() != TaskRuntime.Status.FAILED)
+                    .mapToLong(
+                        task ->
+                            TaskRuntime.taskRunningQuotaTime(
+                                startTime, endTime, task.getStartTime(), 
task.getCostTime()))
+                    .sum();
+    return quotaOccupy;

Review Comment:
   I tried to polish the code here to make it more readable:
   ```
     @VisibleForTesting
     static long calculateQuotaOccupy(
         List<OptimizingTaskMeta> processTasks,
         List<TaskRuntime.TaskQuota> quotas,
         long startTime,
         long endTime) {
       long finishedOccupy = 0;
       if (quotas != null) {
         quotas.removeIf(task -> task.checkExpired(startTime));
         finishedOccupy =
             quotas.stream().mapToLong(taskQuota -> 
taskQuota.getQuotaTime(startTime)).sum();
       }
       long runningOccupy = 0;
       if (processTasks != null) {
         runningOccupy = processTasks.stream()
             .filter(
                 t ->
                     t.getStatus() != TaskRuntime.Status.CANCELED
                         && t.getStatus() != Status.SUCCESS
                         && t.getStatus() != TaskRuntime.Status.FAILED)
             .mapToLong(
                 task ->
                     TaskRuntime.taskRunningQuotaTime(
                         startTime, endTime, task.getStartTime(), 
task.getCostTime()))
             .sum();
       }
       return finishedOccupy + runningOccupy;
     }
   ```



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -192,15 +193,40 @@ private void clearProcess(OptimizingProcess 
optimizingProcess) {
         taskRuntime -> taskRuntime.getTaskId().getProcessId() == 
optimizingProcess.getProcessId());
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime) {
+  public TaskRuntime<?> pollTask(long maxWaitTime, boolean enableOverQuota) {

Review Comment:
   Please change the variable name to `breakQuotaLimit`.



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java:
##########
@@ -24,6 +24,8 @@
 
 public interface OptimizingProcess {
 
+  int getQuotaCount();

Review Comment:
   Would `getQuota` or `getQuotaLimit` be a more appropriate name?



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -192,15 +193,40 @@ private void clearProcess(OptimizingProcess 
optimizingProcess) {
         taskRuntime -> taskRuntime.getTaskId().getProcessId() == 
optimizingProcess.getProcessId());
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime) {
+  public TaskRuntime<?> pollTask(long maxWaitTime, boolean enableOverQuota) {
     long deadline = calculateDeadline(maxWaitTime);
     TaskRuntime<?> task = fetchTask();
     while (task == null && waitTask(deadline)) {
       task = fetchTask();
     }
+    if (task == null && enableOverQuota && shouldPollOverQuota()) {
+      task =
+          tableQueue.stream()
+              .map(process -> process.poll(false))
+              .filter(Objects::nonNull)
+              .findFirst()
+              .orElse(null);
+    }
     return task;
   }
 
+  public TaskRuntime<?> pollTask(long maxWaitTime) {
+    return pollTask(maxWaitTime, false);
+  }
+
+  private boolean shouldPollOverQuota() {
+    boolean noPendingTables =

Review Comment:
   Maybe you can check whether `planningTables` is empty to quickly determine
whether we should break the quota limit.



##########
amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java:
##########
@@ -192,15 +193,40 @@ private void clearProcess(OptimizingProcess 
optimizingProcess) {
         taskRuntime -> taskRuntime.getTaskId().getProcessId() == 
optimizingProcess.getProcessId());
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime) {
+  public TaskRuntime<?> pollTask(long maxWaitTime, boolean enableOverQuota) {
     long deadline = calculateDeadline(maxWaitTime);
     TaskRuntime<?> task = fetchTask();
     while (task == null && waitTask(deadline)) {
       task = fetchTask();
     }
+    if (task == null && enableOverQuota && shouldPollOverQuota()) {
+      task =
+          tableQueue.stream()

Review Comment:
   Maybe reuse the `fetchScheduledTask` method here; you can add a parameter
to it.



##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java:
##########
@@ -282,14 +285,30 @@ public Pair<List<TableOptimizingInfo>, Integer> 
queryTableOptimizingInfo(
 
     // load quota info
     Map<Long, List<TaskRuntime.TaskQuota>> tableQuotaMap = 
getQuotaTime(tableIds);
+    List<OptimizerInstance> instances = getAs(OptimizerMapper.class, 
OptimizerMapper::selectAll);

Review Comment:
   I tried to polish the code here:
   ```
   // load quota info
       Map<Long, List<TaskRuntime.TaskQuota>> tableQuotaMap = 
getQuotaTime(tableIds);
       List<OptimizerInstance> instances = getAs(OptimizerMapper.class, 
OptimizerMapper::selectAll);
       Map<String, Integer> optimizerThreadCountMap =
           instances == null
               ? Collections.emptyMap()
               : instances.stream()
                   .collect(
                       Collectors.groupingBy(
                           OptimizerInstance::getGroupName,
                           
Collectors.summingInt(OptimizerInstance::getThreadCount)));
   
       List<TableOptimizingInfo> infos =
           ret.stream()
               .map(
                   meta -> {
                     List<OptimizingTaskMeta> tasks = 
tableTaskMetaMap.get(meta.getTableId());
                     List<TaskRuntime.TaskQuota> quotas = 
tableQuotaMap.get(meta.getTableId());
                     int threadCount =
                         optimizerThreadCountMap
                             .getOrDefault(meta.getOptimizerGroup(), 0);
                     return OptimizingUtil.buildTableOptimizeInfo(
                         meta, tasks, quotas, Math.max(threadCount, 1));
                   })
               .collect(Collectors.toList());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to