This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new c955aba01 [AMORO-3671][Improvement]: Optimizing Allocation and 
Schedule Priority of Optimizer resource for Tables (#3669)
c955aba01 is described below

commit c955aba01ce28304a8a7d5e4d2bb2d432b30df37
Author: cxxiii <[email protected]>
AuthorDate: Wed Aug 6 19:13:10 2025 +0800

    [AMORO-3671][Improvement]: Optimizing Allocation and Schedule Priority of 
Optimizer resource for Tables (#3669)
    
    * optimize the definition and function of quota
    
    * revise the calculation of occupation in scheduling policy
    
    * revise the code format
    
    * modify the default quota from a fixed value to a percentage
    
    * set the quota count to at least 1
    
    * add an option of over quota enabled
    
    * add an option of over quota enabled
    
    correct the format
    
    * allow quota to be a decimal or a value larger than 1
    
    * add some unit testing
    
    * update related doc
    
    * correct format
    
    * [AMORO-3638] Add support for task retries when task execution times out
    
    This commit adds a timeout for the task state `ACK`, which is helpful for 
that
    AMS can't receive the complete notification in any case.
    
    * [Improvement] Add JDK8+ support for `optimizer.sh` (#3702)
    
    [Improvement] Add jvm parameter to optimizer.sh
    
    * [Improvement] Use fileIndex="nomax" to avoid log loss. (#3701)
    
    * [AMORO-3692] Remove deprecated metrics and update related logic in 
MetricsSummary (#3693)
    
    [Hotfix] Remove deprecated metrics and update related logic in 
MetricsSummary
    
    * [AMORO-3686][Improvement]: Use ZK's multi operation to ensure write 
consistency. (#3687)
    
    [Improvement]: Use ZK's multi operation in HighAvailabilityContainer to 
ensure write consistency and fix the followerLath word spelling error #3686
    
    Co-authored-by: wardli <[email protected]>
    
    * [AMORO-3607]Fix syntax error in init sql #3607 (#3681)
    
    * [AMORO-3245] Display comment on Table page. (#3672)
    
    * Display comment in Tables page.
    
    * fixup comment display
    
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
    
    * fix the bug
    
    * remove lib
    
    * polish the codes
    
    * fix error in tests
    
    * rename
    
    * rename
    
    * remove unnecessary interface method
    
    ---------
    
    Co-authored-by: Jzjsnow <[email protected]>
    Co-authored-by: CatchYouIfICan 
<[email protected]>
    Co-authored-by: Qishang Zhong <[email protected]>
    Co-authored-by: can <[email protected]>
    Co-authored-by: wardli <[email protected]>
    Co-authored-by: Nico CHen <[email protected]>
    Co-authored-by: zhangwl9 <[email protected]>
    Co-authored-by: 张文领 <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   |   7 +
 .../amoro/server/DefaultOptimizingService.java     |   5 +-
 .../dashboard/model/TableOptimizingInfo.java       |   6 +-
 .../server/dashboard/utils/OptimizingUtil.java     |  49 ++++--
 .../amoro/server/optimizing/OptimizingQueue.java   |  58 ++++++-
 .../amoro/server/table/DefaultOptimizingState.java |  29 +++-
 .../amoro/server/table/DefaultTableManager.java    |  17 +-
 .../org/apache/amoro/server/AmsEnvironment.java    |   1 +
 .../server/dashboard/utils/TestOptimizingUtil.java | 191 +++++++++++++++++++++
 .../server/optimizing/TestOptimizingQueue.java     | 181 ++++++++++++++++++-
 .../src/test/resources/config-with-units.yaml      |   1 +
 .../org/apache/amoro/table/TableProperties.java    |   2 +-
 charts/amoro/templates/amoro-configmap.yaml        |   1 +
 dist/src/main/amoro-bin/conf/config.yaml           |   1 +
 docs/concepts/self-optimizing.md                   |  26 +--
 docs/user-guides/configurations.md                 |   2 +-
 docs/user-guides/using-tables.md                   |   4 +-
 17 files changed, 528 insertions(+), 53 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index a92a5e4d3..ff95d8d0d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -302,6 +302,13 @@ public class AmoroManagementConf {
           .withDescription(
               "The number of hours that self-optimizing runtime data expire 
interval.");
 
+  public static final ConfigOption<Boolean> 
OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED =
+      ConfigOptions.key("self-optimizing.break-quota-limit-enabled")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Allow the table to break the quota limit when the resource is 
sufficient.");
+
   public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
       ConfigOptions.key("overview-cache.refresh-interval")
           .durationType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index ffdfd72b8..397c473eb 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -96,6 +96,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   private final long taskExecuteTimeout;
   private final int maxPlanningParallelism;
   private final long pollingTimeout;
+  private final boolean breakQuotaLimit;
   private final long refreshGroupInterval;
   private final Map<String, OptimizingQueue> optimizingQueueByGroup = new 
ConcurrentHashMap<>();
   private final Map<String, OptimizingQueue> optimizingQueueByToken = new 
ConcurrentHashMap<>();
@@ -125,6 +126,8 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
         
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
     this.pollingTimeout =
         
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
+    this.breakQuotaLimit =
+        
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
     this.tableService = tableService;
     this.catalogManager = catalogManager;
     this.optimizerManager = optimizerManager;
@@ -215,7 +218,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   public OptimizingTask pollTask(String authToken, int threadId) {
     LOG.debug("Optimizer {} (threadId {}) try polling task", authToken, 
threadId);
     OptimizingQueue queue = getQueueByToken(authToken);
-    return Optional.ofNullable(queue.pollTask(pollingTimeout))
+    return Optional.ofNullable(queue.pollTask(pollingTimeout, breakQuotaLimit))
         .map(task -> extractOptimizingTask(task, authToken, threadId, queue))
         .orElse(null);
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
index 92797900d..0a2a922b9 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
@@ -43,7 +43,7 @@ public class TableOptimizingInfo {
   private long duration = 0;
   private long fileCount = 0;
   private long fileSize = 0;
-  private double quota = 0.0;
+  private int quota = 0;
   private double quotaOccupation = 0.0;
 
   private String groupName = TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT;
@@ -88,11 +88,11 @@ public class TableOptimizingInfo {
     this.fileSize = fileSize;
   }
 
-  public double getQuota() {
+  public int getQuota() {
     return quota;
   }
 
-  public void setQuota(double quota) {
+  public void setQuota(int quota) {
     this.quota = quota;
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
index 148ac2e76..ddb293570 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
@@ -22,13 +22,18 @@ import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.config.OptimizingConfig;
 import org.apache.amoro.optimizing.MetricsSummary;
 import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
+import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
 import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.optimizing.TaskRuntime.Status;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.table.descriptor.FilesStatistics;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -42,7 +47,8 @@ public class OptimizingUtil {
   public static TableOptimizingInfo buildTableOptimizeInfo(
       TableRuntimeMeta optimizingTableRuntime,
       List<OptimizingTaskMeta> processTasks,
-      List<TaskRuntime.TaskQuota> quotas) {
+      List<TaskRuntime.TaskQuota> quotas,
+      int threadCount) {
     ServerTableIdentifier identifier =
         ServerTableIdentifier.of(
             optimizingTableRuntime.getTableId(),
@@ -57,14 +63,19 @@ public class OptimizingUtil {
         System.currentTimeMillis() - 
optimizingTableRuntime.getCurrentStatusStartTime());
     OptimizingConfig optimizingConfig =
         optimizingTableRuntime.getTableConfig().getOptimizingConfig();
-    tableOptimizeInfo.setQuota(optimizingConfig.getTargetQuota());
-    double quotaOccupy =
-        calculateQuotaOccupy(
-            processTasks,
-            quotas,
-            optimizingTableRuntime.getCurrentStatusStartTime(),
-            System.currentTimeMillis());
-    tableOptimizeInfo.setQuotaOccupation(quotaOccupy);
+    double targetQuota = optimizingConfig.getTargetQuota();
+    tableOptimizeInfo.setQuota(
+        targetQuota > 1 ? (int) targetQuota : (int) Math.ceil(targetQuota * 
threadCount));
+
+    long endTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() - 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
+    long quotaOccupy = calculateQuotaOccupy(processTasks, quotas, startTime, 
endTime);
+    double quotaOccupation =
+        (double) quotaOccupy
+            / (AmoroServiceConstants.QUOTA_LOOK_BACK_TIME * 
tableOptimizeInfo.getQuota());
+    tableOptimizeInfo.setQuotaOccupation(
+        BigDecimal.valueOf(quotaOccupation).setScale(4, 
RoundingMode.HALF_UP).doubleValue());
+
     FilesStatistics optimizeFileInfo;
     if (optimizingStatus.isProcessing()) {
       MetricsSummary summary = null;
@@ -89,23 +100,31 @@ public class OptimizingUtil {
     return tableOptimizeInfo;
   }
 
-  private static double calculateQuotaOccupy(
+  @VisibleForTesting
+  static long calculateQuotaOccupy(
       List<OptimizingTaskMeta> processTasks,
       List<TaskRuntime.TaskQuota> quotas,
       long startTime,
       long endTime) {
-    double finishedOccupy = 0;
+    long finishedOccupy = 0;
     if (quotas != null) {
-      finishedOccupy = quotas.stream().mapToDouble(q -> 
q.getQuotaTime(startTime)).sum();
+      quotas.removeIf(task -> task.checkExpired(startTime));
+      finishedOccupy =
+          quotas.stream().mapToLong(taskQuota -> 
taskQuota.getQuotaTime(startTime)).sum();
     }
-    double runningOccupy = 0;
+    long runningOccupy = 0;
     if (processTasks != null) {
       runningOccupy =
           processTasks.stream()
-              .mapToDouble(
+              .filter(
                   t ->
+                      t.getStatus() != TaskRuntime.Status.CANCELED
+                          && t.getStatus() != Status.SUCCESS
+                          && t.getStatus() != TaskRuntime.Status.FAILED)
+              .mapToLong(
+                  task ->
                       TaskRuntime.taskRunningQuotaTime(
-                          startTime, endTime, t.getStartTime(), 
t.getCostTime()))
+                          startTime, endTime, task.getStartTime(), 
task.getCostTime()))
               .sum();
     }
     return finishedOccupy + runningOccupy;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 65b2daf03..ed8464fe3 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -70,9 +70,11 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -96,6 +98,8 @@ public class OptimizingQueue extends PersistentBase {
   private final int maxPlanningParallelism;
   private final OptimizerGroupMetrics metrics;
   private ResourceGroup optimizerGroup;
+  private final Map<ServerTableIdentifier, AtomicInteger> optimizingTasksMap =
+      new ConcurrentHashMap<>();
 
   public OptimizingQueue(
       CatalogManager catalogManager,
@@ -196,15 +200,22 @@ public class OptimizingQueue extends PersistentBase {
         taskRuntime -> taskRuntime.getTaskId().getProcessId() == 
optimizingProcess.getProcessId());
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime) {
+  public TaskRuntime<?> pollTask(long maxWaitTime, boolean breakQuotaLimit) {
     long deadline = calculateDeadline(maxWaitTime);
     TaskRuntime<?> task = fetchTask();
     while (task == null && waitTask(deadline)) {
       task = fetchTask();
     }
+    if (task == null && breakQuotaLimit && planningTables.isEmpty()) {
+      task = fetchScheduledTask(false);
+    }
     return task;
   }
 
+  public TaskRuntime<?> pollTask(long maxWaitTime) {
+    return pollTask(maxWaitTime, false);
+  }
+
   private long calculateDeadline(long maxWaitTime) {
     long deadline = System.currentTimeMillis() + maxWaitTime;
     return deadline <= 0 ? Long.MAX_VALUE : deadline;
@@ -227,12 +238,12 @@ public class OptimizingQueue extends PersistentBase {
 
   private TaskRuntime<?> fetchTask() {
     TaskRuntime<?> task = retryTaskQueue.poll();
-    return task != null ? task : fetchScheduledTask();
+    return task != null ? task : fetchScheduledTask(true);
   }
 
-  private TaskRuntime<?> fetchScheduledTask() {
+  private TaskRuntime<?> fetchScheduledTask(boolean needQuotaChecking) {
     return tableQueue.stream()
-        .map(TableOptimizingProcess::poll)
+        .map(process -> process.poll(needQuotaChecking))
         .filter(Objects::nonNull)
         .findFirst()
         .orElse(null);
@@ -422,12 +433,21 @@ public class OptimizingQueue extends PersistentBase {
     private Map<String, Long> toSequence = Maps.newHashMap();
     private boolean hasCommitted = false;
 
-    public TaskRuntime<?> poll() {
+    public TaskRuntime<?> poll(boolean needQuotaChecking) {
       if (lock.tryLock()) {
         try {
-          return status != ProcessStatus.KILLED && status != 
ProcessStatus.FAILED
-              ? taskQueue.poll()
-              : null;
+          TaskRuntime<?> task = null;
+          if (status != ProcessStatus.KILLED && status != 
ProcessStatus.FAILED) {
+            if (!needQuotaChecking || getActualQuota() < getQuotaLimit()) {
+              task = taskQueue.poll();
+            }
+          }
+          if (task != null) {
+            optimizingTasksMap
+                .computeIfAbsent(optimizingState.getTableIdentifier(), k -> 
new AtomicInteger(0))
+                .incrementAndGet();
+          }
+          return task;
         } finally {
           lock.unlock();
         }
@@ -468,6 +488,14 @@ public class OptimizingQueue extends PersistentBase {
       loadTaskRuntimes(this);
     }
 
+    private int getQuotaLimit() {
+      double targetQuota =
+          
optimizingState.getTableConfiguration().getOptimizingConfig().getTargetQuota();
+      return targetQuota > 1
+          ? (int) targetQuota
+          : (int) Math.ceil(targetQuota * getAvailableCore());
+    }
+
     @Override
     public long getProcessId() {
       return processId;
@@ -501,6 +529,14 @@ public class OptimizingQueue extends PersistentBase {
     private void acceptResult(TaskRuntime<?> taskRuntime) {
       lock.lock();
       try {
+        optimizingTasksMap.computeIfPresent(
+            optimizingState.getTableIdentifier(),
+            (k, v) -> {
+              if (v.get() > 0) {
+                v.decrementAndGet();
+              }
+              return v;
+            });
         try {
           optimizingState.addTaskQuota(taskRuntime.getCurrentQuota());
         } catch (Throwable throwable) {
@@ -609,6 +645,12 @@ public class OptimizingQueue extends PersistentBase {
           .sum();
     }
 
+    public int getActualQuota() {
+      return optimizingTasksMap
+          .getOrDefault(optimizingState.getTableIdentifier(), new 
AtomicInteger(0))
+          .get();
+    }
+
     @Override
     public void commit() {
       LOG.debug(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
index c4b2993fd..68204e5eb 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
@@ -38,9 +38,11 @@ import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.StatedPersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.table.blocker.TableBlocker;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
@@ -54,8 +56,6 @@ import org.apache.iceberg.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.math.BigDecimal;
-import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -520,7 +520,7 @@ public class DefaultOptimizingState extends 
StatedPersistentBase implements Proc
     return tableConfiguration.getOptimizingConfig().isEnabled();
   }
 
-  public Double getTargetQuota() {
+  public double getTargetQuota() {
     return tableConfiguration.getOptimizingConfig().getTargetQuota();
   }
 
@@ -627,12 +627,23 @@ public class DefaultOptimizingState extends 
StatedPersistentBase implements Proc
   }
 
   public double calculateQuotaOccupy() {
-    return new BigDecimal(
-            (double) getQuotaTime()
-                / AmoroServiceConstants.QUOTA_LOOK_BACK_TIME
-                / tableConfiguration.getOptimizingConfig().getTargetQuota())
-        .setScale(4, RoundingMode.HALF_UP)
-        .doubleValue();
+    double targetQuota = 
tableConfiguration.getOptimizingConfig().getTargetQuota();
+    int targetQuotaLimit =
+        targetQuota > 1 ? (int) targetQuota : (int) Math.ceil(targetQuota * 
getThreadCount());
+    return (double) getQuotaTime() / 
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME / targetQuotaLimit;
+  }
+
+  public int getThreadCount() {
+    List<OptimizerInstance> instances = getAs(OptimizerMapper.class, 
OptimizerMapper::selectAll);
+    if (instances == null || instances.isEmpty()) {
+      return 1;
+    }
+    return Math.max(
+        instances.stream()
+            .filter(instance -> optimizerGroup.equals(instance.getGroupName()))
+            .mapToInt(OptimizerInstance::getThreadCount)
+            .sum(),
+        1);
   }
 
   /**
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
index 558443963..826d59b7b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
@@ -41,9 +41,11 @@ import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
 import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.table.blocker.TableBlocker;
 import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -282,6 +285,15 @@ public class DefaultTableManager extends PersistentBase 
implements TableManager
 
     // load quota info
     Map<Long, List<TaskRuntime.TaskQuota>> tableQuotaMap = 
getQuotaTime(tableIds);
+    List<OptimizerInstance> instances = getAs(OptimizerMapper.class, 
OptimizerMapper::selectAll);
+    Map<String, Integer> optimizerThreadCountMap =
+        instances == null
+            ? Collections.emptyMap()
+            : instances.stream()
+                .collect(
+                    Collectors.groupingBy(
+                        OptimizerInstance::getGroupName,
+                        
Collectors.summingInt(OptimizerInstance::getThreadCount)));
 
     List<TableOptimizingInfo> infos =
         ret.stream()
@@ -289,7 +301,10 @@ public class DefaultTableManager extends PersistentBase 
implements TableManager
                 meta -> {
                   List<OptimizingTaskMeta> tasks = 
tableTaskMetaMap.get(meta.getTableId());
                   List<TaskRuntime.TaskQuota> quotas = 
tableQuotaMap.get(meta.getTableId());
-                  return OptimizingUtil.buildTableOptimizeInfo(meta, tasks, 
quotas);
+                  int threadCount =
+                      
optimizerThreadCountMap.getOrDefault(meta.getOptimizerGroup(), 0);
+                  return OptimizingUtil.buildTableOptimizeInfo(
+                      meta, tasks, quotas, Math.max(threadCount, 1));
                 })
             .collect(Collectors.toList());
     return Pair.of(infos, total);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index ff09d575e..39384c68c 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -370,6 +370,7 @@ public class AmsEnvironment {
         + "    commit-thread-count: 10\n"
         + "    runtime-data-keep-days: 30\n"
         + "    runtime-data-expire-interval-hours: 1\n"
+        + "    break-quota-limit-enabled: true\n"
         + "\n"
         + "  database:\n"
         + "    type: \"derby\"\n"
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
new file mode 100644
index 000000000..a22ee6bdc
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.utils;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.api.OptimizingTaskId;
+import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.io.MixedDataTestHelpers;
+import org.apache.amoro.optimizing.RewriteFilesOutput;
+import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.AmoroServiceConstants;
+import org.apache.amoro.server.optimizing.OptimizingQueue;
+import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
+import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.resource.OptimizerThread;
+import org.apache.amoro.server.resource.QuotaProvider;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.TableConfigurations;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.SerializationUtil;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.data.Record;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+@RunWith(Parameterized.class)
+public class TestOptimizingUtil extends AMSTableTestBase {
+  private final long MAX_POLLING_TIME = 5000;
+  private final Executor planExecutor = Executors.newSingleThreadExecutor();
+  private final QuotaProvider quotaProvider = resourceGroup -> 1;
+
+  private final OptimizerThread optimizerThread =
+      new OptimizerThread(1, null) {
+
+        @Override
+        public String getToken() {
+          return "aah";
+        }
+      };
+
+  public TestOptimizingUtil(CatalogTestHelper catalogTestHelper, 
TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper, true);
+  }
+
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {new BasicCatalogTestHelper(TableFormat.ICEBERG), new 
BasicTableTestHelper(false, true)}
+    };
+  }
+
+  @Test
+  public void testCalculateQuotaOccupy() {
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
+
+    Assertions.assertEquals(0, OptimizingUtil.calculateQuotaOccupy(null, null, 
startTime, endTime));
+
+    DefaultTableRuntime tableRuntime = initTableWithFiles();
+    OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
+    Assert.assertEquals(0, queue.collectTasks().size());
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
+    Assert.assertNotNull(task);
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+
+    List<TaskRuntime.TaskQuota> quotas = new ArrayList<>();
+    TaskRuntime.TaskQuota quota = task.getCurrentQuota();
+    quotas.add(quota);
+    Assertions.assertTrue(
+        OptimizingUtil.calculateQuotaOccupy(null, quotas, startTime, endTime) 
> 0);
+
+    List<OptimizingTaskMeta> tasks = new ArrayList<>();
+    OptimizingTaskMeta taskMeta = new OptimizingTaskMeta();
+    taskMeta.setStatus(TaskRuntime.Status.ACKED);
+    taskMeta.setStartTime(endTime - 2000);
+    taskMeta.setCostTime(1000);
+    tasks.add(taskMeta);
+    Assertions.assertEquals(
+        1000, OptimizingUtil.calculateQuotaOccupy(tasks, null, startTime, 
endTime));
+
+    Assertions.assertTrue(
+        OptimizingUtil.calculateQuotaOccupy(tasks, quotas, startTime, endTime)
+            > OptimizingUtil.calculateQuotaOccupy(null, quotas, startTime, 
endTime));
+    Assertions.assertTrue(
+        OptimizingUtil.calculateQuotaOccupy(tasks, quotas, startTime, endTime)
+            > OptimizingUtil.calculateQuotaOccupy(tasks, null, startTime, 
endTime));
+  }
+
+  protected OptimizingQueue buildOptimizingGroupService(DefaultTableRuntime 
tableRuntime) {
+    return new OptimizingQueue(
+        CATALOG_MANAGER,
+        testResourceGroup(),
+        quotaProvider,
+        planExecutor,
+        Collections.singletonList(tableRuntime),
+        1);
+  }
+
+  protected static ResourceGroup testResourceGroup() {
+    return new ResourceGroup.Builder("test", "local").build();
+  }
+
+  protected DefaultTableRuntime initTableWithFiles() {
+    MixedTable mixedTable =
+        (MixedTable) 
tableService().loadTable(serverTableIdentifier()).originalTable();
+    appendData(mixedTable.asUnkeyedTable(), 1);
+    appendData(mixedTable.asUnkeyedTable(), 2);
+    DefaultTableRuntime tableRuntime =
+        buildTableRuntimeMeta(OptimizingStatus.PENDING, 
defaultResourceGroup());
+
+    
tableRuntime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
+    return tableRuntime;
+  }
+
+  private void appendData(UnkeyedTable table, int id) {
+    ArrayList<Record> newRecords =
+        Lists.newArrayList(
+            MixedDataTestHelpers.createRecord(
+                table.schema(), id, "111", 0L, "2022-01-01T12:00:00"));
+    List<DataFile> dataFiles = MixedDataTestHelpers.writeBaseStore(table, 0L, 
newRecords, false);
+    AppendFiles appendFiles = table.newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+  }
+
+  private DefaultTableRuntime buildTableRuntimeMeta(
+      OptimizingStatus status, ResourceGroup resourceGroup) {
+    MixedTable mixedTable =
+        (MixedTable) 
tableService().loadTable(serverTableIdentifier()).originalTable();
+    TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
+    tableRuntimeMeta.setCatalogName(serverTableIdentifier().getCatalog());
+    tableRuntimeMeta.setDbName(serverTableIdentifier().getDatabase());
+    tableRuntimeMeta.setTableName(serverTableIdentifier().getTableName());
+    tableRuntimeMeta.setTableId(serverTableIdentifier().getId());
+    tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
+    tableRuntimeMeta.setTableStatus(status);
+    
tableRuntimeMeta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties()));
+    tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName());
+    return new DefaultTableRuntime(tableRuntimeMeta, tableService());
+  }
+
+  private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId 
taskId, int threadId) {
+    TableOptimizing.OptimizingOutput output = new RewriteFilesOutput(null, 
null, null);
+    OptimizingTaskResult optimizingTaskResult = new 
OptimizingTaskResult(taskId, threadId);
+    
optimizingTaskResult.setTaskOutput(SerializationUtil.simpleSerialize(output));
+    return optimizingTaskResult;
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 506624e19..4cec592f7 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -31,6 +31,7 @@ import static 
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER
 import static 
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_THREADS;
 
 import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.api.OptimizerRegisterInfo;
@@ -62,6 +63,7 @@ import org.apache.amoro.table.UnkeyedTable;
 import org.apache.amoro.utils.SerializationUtil;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.junit.Assert;
 import org.junit.Test;
@@ -168,6 +170,125 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     queue.dispose();
   }
 
+  @Test
+  public void testPollTaskWithOverQuotaDisabled() {
+    DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles();
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task);
+    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
+    Assert.assertNotNull(task);
+
+    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNull(task2);
+
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+
+    TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(retryTask);
+
+    queue.dispose();
+  }
+
+  @Test
+  public void testPollTaskWithOverQuotaEnabled() {
+    DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles();
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task);
+    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
+    Assert.assertNotNull(task);
+
+    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME, true);
+    Assert.assertNotNull(task2);
+
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+    TaskRuntime task4 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNull(task4);
+    TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME, true);
+    Assert.assertNotNull(retryTask);
+    queue.dispose();
+  }
+
+  @Test
+  public void testQuotaSchedulePolicy() {
+    DefaultTableRuntime tableRuntime = initTableWithFiles();
+
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
+    Assert.assertNotNull(task);
+    Assert.assertTrue(tableRuntime.getTableIdentifier().getId() == 
task.getTableId());
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+    OptimizingProcess optimizingProcess = 
tableRuntime.getOptimizingState().getOptimizingProcess();
+    Assert.assertEquals(ProcessStatus.RUNNING, optimizingProcess.getStatus());
+    optimizingProcess.commit();
+    Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
+    
Assert.assertNull(tableRuntime.getOptimizingState().getOptimizingProcess());
+
+    tableRuntime = initTableWithPartitionedFiles();
+    ServerTableIdentifier serverTableIdentifier =
+        ServerTableIdentifier.of(
+            org.apache.amoro.table.TableIdentifier.of(
+                serverTableIdentifier().getCatalog(), "db", "test_table2"),
+            TableFormat.ICEBERG);
+    serverTableIdentifier.setId(2L);
+    DefaultTableRuntime tableRuntime2 = createTable(serverTableIdentifier);
+    queue.refreshTable(tableRuntime2);
+    queue.refreshTable(tableRuntime);
+
+    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task2);
+    Assert.assertTrue(tableRuntime2.getTableIdentifier().getId() == 
task2.getTableId());
+    TaskRuntime task3 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task3);
+    Assert.assertTrue(tableRuntime.getTableIdentifier().getId() == 
task3.getTableId());
+    queue.dispose();
+  }
+
   @Test
   public void testRetryTask() {
     DefaultTableRuntime tableRuntimeMeta = initTableWithFiles();
@@ -185,7 +306,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
       retryTask.ack(optimizerThread);
       retryTask.complete(
           optimizerThread,
-          buildOptimizingTaskFailed(task.getTaskId(), 
optimizerThread.getThreadId()));
+          buildOptimizingTaskFailed(retryTask.getTaskId(), 
optimizerThread.getThreadId()));
       Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
     }
 
@@ -376,6 +497,18 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     return tableRuntime;
   }
 
+  protected DefaultTableRuntime initTableWithPartitionedFiles() {
+    MixedTable mixedTable =
+        (MixedTable) 
tableService().loadTable(serverTableIdentifier()).originalTable();
+    appendPartitionedData(mixedTable.asUnkeyedTable(), 1);
+    appendPartitionedData(mixedTable.asUnkeyedTable(), 2);
+    DefaultTableRuntime tableRuntime =
+        buildTableRuntimeMeta(OptimizingStatus.PENDING, 
defaultResourceGroup());
+
+    
tableRuntime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
+    return tableRuntime;
+  }
+
   private DefaultTableRuntime buildTableRuntimeMeta(
       OptimizingStatus status, ResourceGroup resourceGroup) {
     MixedTable mixedTable =
@@ -392,6 +525,21 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     return new DefaultTableRuntime(tableRuntimeMeta, tableService());
   }
 
+  private void appendPartitionedData(UnkeyedTable table, int id) {
+    ArrayList<Record> newRecords =
+        Lists.newArrayList(
+            MixedDataTestHelpers.createRecord(
+                table.schema(), id, "111", 0L, "2022-01-01T12:00:00"));
+    newRecords.add(
+        MixedDataTestHelpers.createRecord(table.schema(), id, "222", 0L, 
"2022-01-02T12:00:00"));
+    newRecords.add(
+        MixedDataTestHelpers.createRecord(table.schema(), id, "333", 0L, 
"2022-01-03T12:00:00"));
+    List<DataFile> dataFiles = MixedDataTestHelpers.writeBaseStore(table, 0L, 
newRecords, false);
+    AppendFiles appendFiles = table.newAppend();
+    dataFiles.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+  }
+
   private void appendData(UnkeyedTable table, int id) {
     ArrayList<Record> newRecords =
         Lists.newArrayList(
@@ -403,6 +551,37 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     appendFiles.commit();
   }
 
+  private DefaultTableRuntime createTable(ServerTableIdentifier 
serverTableIdentifier) {
+    org.apache.iceberg.catalog.Catalog catalog =
+        catalogTestHelper().buildIcebergCatalog(catalogMeta());
+    catalog.createTable(
+        TableIdentifier.of(
+            serverTableIdentifier.getDatabase(), 
serverTableIdentifier.getTableName()),
+        tableTestHelper().tableSchema(),
+        tableTestHelper().partitionSpec(),
+        tableTestHelper().tableProperties());
+
+    MixedTable mixedTable =
+        (MixedTable) 
tableService().loadTable(serverTableIdentifier).originalTable();
+    appendPartitionedData(mixedTable.asUnkeyedTable(), 1);
+    appendPartitionedData(mixedTable.asUnkeyedTable(), 2);
+
+    TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
+    tableRuntimeMeta.setCatalogName(serverTableIdentifier.getCatalog());
+    tableRuntimeMeta.setDbName(serverTableIdentifier.getDatabase());
+    tableRuntimeMeta.setTableName(serverTableIdentifier.getTableName());
+    tableRuntimeMeta.setTableId(serverTableIdentifier.getId());
+    tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
+    tableRuntimeMeta.setTableStatus(OptimizingStatus.PENDING);
+    
tableRuntimeMeta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties()));
+    tableRuntimeMeta.setOptimizerGroup(defaultResourceGroup().getName());
+    DefaultTableRuntime tableRuntime = new 
DefaultTableRuntime(tableRuntimeMeta, tableService());
+
+    
tableRuntime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier));
+
+    return tableRuntime;
+  }
+
   private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId 
taskId, int threadId) {
     TableOptimizing.OptimizingOutput output = new RewriteFilesOutput(null, 
null, null);
     OptimizingTaskResult optimizingTaskResult = new 
OptimizingTaskResult(taskId, threadId);
diff --git a/amoro-ams/src/test/resources/config-with-units.yaml 
b/amoro-ams/src/test/resources/config-with-units.yaml
index 40f24cf4c..9f72540a9 100644
--- a/amoro-ams/src/test/resources/config-with-units.yaml
+++ b/amoro-ams/src/test/resources/config-with-units.yaml
@@ -51,6 +51,7 @@ ams:
     runtime-data-keep-days: 30
     runtime-data-expire-interval-hours: 1
     refresh-group-interval: 30s
+    break-quota-limit-enabled: true
 
   optimizer:
     heart-beat-timeout: 1min
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 3da2922ea..1ff9d541f 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -76,7 +76,7 @@ public class TableProperties {
   public static final String SELF_OPTIMIZING_GROUP_DEFAULT = "default";
 
   public static final String SELF_OPTIMIZING_QUOTA = "self-optimizing.quota";
-  public static final double SELF_OPTIMIZING_QUOTA_DEFAULT = 0.1;
+  public static final double SELF_OPTIMIZING_QUOTA_DEFAULT = 0.5;
 
   public static final String SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER =
       "self-optimizing.execute.num-retries";
diff --git a/charts/amoro/templates/amoro-configmap.yaml 
b/charts/amoro/templates/amoro-configmap.yaml
index a57b79c2f..6dbb7ec29 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -77,6 +77,7 @@ data:
         commit-thread-count: 10
         runtime-data-keep-days: 30
         runtime-data-expire-interval-hours: 1
+        break-quota-limit-enabled: true
 
       optimizer:
         heart-beat-timeout: 1min # 60000
diff --git a/dist/src/main/amoro-bin/conf/config.yaml 
b/dist/src/main/amoro-bin/conf/config.yaml
index 7503317d0..f628433b7 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -51,6 +51,7 @@ ams:
     runtime-data-keep-days: 30
     runtime-data-expire-interval-hours: 1
     refresh-group-interval: 30s
+    break-quota-limit-enabled: true
 
   optimizer:
     heart-beat-timeout: 1min # 60000
diff --git a/docs/concepts/self-optimizing.md b/docs/concepts/self-optimizing.md
index 285fade2c..bac381592 100644
--- a/docs/concepts/self-optimizing.md
+++ b/docs/concepts/self-optimizing.md
@@ -147,22 +147,26 @@ Currently, there are two main scheduling policies 
available: `Quota` and `Balanc
 ### Quota
 
 The `Quota` strategy is a scheduling policy that schedules based on resource 
usage. The Self-optimizing resource usage of a single table is managed
-by configuring the quota configuration on the table:
+by configuring the quota configuration on the table. Quota specifies the 
maximum number of optimizer resources that can be allocated to each table.
+Quotas can be specified as either a decimal (representing a percentage) or an 
integer (representing a fixed number of resources):
 
 ```SQL
--- Quota for Self-optimizing, indicating the CPU resource the table can take up
-self-optimizing.quota = 0.1;
+-- Set quota as a percentage of total optimizer resources
+self-optimizing.quota = 0.5;
 ```
-Quota defines the maximum CPU usage that a single table can use, but 
Self-optimizing is actually executed in a distributed manner, and actual 
resource
-usage is dynamically managed based on actual execution time.In the optimizing 
management Web UI, the dynamic quota usage of a single table can be
-viewed through the "Quota Occupy" metric. From a design perspective, the quota 
occupy metric should dynamically approach 100%.
+A decimal quota (e.g., 0.5) limits the table to a percentage of the total 
available optimizer resources.
 
-In a platform, two situations may occur: overselling and overbuying.
+```SQL
+-- Set quota as a fixed number of optimizer resources
+self-optimizing.quota = 10;
+```
+An integer quota (e.g., 10) restricts the table to a specific number of 
optimizer resources.
+
+This flexible configuration prevents resource underutilization and allows 
users to tailor resource allocation to their needs.
+
+The `Quota` strategy schedules tables based on their Occupation metric, which 
is calculated as the ratio of the actual optimizer thread execution time used
+by a table to its quota execution time within the QUOTA_LOOK_BACK_TIME window. 
Tables with lower Occupation are given higher scheduling priority.
 
-- Overselling — If all optimizer configurations exceed the total quota of all 
table configurations, the quota occupy metric may dynamically approach
-above 100%
-- Overbuying — If all optimizer configurations are lower than the total quota 
of all table configurations, the quota occupy metric should dynamically
-approach below 100%
 
 ### Balanced
 
diff --git a/docs/user-guides/configurations.md 
b/docs/user-guides/configurations.md
index 5fcde8842..0d5c24531 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -47,7 +47,7 @@ Self-optimizing configurations are applicable to both Iceberg 
Format and Mixed s
 
|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | self-optimizing.enabled                       | true             | Enables 
Self-optimizing                                                                 
                                                                                
                                                                |
 | self-optimizing.group                         | default          | Optimizer 
group for Self-optimizing                                                       
                                                                                
                                                              |
-| self-optimizing.quota                         | 0.1              | Quota for 
Self-optimizing, indicating the CPU resource the table can take up              
                                                                                
                                                              |
+| self-optimizing.quota                         | 0.5              | Quota for 
Self-optimizing, indicating the optimizer resources the table can take up       
                                                                                
                                                                     |
 | self-optimizing.execute.num-retries           | 5                | Number of 
retries after failure of Self-optimizing                                        
                                                                                
                                                              |
 | self-optimizing.target-size                   | 134217728(128MB) | Target 
size for Self-optimizing                                                        
                                                                                
                                                                 |
 | self-optimizing.max-file-count                | 10000            | Maximum 
number of files processed by a Self-optimizing process                          
                                                                                
                                                                |
diff --git a/docs/user-guides/using-tables.md b/docs/user-guides/using-tables.md
index fd6366761..6410c2247 100644
--- a/docs/user-guides/using-tables.md
+++ b/docs/user-guides/using-tables.md
@@ -264,5 +264,5 @@ The Optimizing page displays self-optimizing status of all 
tables.
 - **Duration**: The duration of the current status.
 - **File Count**: The total number of files involved in the current 
Self-optimizing, including base, insert, eq-delete, and pos-delete file types.
 - **File Size**: The total size of files involved in the current 
self-optimizing.
-- **Quota**: The proportion of self-optimizing execution time executed per 
unit time.
-- **Quota Occupation**: The actual Quota used for self-optimizing during 
execution of the table in the last hour. When optimizer resources are 
sufficient and the table requires more resources for self-optimizing, this 
value will be greater than 100%. When resources are scarce or the table 
requires fewer resources for self-optimizing, this value will be less than 100%.
\ No newline at end of file
+- **Quota**: The maximum number of optimizer resources that can be allocated 
to each table.
+- **Quota Occupation**: The ratio of the actual optimizer thread execution 
time used by a table to its quota execution time within the 
QUOTA_LOOK_BACK_TIME window (one hour). 

Reply via email to