This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new c955aba01 [AMORO-3671][Improvement]: Optimizing Allocation and
Schedule Priority of Optimizer resource for Tables (#3669)
c955aba01 is described below
commit c955aba01ce28304a8a7d5e4d2bb2d432b30df37
Author: cxxiii <[email protected]>
AuthorDate: Wed Aug 6 19:13:10 2025 +0800
[AMORO-3671][Improvement]: Optimizing Allocation and Schedule Priority of
Optimizer resource for Tables (#3669)
* optimize the definition and function of quota
* revise the calculation of occupation in scheduling policy
* revise the code format
* modify the default quota from a fixed value to a percentage
* set the quota count to at least 1
* add an option of over quota enabled
* add an option of over quota enabled
correct the format
* allow quota to be a decimal or a value larger than 1
* add some unit testing
* update related doc
* correct format
* [AMORO-3638] Add support for task retries when task execution times out
This commit adds a timeout for the task state `ACK`, which helps in cases
where AMS fails to receive the task completion notification for any reason.
* [Improvement] Add JDK8+ support for `optimizer.sh` (#3702)
[Improvement] Add JVM parameter to optimizer.sh
* [Improvement] Use fileIndex="nomax" to avoid log loss. (#3701)
* [AMORO-3692] Remove deprecated metrics and update related logic in
MetricsSummary (#3693)
[Hotfix] Remove deprecated metrics and update related logic in
MetricsSummary
* [AMORO-3686][Improvement]: Use ZK's multi operation to ensure write
consistency. (#3687)
[Improvement]: Use ZK's multi operation in HighAvailabilityContainer to
ensure write consistency and fix the misspelled word followerLath #3686
Co-authored-by: wardli <[email protected]>
* [AMORO-3607]Fix syntax error in init sql #3607 (#3681)
* [AMORO-3245] Display comment on Table page. (#3672)
* Display comment in Tables page.
* fixup comment display
---------
Co-authored-by: 张文领 <[email protected]>
* fix the bug
* remove lib
* polish the codes
* fix error in tests
* rename
* rename
* remove unnecessary interface method
---------
Co-authored-by: Jzjsnow <[email protected]>
Co-authored-by: CatchYouIfICan
<[email protected]>
Co-authored-by: Qishang Zhong <[email protected]>
Co-authored-by: can <[email protected]>
Co-authored-by: wardli <[email protected]>
Co-authored-by: Nico CHen <[email protected]>
Co-authored-by: zhangwl9 <[email protected]>
Co-authored-by: 张文领 <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 7 +
.../amoro/server/DefaultOptimizingService.java | 5 +-
.../dashboard/model/TableOptimizingInfo.java | 6 +-
.../server/dashboard/utils/OptimizingUtil.java | 49 ++++--
.../amoro/server/optimizing/OptimizingQueue.java | 58 ++++++-
.../amoro/server/table/DefaultOptimizingState.java | 29 +++-
.../amoro/server/table/DefaultTableManager.java | 17 +-
.../org/apache/amoro/server/AmsEnvironment.java | 1 +
.../server/dashboard/utils/TestOptimizingUtil.java | 191 +++++++++++++++++++++
.../server/optimizing/TestOptimizingQueue.java | 181 ++++++++++++++++++-
.../src/test/resources/config-with-units.yaml | 1 +
.../org/apache/amoro/table/TableProperties.java | 2 +-
charts/amoro/templates/amoro-configmap.yaml | 1 +
dist/src/main/amoro-bin/conf/config.yaml | 1 +
docs/concepts/self-optimizing.md | 26 +--
docs/user-guides/configurations.md | 2 +-
docs/user-guides/using-tables.md | 4 +-
17 files changed, 528 insertions(+), 53 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index a92a5e4d3..ff95d8d0d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -302,6 +302,13 @@ public class AmoroManagementConf {
.withDescription(
"The number of hours that self-optimizing runtime data expire
interval.");
+ public static final ConfigOption<Boolean>
OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED =
+ ConfigOptions.key("self-optimizing.break-quota-limit-enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Allow the table to break the quota limit when the resource is
sufficient.");
+
public static final ConfigOption<Duration> OVERVIEW_CACHE_REFRESH_INTERVAL =
ConfigOptions.key("overview-cache.refresh-interval")
.durationType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index ffdfd72b8..397c473eb 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -96,6 +96,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final long taskExecuteTimeout;
private final int maxPlanningParallelism;
private final long pollingTimeout;
+ private final boolean breakQuotaLimit;
private final long refreshGroupInterval;
private final Map<String, OptimizingQueue> optimizingQueueByGroup = new
ConcurrentHashMap<>();
private final Map<String, OptimizingQueue> optimizingQueueByToken = new
ConcurrentHashMap<>();
@@ -125,6 +126,8 @@ public class DefaultOptimizingService extends
StatedPersistentBase
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
+ this.breakQuotaLimit =
+
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.optimizerManager = optimizerManager;
@@ -215,7 +218,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
public OptimizingTask pollTask(String authToken, int threadId) {
LOG.debug("Optimizer {} (threadId {}) try polling task", authToken,
threadId);
OptimizingQueue queue = getQueueByToken(authToken);
- return Optional.ofNullable(queue.pollTask(pollingTimeout))
+ return Optional.ofNullable(queue.pollTask(pollingTimeout, breakQuotaLimit))
.map(task -> extractOptimizingTask(task, authToken, threadId, queue))
.orElse(null);
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
index 92797900d..0a2a922b9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableOptimizingInfo.java
@@ -43,7 +43,7 @@ public class TableOptimizingInfo {
private long duration = 0;
private long fileCount = 0;
private long fileSize = 0;
- private double quota = 0.0;
+ private int quota = 0;
private double quotaOccupation = 0.0;
private String groupName = TableProperties.SELF_OPTIMIZING_GROUP_DEFAULT;
@@ -88,11 +88,11 @@ public class TableOptimizingInfo {
this.fileSize = fileSize;
}
- public double getQuota() {
+ public int getQuota() {
return quota;
}
- public void setQuota(double quota) {
+ public void setQuota(int quota) {
this.quota = quota;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
index 148ac2e76..ddb293570 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/OptimizingUtil.java
@@ -22,13 +22,18 @@ import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.config.OptimizingConfig;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
+import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.optimizing.TaskRuntime.Status;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.table.descriptor.FilesStatistics;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Collectors;
@@ -42,7 +47,8 @@ public class OptimizingUtil {
public static TableOptimizingInfo buildTableOptimizeInfo(
TableRuntimeMeta optimizingTableRuntime,
List<OptimizingTaskMeta> processTasks,
- List<TaskRuntime.TaskQuota> quotas) {
+ List<TaskRuntime.TaskQuota> quotas,
+ int threadCount) {
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
optimizingTableRuntime.getTableId(),
@@ -57,14 +63,19 @@ public class OptimizingUtil {
System.currentTimeMillis() -
optimizingTableRuntime.getCurrentStatusStartTime());
OptimizingConfig optimizingConfig =
optimizingTableRuntime.getTableConfig().getOptimizingConfig();
- tableOptimizeInfo.setQuota(optimizingConfig.getTargetQuota());
- double quotaOccupy =
- calculateQuotaOccupy(
- processTasks,
- quotas,
- optimizingTableRuntime.getCurrentStatusStartTime(),
- System.currentTimeMillis());
- tableOptimizeInfo.setQuotaOccupation(quotaOccupy);
+ double targetQuota = optimizingConfig.getTargetQuota();
+ tableOptimizeInfo.setQuota(
+ targetQuota > 1 ? (int) targetQuota : (int) Math.ceil(targetQuota *
threadCount));
+
+ long endTime = System.currentTimeMillis();
+ long startTime = System.currentTimeMillis() -
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
+ long quotaOccupy = calculateQuotaOccupy(processTasks, quotas, startTime,
endTime);
+ double quotaOccupation =
+ (double) quotaOccupy
+ / (AmoroServiceConstants.QUOTA_LOOK_BACK_TIME *
tableOptimizeInfo.getQuota());
+ tableOptimizeInfo.setQuotaOccupation(
+ BigDecimal.valueOf(quotaOccupation).setScale(4,
RoundingMode.HALF_UP).doubleValue());
+
FilesStatistics optimizeFileInfo;
if (optimizingStatus.isProcessing()) {
MetricsSummary summary = null;
@@ -89,23 +100,31 @@ public class OptimizingUtil {
return tableOptimizeInfo;
}
- private static double calculateQuotaOccupy(
+ @VisibleForTesting
+ static long calculateQuotaOccupy(
List<OptimizingTaskMeta> processTasks,
List<TaskRuntime.TaskQuota> quotas,
long startTime,
long endTime) {
- double finishedOccupy = 0;
+ long finishedOccupy = 0;
if (quotas != null) {
- finishedOccupy = quotas.stream().mapToDouble(q ->
q.getQuotaTime(startTime)).sum();
+ quotas.removeIf(task -> task.checkExpired(startTime));
+ finishedOccupy =
+ quotas.stream().mapToLong(taskQuota ->
taskQuota.getQuotaTime(startTime)).sum();
}
- double runningOccupy = 0;
+ long runningOccupy = 0;
if (processTasks != null) {
runningOccupy =
processTasks.stream()
- .mapToDouble(
+ .filter(
t ->
+ t.getStatus() != TaskRuntime.Status.CANCELED
+ && t.getStatus() != Status.SUCCESS
+ && t.getStatus() != TaskRuntime.Status.FAILED)
+ .mapToLong(
+ task ->
TaskRuntime.taskRunningQuotaTime(
- startTime, endTime, t.getStartTime(),
t.getCostTime()))
+ startTime, endTime, task.getStartTime(),
task.getCostTime()))
.sum();
}
return finishedOccupy + runningOccupy;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 65b2daf03..ed8464fe3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -70,9 +70,11 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -96,6 +98,8 @@ public class OptimizingQueue extends PersistentBase {
private final int maxPlanningParallelism;
private final OptimizerGroupMetrics metrics;
private ResourceGroup optimizerGroup;
+ private final Map<ServerTableIdentifier, AtomicInteger> optimizingTasksMap =
+ new ConcurrentHashMap<>();
public OptimizingQueue(
CatalogManager catalogManager,
@@ -196,15 +200,22 @@ public class OptimizingQueue extends PersistentBase {
taskRuntime -> taskRuntime.getTaskId().getProcessId() ==
optimizingProcess.getProcessId());
}
- public TaskRuntime<?> pollTask(long maxWaitTime) {
+ public TaskRuntime<?> pollTask(long maxWaitTime, boolean breakQuotaLimit) {
long deadline = calculateDeadline(maxWaitTime);
TaskRuntime<?> task = fetchTask();
while (task == null && waitTask(deadline)) {
task = fetchTask();
}
+ if (task == null && breakQuotaLimit && planningTables.isEmpty()) {
+ task = fetchScheduledTask(false);
+ }
return task;
}
+ public TaskRuntime<?> pollTask(long maxWaitTime) {
+ return pollTask(maxWaitTime, false);
+ }
+
private long calculateDeadline(long maxWaitTime) {
long deadline = System.currentTimeMillis() + maxWaitTime;
return deadline <= 0 ? Long.MAX_VALUE : deadline;
@@ -227,12 +238,12 @@ public class OptimizingQueue extends PersistentBase {
private TaskRuntime<?> fetchTask() {
TaskRuntime<?> task = retryTaskQueue.poll();
- return task != null ? task : fetchScheduledTask();
+ return task != null ? task : fetchScheduledTask(true);
}
- private TaskRuntime<?> fetchScheduledTask() {
+ private TaskRuntime<?> fetchScheduledTask(boolean needQuotaChecking) {
return tableQueue.stream()
- .map(TableOptimizingProcess::poll)
+ .map(process -> process.poll(needQuotaChecking))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
@@ -422,12 +433,21 @@ public class OptimizingQueue extends PersistentBase {
private Map<String, Long> toSequence = Maps.newHashMap();
private boolean hasCommitted = false;
- public TaskRuntime<?> poll() {
+ public TaskRuntime<?> poll(boolean needQuotaChecking) {
if (lock.tryLock()) {
try {
- return status != ProcessStatus.KILLED && status !=
ProcessStatus.FAILED
- ? taskQueue.poll()
- : null;
+ TaskRuntime<?> task = null;
+ if (status != ProcessStatus.KILLED && status !=
ProcessStatus.FAILED) {
+ if (!needQuotaChecking || getActualQuota() < getQuotaLimit()) {
+ task = taskQueue.poll();
+ }
+ }
+ if (task != null) {
+ optimizingTasksMap
+ .computeIfAbsent(optimizingState.getTableIdentifier(), k ->
new AtomicInteger(0))
+ .incrementAndGet();
+ }
+ return task;
} finally {
lock.unlock();
}
@@ -468,6 +488,14 @@ public class OptimizingQueue extends PersistentBase {
loadTaskRuntimes(this);
}
+ private int getQuotaLimit() {
+ double targetQuota =
+
optimizingState.getTableConfiguration().getOptimizingConfig().getTargetQuota();
+ return targetQuota > 1
+ ? (int) targetQuota
+ : (int) Math.ceil(targetQuota * getAvailableCore());
+ }
+
@Override
public long getProcessId() {
return processId;
@@ -501,6 +529,14 @@ public class OptimizingQueue extends PersistentBase {
private void acceptResult(TaskRuntime<?> taskRuntime) {
lock.lock();
try {
+ optimizingTasksMap.computeIfPresent(
+ optimizingState.getTableIdentifier(),
+ (k, v) -> {
+ if (v.get() > 0) {
+ v.decrementAndGet();
+ }
+ return v;
+ });
try {
optimizingState.addTaskQuota(taskRuntime.getCurrentQuota());
} catch (Throwable throwable) {
@@ -609,6 +645,12 @@ public class OptimizingQueue extends PersistentBase {
.sum();
}
+ public int getActualQuota() {
+ return optimizingTasksMap
+ .getOrDefault(optimizingState.getTableIdentifier(), new
AtomicInteger(0))
+ .get();
+ }
+
@Override
public void commit() {
LOG.debug(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
index c4b2993fd..68204e5eb 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultOptimizingState.java
@@ -38,9 +38,11 @@ import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
@@ -54,8 +56,6 @@ import org.apache.iceberg.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -520,7 +520,7 @@ public class DefaultOptimizingState extends
StatedPersistentBase implements Proc
return tableConfiguration.getOptimizingConfig().isEnabled();
}
- public Double getTargetQuota() {
+ public double getTargetQuota() {
return tableConfiguration.getOptimizingConfig().getTargetQuota();
}
@@ -627,12 +627,23 @@ public class DefaultOptimizingState extends
StatedPersistentBase implements Proc
}
public double calculateQuotaOccupy() {
- return new BigDecimal(
- (double) getQuotaTime()
- / AmoroServiceConstants.QUOTA_LOOK_BACK_TIME
- / tableConfiguration.getOptimizingConfig().getTargetQuota())
- .setScale(4, RoundingMode.HALF_UP)
- .doubleValue();
+ double targetQuota =
tableConfiguration.getOptimizingConfig().getTargetQuota();
+ int targetQuotaLimit =
+ targetQuota > 1 ? (int) targetQuota : (int) Math.ceil(targetQuota *
getThreadCount());
+ return (double) getQuotaTime() /
AmoroServiceConstants.QUOTA_LOOK_BACK_TIME / targetQuotaLimit;
+ }
+
+ public int getThreadCount() {
+ List<OptimizerInstance> instances = getAs(OptimizerMapper.class,
OptimizerMapper::selectAll);
+ if (instances == null || instances.isEmpty()) {
+ return 1;
+ }
+ return Math.max(
+ instances.stream()
+ .filter(instance -> optimizerGroup.equals(instance.getGroupName()))
+ .mapToInt(OptimizerInstance::getThreadCount)
+ .sum(),
+ 1);
}
/**
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
index 558443963..826d59b7b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
@@ -41,9 +41,11 @@ import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -282,6 +285,15 @@ public class DefaultTableManager extends PersistentBase
implements TableManager
// load quota info
Map<Long, List<TaskRuntime.TaskQuota>> tableQuotaMap =
getQuotaTime(tableIds);
+ List<OptimizerInstance> instances = getAs(OptimizerMapper.class,
OptimizerMapper::selectAll);
+ Map<String, Integer> optimizerThreadCountMap =
+ instances == null
+ ? Collections.emptyMap()
+ : instances.stream()
+ .collect(
+ Collectors.groupingBy(
+ OptimizerInstance::getGroupName,
+
Collectors.summingInt(OptimizerInstance::getThreadCount)));
List<TableOptimizingInfo> infos =
ret.stream()
@@ -289,7 +301,10 @@ public class DefaultTableManager extends PersistentBase
implements TableManager
meta -> {
List<OptimizingTaskMeta> tasks =
tableTaskMetaMap.get(meta.getTableId());
List<TaskRuntime.TaskQuota> quotas =
tableQuotaMap.get(meta.getTableId());
- return OptimizingUtil.buildTableOptimizeInfo(meta, tasks,
quotas);
+ int threadCount =
+
optimizerThreadCountMap.getOrDefault(meta.getOptimizerGroup(), 0);
+ return OptimizingUtil.buildTableOptimizeInfo(
+ meta, tasks, quotas, Math.max(threadCount, 1));
})
.collect(Collectors.toList());
return Pair.of(infos, total);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index ff09d575e..39384c68c 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -370,6 +370,7 @@ public class AmsEnvironment {
+ " commit-thread-count: 10\n"
+ " runtime-data-keep-days: 30\n"
+ " runtime-data-expire-interval-hours: 1\n"
+ + " break-quota-limit-enabled: true\n"
+ "\n"
+ " database:\n"
+ " type: \"derby\"\n"
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
new file mode 100644
index 000000000..a22ee6bdc
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.utils;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.api.OptimizingTaskId;
+import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.io.MixedDataTestHelpers;
+import org.apache.amoro.optimizing.RewriteFilesOutput;
+import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.AmoroServiceConstants;
+import org.apache.amoro.server.optimizing.OptimizingQueue;
+import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
+import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.persistence.TableRuntimeMeta;
+import org.apache.amoro.server.resource.OptimizerThread;
+import org.apache.amoro.server.resource.QuotaProvider;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.TableConfigurations;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.amoro.utils.SerializationUtil;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.data.Record;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+@RunWith(Parameterized.class)
+public class TestOptimizingUtil extends AMSTableTestBase {
+ private final long MAX_POLLING_TIME = 5000;
+ private final Executor planExecutor = Executors.newSingleThreadExecutor();
+ private final QuotaProvider quotaProvider = resourceGroup -> 1;
+
+ private final OptimizerThread optimizerThread =
+ new OptimizerThread(1, null) {
+
+ @Override
+ public String getToken() {
+ return "aah";
+ }
+ };
+
+ public TestOptimizingUtil(CatalogTestHelper catalogTestHelper,
TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper, true);
+ }
+
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {new BasicCatalogTestHelper(TableFormat.ICEBERG), new
BasicTableTestHelper(false, true)}
+ };
+ }
+
+ @Test
+ public void testCalculateQuotaOccupy() {
+ long endTime = System.currentTimeMillis();
+ long startTime = endTime - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME;
+
+ Assertions.assertEquals(0, OptimizingUtil.calculateQuotaOccupy(null, null,
startTime, endTime));
+
+ DefaultTableRuntime tableRuntime = initTableWithFiles();
+ OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
+ Assert.assertEquals(0, queue.collectTasks().size());
+ TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ task.schedule(optimizerThread);
+ task.ack(optimizerThread);
+ Assert.assertEquals(
+ 1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
+ Assert.assertNotNull(task);
+ task.complete(
+ optimizerThread,
+ buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
+ Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+
+ List<TaskRuntime.TaskQuota> quotas = new ArrayList<>();
+ TaskRuntime.TaskQuota quota = task.getCurrentQuota();
+ quotas.add(quota);
+ Assertions.assertTrue(
+ OptimizingUtil.calculateQuotaOccupy(null, quotas, startTime, endTime)
> 0);
+
+ List<OptimizingTaskMeta> tasks = new ArrayList<>();
+ OptimizingTaskMeta taskMeta = new OptimizingTaskMeta();
+ taskMeta.setStatus(TaskRuntime.Status.ACKED);
+ taskMeta.setStartTime(endTime - 2000);
+ taskMeta.setCostTime(1000);
+ tasks.add(taskMeta);
+ Assertions.assertEquals(
+ 1000, OptimizingUtil.calculateQuotaOccupy(tasks, null, startTime,
endTime));
+
+ Assertions.assertTrue(
+ OptimizingUtil.calculateQuotaOccupy(tasks, quotas, startTime, endTime)
+ > OptimizingUtil.calculateQuotaOccupy(null, quotas, startTime,
endTime));
+ Assertions.assertTrue(
+ OptimizingUtil.calculateQuotaOccupy(tasks, quotas, startTime, endTime)
+ > OptimizingUtil.calculateQuotaOccupy(tasks, null, startTime,
endTime));
+ }
+
+ protected OptimizingQueue buildOptimizingGroupService(DefaultTableRuntime
tableRuntime) {
+ return new OptimizingQueue(
+ CATALOG_MANAGER,
+ testResourceGroup(),
+ quotaProvider,
+ planExecutor,
+ Collections.singletonList(tableRuntime),
+ 1);
+ }
+
+ protected static ResourceGroup testResourceGroup() {
+ return new ResourceGroup.Builder("test", "local").build();
+ }
+
+ protected DefaultTableRuntime initTableWithFiles() {
+ MixedTable mixedTable =
+ (MixedTable)
tableService().loadTable(serverTableIdentifier()).originalTable();
+ appendData(mixedTable.asUnkeyedTable(), 1);
+ appendData(mixedTable.asUnkeyedTable(), 2);
+ DefaultTableRuntime tableRuntime =
+ buildTableRuntimeMeta(OptimizingStatus.PENDING,
defaultResourceGroup());
+
+
tableRuntime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
+ return tableRuntime;
+ }
+
+ private void appendData(UnkeyedTable table, int id) {
+ ArrayList<Record> newRecords =
+ Lists.newArrayList(
+ MixedDataTestHelpers.createRecord(
+ table.schema(), id, "111", 0L, "2022-01-01T12:00:00"));
+ List<DataFile> dataFiles = MixedDataTestHelpers.writeBaseStore(table, 0L,
newRecords, false);
+ AppendFiles appendFiles = table.newAppend();
+ dataFiles.forEach(appendFiles::appendFile);
+ appendFiles.commit();
+ }
+
+ private DefaultTableRuntime buildTableRuntimeMeta(
+ OptimizingStatus status, ResourceGroup resourceGroup) {
+ MixedTable mixedTable =
+ (MixedTable)
tableService().loadTable(serverTableIdentifier()).originalTable();
+ TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
+ tableRuntimeMeta.setCatalogName(serverTableIdentifier().getCatalog());
+ tableRuntimeMeta.setDbName(serverTableIdentifier().getDatabase());
+ tableRuntimeMeta.setTableName(serverTableIdentifier().getTableName());
+ tableRuntimeMeta.setTableId(serverTableIdentifier().getId());
+ tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
+ tableRuntimeMeta.setTableStatus(status);
+
tableRuntimeMeta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties()));
+ tableRuntimeMeta.setOptimizerGroup(resourceGroup.getName());
+ return new DefaultTableRuntime(tableRuntimeMeta, tableService());
+ }
+
+ private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId
taskId, int threadId) {
+ TableOptimizing.OptimizingOutput output = new RewriteFilesOutput(null,
null, null);
+ OptimizingTaskResult optimizingTaskResult = new
OptimizingTaskResult(taskId, threadId);
+
optimizingTaskResult.setTaskOutput(SerializationUtil.simpleSerialize(output));
+ return optimizingTaskResult;
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 506624e19..4cec592f7 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -31,6 +31,7 @@ import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER
import static
org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_THREADS;
import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.api.OptimizerRegisterInfo;
@@ -62,6 +63,7 @@ import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.SerializationUtil;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.junit.Assert;
import org.junit.Test;
@@ -168,6 +170,125 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
queue.dispose();
}
+  /**
+   * Verifies that a regular poll does not hand out a second task for a table while its first task
+   * is still running, and that polling succeeds again once that task has completed.
+   */
+  @Test
+  public void testPollTaskWithOverQuotaDisabled() {
+    DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles();
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task);
+    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == TaskRuntime.Status.ACKED).size());
+
+    // With the first task still running, a regular poll must not exceed the table's quota.
+    TaskRuntime overQuotaTask = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNull(overQuotaTask);
+
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+
+    // Once the running task has finished, the next planned task can be polled.
+    TaskRuntime nextTask = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(nextTask);
+
+    queue.dispose();
+  }
+
+  /**
+   * Verifies that polling with the over-quota flag set hands out tasks beyond the table's quota,
+   * while a regular poll for the same table keeps respecting the quota.
+   */
+  @Test
+  public void testPollTaskWithOverQuotaEnabled() {
+    DefaultTableRuntime tableRuntime = initTableWithPartitionedFiles();
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task);
+    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == TaskRuntime.Status.ACKED).size());
+
+    // Polling with the over-quota flag returns a task even though the first one is still running.
+    TaskRuntime overQuotaTask = queue.pollTask(MAX_POLLING_TIME, true);
+    Assert.assertNotNull(overQuotaTask);
+
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+
+    // A regular poll still respects the quota, whereas an over-quota poll does not.
+    TaskRuntime regularPollTask = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNull(regularPollTask);
+    TaskRuntime secondOverQuotaTask = queue.pollTask(MAX_POLLING_TIME, true);
+    Assert.assertNotNull(secondOverQuotaTask);
+    queue.dispose();
+  }
+
+  /**
+   * Verifies that the quota scheduling policy prefers the table with the lower quota occupation:
+   * after the first table has consumed optimizer time, a second, not-yet-optimized table is served
+   * before the first one.
+   */
+  @Test
+  public void testQuotaSchedulePolicy() {
+    DefaultTableRuntime tableRuntime = initTableWithFiles();
+
+    OptimizingQueue queue =
+        new OptimizingQueue(
+            CATALOG_MANAGER,
+            testResourceGroup(),
+            resourceGroup -> 2,
+            planExecutor,
+            Collections.singletonList(tableRuntime),
+            1);
+    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    // Assert before the first dereference so a missing task fails clearly instead of with an NPE.
+    Assert.assertNotNull(task);
+    task.schedule(optimizerThread);
+    task.ack(optimizerThread);
+    Assert.assertEquals(
+        1, queue.collectTasks(t -> t.getStatus() == TaskRuntime.Status.ACKED).size());
+    // Cast to long so ids are compared by value even if either side is a boxed Long.
+    Assert.assertEquals(
+        (long) tableRuntime.getTableIdentifier().getId(), (long) task.getTableId());
+    task.complete(
+        optimizerThread,
+        buildOptimizingTaskResult(task.getTaskId(), optimizerThread.getThreadId()));
+    Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
+    OptimizingProcess optimizingProcess =
+        tableRuntime.getOptimizingState().getOptimizingProcess();
+    Assert.assertEquals(ProcessStatus.RUNNING, optimizingProcess.getStatus());
+    optimizingProcess.commit();
+    Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
+    Assert.assertNull(tableRuntime.getOptimizingState().getOptimizingProcess());
+
+    // Refresh the first table with new data and register a second table (id 2) in the queue.
+    tableRuntime = initTableWithPartitionedFiles();
+    ServerTableIdentifier secondTableIdentifier =
+        ServerTableIdentifier.of(
+            org.apache.amoro.table.TableIdentifier.of(
+                serverTableIdentifier().getCatalog(), "db", "test_table2"),
+            TableFormat.ICEBERG);
+    secondTableIdentifier.setId(2L);
+    DefaultTableRuntime tableRuntime2 = createTable(secondTableIdentifier);
+    queue.refreshTable(tableRuntime2);
+    queue.refreshTable(tableRuntime);
+
+    // The second table has not consumed optimizer time yet, so it is expected to be scheduled
+    // ahead of the first table.
+    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task2);
+    Assert.assertEquals(
+        (long) tableRuntime2.getTableIdentifier().getId(), (long) task2.getTableId());
+    TaskRuntime task3 = queue.pollTask(MAX_POLLING_TIME);
+    Assert.assertNotNull(task3);
+    Assert.assertEquals(
+        (long) tableRuntime.getTableIdentifier().getId(), (long) task3.getTableId());
+    queue.dispose();
+  }
+
@Test
public void testRetryTask() {
DefaultTableRuntime tableRuntimeMeta = initTableWithFiles();
@@ -185,7 +306,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
retryTask.ack(optimizerThread);
retryTask.complete(
optimizerThread,
- buildOptimizingTaskFailed(task.getTaskId(),
optimizerThread.getThreadId()));
+ buildOptimizingTaskFailed(retryTask.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
}
@@ -376,6 +497,18 @@ public class TestOptimizingQueue extends AMSTableTestBase {
return tableRuntime;
}
+  /**
+   * Appends two batches of records (ids 1 and 2) to the test table via
+   * {@code appendPartitionedData}, then builds a PENDING runtime in the default resource group and
+   * refreshes its optimizing state from a freshly loaded table.
+   *
+   * @return the refreshed table runtime
+   */
+  protected DefaultTableRuntime initTableWithPartitionedFiles() {
+    MixedTable table =
+        (MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable();
+    for (int batchId = 1; batchId <= 2; batchId++) {
+      appendPartitionedData(table.asUnkeyedTable(), batchId);
+    }
+    DefaultTableRuntime runtime =
+        buildTableRuntimeMeta(OptimizingStatus.PENDING, defaultResourceGroup());
+    runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier()));
+    return runtime;
+  }
+
private DefaultTableRuntime buildTableRuntimeMeta(
OptimizingStatus status, ResourceGroup resourceGroup) {
MixedTable mixedTable =
@@ -392,6 +525,21 @@ public class TestOptimizingQueue extends AMSTableTestBase {
return new DefaultTableRuntime(tableRuntimeMeta, tableService());
}
+  /**
+   * Writes three records with the given id and timestamps 2022-01-01, 2022-01-02 and 2022-01-03
+   * (12:00:00) to the base store of {@code table}, and commits the resulting data files in a
+   * single append.
+   *
+   * @param table the table to write to
+   * @param id the id value used for every written record
+   */
+  private void appendPartitionedData(UnkeyedTable table, int id) {
+    ArrayList<Record> newRecords =
+        Lists.newArrayList(
+            MixedDataTestHelpers.createRecord(
+                table.schema(), id, "111", 0L, "2022-01-01T12:00:00"),
+            MixedDataTestHelpers.createRecord(
+                table.schema(), id, "222", 0L, "2022-01-02T12:00:00"),
+            MixedDataTestHelpers.createRecord(
+                table.schema(), id, "333", 0L, "2022-01-03T12:00:00"));
+    List<DataFile> dataFiles = MixedDataTestHelpers.writeBaseStore(table, 0L, newRecords, false);
+    AppendFiles appendFiles = table.newAppend();
+    for (DataFile dataFile : dataFiles) {
+      appendFiles.appendFile(dataFile);
+    }
+    appendFiles.commit();
+  }
+
private void appendData(UnkeyedTable table, int id) {
ArrayList<Record> newRecords =
Lists.newArrayList(
@@ -403,6 +551,37 @@ public class TestOptimizingQueue extends AMSTableTestBase {
appendFiles.commit();
}
+  /**
+   * Creates an Iceberg table for {@code serverTableIdentifier} in the test catalog using the test
+   * helper's schema, partition spec and properties, and appends two batches of records to it.
+   * Then builds a PENDING runtime for it in the default resource group and refreshes the runtime's
+   * optimizing state.
+   *
+   * @param serverTableIdentifier identifier (including id) of the table to create
+   * @return the refreshed runtime of the new table
+   */
+  private DefaultTableRuntime createTable(ServerTableIdentifier serverTableIdentifier) {
+    org.apache.iceberg.catalog.Catalog icebergCatalog =
+        catalogTestHelper().buildIcebergCatalog(catalogMeta());
+    TableIdentifier icebergIdentifier =
+        TableIdentifier.of(
+            serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName());
+    icebergCatalog.createTable(
+        icebergIdentifier,
+        tableTestHelper().tableSchema(),
+        tableTestHelper().partitionSpec(),
+        tableTestHelper().tableProperties());
+
+    MixedTable mixedTable =
+        (MixedTable) tableService().loadTable(serverTableIdentifier).originalTable();
+    for (int batchId = 1; batchId <= 2; batchId++) {
+      appendPartitionedData(mixedTable.asUnkeyedTable(), batchId);
+    }
+
+    TableRuntimeMeta meta = new TableRuntimeMeta();
+    meta.setCatalogName(serverTableIdentifier.getCatalog());
+    meta.setDbName(serverTableIdentifier.getDatabase());
+    meta.setTableName(serverTableIdentifier.getTableName());
+    meta.setTableId(serverTableIdentifier.getId());
+    meta.setFormat(TableFormat.ICEBERG);
+    meta.setTableStatus(OptimizingStatus.PENDING);
+    meta.setTableConfig(TableConfigurations.parseTableConfig(mixedTable.properties()));
+    meta.setOptimizerGroup(defaultResourceGroup().getName());
+
+    DefaultTableRuntime runtime = new DefaultTableRuntime(meta, tableService());
+    runtime.getOptimizingState().refresh(tableService().loadTable(serverTableIdentifier));
+    return runtime;
+  }
+
private OptimizingTaskResult buildOptimizingTaskResult(OptimizingTaskId
taskId, int threadId) {
TableOptimizing.OptimizingOutput output = new RewriteFilesOutput(null,
null, null);
OptimizingTaskResult optimizingTaskResult = new
OptimizingTaskResult(taskId, threadId);
diff --git a/amoro-ams/src/test/resources/config-with-units.yaml
b/amoro-ams/src/test/resources/config-with-units.yaml
index 40f24cf4c..9f72540a9 100644
--- a/amoro-ams/src/test/resources/config-with-units.yaml
+++ b/amoro-ams/src/test/resources/config-with-units.yaml
@@ -51,6 +51,7 @@ ams:
runtime-data-keep-days: 30
runtime-data-expire-interval-hours: 1
refresh-group-interval: 30s
+ break-quota-limit-enabled: true
optimizer:
heart-beat-timeout: 1min
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 3da2922ea..1ff9d541f 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -76,7 +76,7 @@ public class TableProperties {
public static final String SELF_OPTIMIZING_GROUP_DEFAULT = "default";
public static final String SELF_OPTIMIZING_QUOTA = "self-optimizing.quota";
- public static final double SELF_OPTIMIZING_QUOTA_DEFAULT = 0.1;
+ public static final double SELF_OPTIMIZING_QUOTA_DEFAULT = 0.5;
public static final String SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER =
"self-optimizing.execute.num-retries";
diff --git a/charts/amoro/templates/amoro-configmap.yaml
b/charts/amoro/templates/amoro-configmap.yaml
index a57b79c2f..6dbb7ec29 100644
--- a/charts/amoro/templates/amoro-configmap.yaml
+++ b/charts/amoro/templates/amoro-configmap.yaml
@@ -77,6 +77,7 @@ data:
commit-thread-count: 10
runtime-data-keep-days: 30
runtime-data-expire-interval-hours: 1
+ break-quota-limit-enabled: true
optimizer:
heart-beat-timeout: 1min # 60000
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index 7503317d0..f628433b7 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -51,6 +51,7 @@ ams:
runtime-data-keep-days: 30
runtime-data-expire-interval-hours: 1
refresh-group-interval: 30s
+ break-quota-limit-enabled: true
optimizer:
heart-beat-timeout: 1min # 60000
diff --git a/docs/concepts/self-optimizing.md b/docs/concepts/self-optimizing.md
index 285fade2c..bac381592 100644
--- a/docs/concepts/self-optimizing.md
+++ b/docs/concepts/self-optimizing.md
@@ -147,22 +147,26 @@ Currently, there are two main scheduling policies
available: `Quota` and `Balanc
### Quota
The `Quota` strategy is a scheduling policy that schedules based on resource
usage. The Self-optimizing resource usage of a single table is managed
-by configuring the quota configuration on the table:
+by setting the `self-optimizing.quota` property on the table. The quota specifies the maximum optimizer resources that can be allocated to the table.
+It can be specified either as a decimal (a fraction of the total optimizer resources) or as an integer (a fixed number of resources):
```SQL
--- Quota for Self-optimizing, indicating the CPU resource the table can take up
-self-optimizing.quota = 0.1;
+-- Set quota as a percentage of total optimizer resources
+self-optimizing.quota = 0.5;
```
-Quota defines the maximum CPU usage that a single table can use, but
Self-optimizing is actually executed in a distributed manner, and actual
resource
-usage is dynamically managed based on actual execution time.In the optimizing
management Web UI, the dynamic quota usage of a single table can be
-viewed through the "Quota Occupy" metric. From a design perspective, the quota
occupy metric should dynamically approach 100%.
+A decimal quota (e.g., 0.5) limits the table to that fraction of the total available optimizer resources (here, 50%).
-In a platform, two situations may occur: overselling and overbuying.
+```SQL
+-- Set quota as a fixed number of optimizer resources
+self-optimizing.quota = 10;
+```
+An integer quota (e.g., 10) restricts the table to at most that number of optimizer resources.
+
+This flexible configuration prevents resource underutilization and allows
users to tailor resource allocation to their needs.
+
+The `Quota` strategy schedules tables based on their Occupation metric: the ratio of the optimizer thread execution time actually used by a table to its quota execution time
+within the `QUOTA_LOOK_BACK_TIME` window (one hour). Tables with lower Occupation are given higher scheduling priority.
-- Overselling — If all optimizer configurations exceed the total quota of all
table configurations, the quota occupy metric may dynamically approach
-above 100%
-- Overbuying — If all optimizer configurations are lower than the total quota
of all table configurations, the quota occupy metric should dynamically
-approach below 100%
### Balanced
diff --git a/docs/user-guides/configurations.md
b/docs/user-guides/configurations.md
index 5fcde8842..0d5c24531 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -47,7 +47,7 @@ Self-optimizing configurations are applicable to both Iceberg
Format and Mixed s
|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| self-optimizing.enabled | true | Enables
Self-optimizing
|
| self-optimizing.group | default | Optimizer
group for Self-optimizing
|
-| self-optimizing.quota | 0.1 | Quota for
Self-optimizing, indicating the CPU resource the table can take up
|
+| self-optimizing.quota | 0.5 | Quota for Self-optimizing, indicating the maximum optimizer resources the table can take up (a decimal is a fraction of the total, an integer is a fixed number) |
| self-optimizing.execute.num-retries | 5 | Number of
retries after failure of Self-optimizing
|
| self-optimizing.target-size | 134217728(128MB) | Target
size for Self-optimizing
|
| self-optimizing.max-file-count | 10000 | Maximum
number of files processed by a Self-optimizing process
|
diff --git a/docs/user-guides/using-tables.md b/docs/user-guides/using-tables.md
index fd6366761..6410c2247 100644
--- a/docs/user-guides/using-tables.md
+++ b/docs/user-guides/using-tables.md
@@ -264,5 +264,5 @@ The Optimizing page displays self-optimizing status of all
tables.
- **Duration**: The duration of the current status.
- **File Count**: The total number of files involved in the current
Self-optimizing, including base, insert, eq-delete, and pos-delete file types.
- **File Size**: The total size of files involved in the current
self-optimizing.
-- **Quota**: The proportion of self-optimizing execution time executed per
unit time.
-- **Quota Occupation**: The actual Quota used for self-optimizing during
execution of the table in the last hour. When optimizer resources are
sufficient and the table requires more resources for self-optimizing, this
value will be greater than 100%. When resources are scarce or the table
requires fewer resources for self-optimizing, this value will be less than 100%.
\ No newline at end of file
+- **Quota**: The maximum optimizer resources that can be allocated to the table, given either as a fraction of the total (decimal) or as a fixed number (integer).
+- **Quota Occupation**: The ratio of the actual optimizer thread execution time used by the table to its quota execution time within the `QUOTA_LOOK_BACK_TIME` window (one hour). Tables with lower occupation are scheduled first.