This is an automated email from the ASF dual-hosted git repository.
klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 1af9f36e2 [AMORO-4037] Support dynamic refresh interval for table
metadata refreshing in TableRuntimeRefreshExecutor (#4038)
1af9f36e2 is described below
commit 1af9f36e24d5f0df115eae6edb1ddfeb46ac159e
Author: zhangwl9 <[email protected]>
AuthorDate: Tue Mar 3 10:29:47 2026 +0800
[AMORO-4037] Support dynamic refresh interval for table metadata refreshing
in TableRuntimeRefreshExecutor (#4038)
This commit adds support for a dynamic refresh interval for table metadata
refreshing.
This is useful for a catalog that contains real-time tables and some static
tables.
---------
Co-authored-by: 张文领 <[email protected]>
---
.../inline/TableRuntimeRefreshExecutor.java | 152 ++++++++++++++-
.../amoro/server/table/DefaultTableRuntime.java | 18 ++
.../amoro/server/table/TableConfigurations.java | 12 +-
.../inline/TestTableRuntimeRefreshExecutor.java | 215 +++++++++++++++++++++
.../org/apache/amoro/config/OptimizingConfig.java | 47 ++++-
.../org/apache/amoro/table/TableProperties.java | 9 +
docs/user-guides/configurations.md | 40 ++--
7 files changed, 465 insertions(+), 28 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index 18071a2cd..b6b7e4ba6 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.utils.IcebergTableUtil;
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.table.MixedTable;
@@ -56,15 +57,23 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
+
+ if
(defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval))
{
+ long newInterval = defaultTableRuntime.getLatestRefreshInterval();
+ if (newInterval > 0) {
+ return newInterval;
+ }
+ }
+
return Math.min(
defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L
/ 5, interval);
}
- private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime,
MixedTable table) {
+ private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime,
MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
- if (optimizingConfig.isEnabled()
- && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+ boolean optimizingEnabled = optimizingConfig.isEnabled();
+ if (optimizingEnabled &&
tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
if (optimizingConfig.isMetadataBasedTriggerEnabled()
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
@@ -72,12 +81,14 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
logger.debug(
"{} optimizing is not necessary due to metadata based trigger",
tableRuntime.getTableIdentifier());
- return;
+ // indicates no optimization demand now
+ return false;
}
AbstractOptimizingEvaluator evaluator =
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table,
maxPendingPartitions);
- if (evaluator.isNecessary()) {
+ boolean evaluatorIsNecessary = evaluator.isNecessary();
+ if (evaluatorIsNecessary) {
AbstractOptimizingEvaluator.PendingInput pendingInput =
evaluator.getOptimizingPendingInput();
logger.debug(
@@ -88,7 +99,21 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
} else {
tableRuntime.optimizingNotNecessary();
}
+
tableRuntime.setTableSummary(evaluator.getPendingInput());
+ return evaluatorIsNecessary;
+ } else if (!optimizingEnabled) {
+ logger.debug(
+ "{} optimizing is not enabled, skip evaluating pending input",
+ tableRuntime.getTableIdentifier());
+ // indicates no optimization demand now
+ return false;
+ } else {
+ logger.debug(
+ "{} optimizing is processing or is in preparation",
tableRuntime.getTableIdentifier());
+ // indicates optimization demand exists (preparation or processing),
+ // even though we don't trigger a new evaluation in this loop.
+ return true;
}
}
@@ -122,16 +147,131 @@ public class TableRuntimeRefreshExecutor extends
PeriodicTableScheduler {
AmoroTable<?> table = loadTable(tableRuntime);
defaultTableRuntime.refresh(table);
MixedTable mixedTable = (MixedTable) table.originalTable();
+ // Check if there is any optimizing demand now.
+ boolean hasOptimizingDemand = false;
if ((mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId !=
defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId !=
defaultTableRuntime.getCurrentSnapshotId())) {
- tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime,
mixedTable);
+ } else {
+ logger.debug("{} optimizing is not necessary",
defaultTableRuntime.getTableIdentifier());
+ }
+
+ // Update adaptive interval according to evaluated result.
+ if
(defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval))
{
+
defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand);
+ long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
+ defaultTableRuntime.setLatestRefreshInterval(newInterval);
}
} catch (Throwable throwable) {
logger.error("Refreshing table {} failed.",
tableRuntime.getTableIdentifier(), throwable);
}
}
+
+ /**
+ * Calculate adaptive execution interval based on table optimization status.
+ *
+ * <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm
inspired by TCP congestion
+ * control:
+ *
+ * <ul>
+ * <li>If table does not need to be optimized: additive increase -
gradually extend interval to
+ * reduce resource consumption
+ * <li>If table needs optimization: multiplicative decrease - rapidly
reduce interval for quick
+ * response
+ * </ul>
+ *
+ * <p>Interval is bounded by [interval_min, interval_max] and kept in memory
only (resets to
+ * interval_min on restart).
+ *
+ * @param tableRuntime The table runtime information containing current
status and configuration
+ * @return The next execution interval in milliseconds
+ */
+ @VisibleForTesting
+ public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) {
+ final long minInterval = interval;
+ final long maxInterval =
+
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs();
+ long currentInterval = tableRuntime.getLatestRefreshInterval();
+
+ // Initialize interval on first run or after restart
+ if (currentInterval == 0) {
+ currentInterval = minInterval;
+ }
+
+ // Determine whether table needs optimization
+ boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing();
+
+ long nextInterval;
+ if (needOptimizing) {
+ nextInterval = decreaseInterval(currentInterval, minInterval);
+ logger.debug(
+ "Table {} needs optimization, decreasing interval from {}ms to {}ms",
+ tableRuntime.getTableIdentifier(),
+ currentInterval,
+ nextInterval);
+ } else {
+ nextInterval = increaseInterval(tableRuntime, currentInterval,
maxInterval);
+ logger.debug(
+ "Table {} does not need optimization, increasing interval from {}ms
to {}ms",
+ tableRuntime.getTableIdentifier(),
+ currentInterval,
+ nextInterval);
+ }
+
+ return nextInterval;
+ }
+
+ /**
+ * Decrease interval when table needs optimization.
+ *
+ * <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery
algorithm for rapid
+ * response to table health issues.
+ *
+ * @param currentInterval Current refresh interval in milliseconds
+ * @param minInterval Minimum allowed interval in milliseconds
+ * @return New interval after decrease.
+ */
+ private long decreaseInterval(long currentInterval, long minInterval) {
+ long newInterval = currentInterval / 2;
+ long boundedInterval = Math.max(newInterval, minInterval);
+ if (newInterval < minInterval) {
+ logger.debug(
+ "Interval reached minimum boundary: attempted {}ms, capped at {}ms",
+ newInterval,
+ minInterval);
+ }
+
+ return boundedInterval;
+ }
+
+ /**
+ * Increase interval when table does not need optimization.
+ *
+ * <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm
for gradual and stable
+ * growth.
+ *
+ * @param tableRuntime The table runtime information containing configuration
+ * @param currentInterval Current refresh interval in milliseconds
+ * @param maxInterval Maximum allowed interval in milliseconds
+ * @return New interval after increase.
+ */
+ private long increaseInterval(
+ DefaultTableRuntime tableRuntime, long currentInterval, long
maxInterval) {
+ long step =
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs();
+ long newInterval = currentInterval + step;
+ long boundedInterval = Math.min(newInterval, maxInterval);
+ if (newInterval > maxInterval) {
+ logger.debug(
+ "Interval reached maximum boundary: currentInterval is {}ms,
attempted {}ms, capped at {}ms",
+ currentInterval,
+ newInterval,
+ maxInterval);
+ }
+
+ return boundedInterval;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 8a337451c..648532c23 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -88,6 +88,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime
{
private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
private final TableSummaryMetrics tableSummaryMetrics;
private volatile long lastPlanTime;
+ private volatile long latestRefreshInterval =
AmoroServiceConstants.INVALID_TIME;
+ private volatile boolean latestEvaluatedNeedOptimizing = true;
private volatile OptimizingProcess optimizingProcess;
private final List<TaskRuntime.TaskQuota> taskQuotas = new
CopyOnWriteArrayList<>();
@@ -139,6 +141,22 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
this.lastPlanTime = lastPlanTime;
}
+ public long getLatestRefreshInterval() {
+ return latestRefreshInterval;
+ }
+
+ public void setLatestRefreshInterval(long latestRefreshInterval) {
+ this.latestRefreshInterval = latestRefreshInterval;
+ }
+
+ public boolean getLatestEvaluatedNeedOptimizing() {
+ return this.latestEvaluatedNeedOptimizing;
+ }
+
+ public void setLatestEvaluatedNeedOptimizing(boolean
latestEvaluatedNeedOptimizing) {
+ this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing;
+ }
+
public OptimizingStatus getOptimizingStatus() {
return OptimizingStatus.ofCode(getStatusCode());
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index c891a24f4..1601166db 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -341,7 +341,17 @@ public class TableConfigurations {
PropertyUtil.propertyAsLong(
properties,
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
-
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
+
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
+ .setRefreshTableAdaptiveMaxIntervalMs(
+ PropertyUtil.propertyAsLong(
+ properties,
+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT))
+ .setRefreshTableAdaptiveIncreaseStepMs(
+ PropertyUtil.propertyAsLong(
+ properties,
+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS,
+
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT));
}
/**
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
new file mode 100644
index 000000000..e75d9e16d
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.table.TableRuntimeStore;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+ /**
+ * A test helper class that allows configuration updates. Reuses the same
TableRuntime instance
+ * across different test scenarios.
+ */
+ private static class TestTableRuntime extends DefaultTableRuntime {
+ private OptimizingConfig testOptimizingConfig;
+
+ TestTableRuntime(TableRuntimeStore store, OptimizingConfig
optimizingConfig) {
+ super(store);
+ this.testOptimizingConfig = optimizingConfig;
+ }
+
+ /** Update the optimizing config without creating a new instance */
+ void updateOptimizingConfig(OptimizingConfig newConfig) {
+ this.testOptimizingConfig = newConfig;
+ }
+
+ @Override
+ public OptimizingConfig getOptimizingConfig() {
+ return this.testOptimizingConfig;
+ }
+ }
+
+ /** Create OptimizingConfig with specified parameters */
+ private OptimizingConfig createOptimizingConfig(long maxInterval, long step)
{
+ OptimizingConfig config = new OptimizingConfig();
+ config.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+ config.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+ config.setRefreshTableAdaptiveIncreaseStepMs(step);
+ return config;
+ }
+
+ @Test
+ public void testAdaptiveRefreshIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ // Get the original table runtime
+ DefaultTableRuntime baseRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL,
MAX_PENDING_PARTITIONS);
+
+ // Create a tableRuntime instance with initial config
+ OptimizingConfig initialConfig = createOptimizingConfig(MAX_INTERVAL,
STEP);
+ TestTableRuntime tableRuntime = new TestTableRuntime(baseRuntime.store(),
initialConfig);
+
+ // Test 1: Healthy table (not need optimizing) - interval should increase
by STEP
+ // Initial state: needOptimizing=false, latestRefreshInterval=0 (will use
default INTERVAL)
+ tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+ tableRuntime.setLatestRefreshInterval(0);
+ long adaptiveExecutingInterval =
executor.getAdaptiveExecutingInterval(tableRuntime);
+ long expectedInterval = INTERVAL + STEP;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 2: Test minimum boundary - interval should not below INTERVAL
+ // Unhealthy table (need optimizing) - interval should decrease to half
+ // current interval is INTERVAL + STEP, the latest interval is INTERVAL
+ // not (INTERVAL + STEP) / 2
+ tableRuntime.setLatestEvaluatedNeedOptimizing(true);
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ adaptiveExecutingInterval =
executor.getAdaptiveExecutingInterval(tableRuntime);
+ expectedInterval = INTERVAL;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 3: Healthy table with larger step value - interval should increase
by 8 * STEP
+ tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, 8
* STEP));
+ adaptiveExecutingInterval =
executor.getAdaptiveExecutingInterval(tableRuntime);
+ expectedInterval = expectedInterval + 8 * STEP;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test 4: Maximum boundary - interval should not exceed MAX_INTERVAL
+ tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL,
STEP));
+ // current interval is INTERVAL + 8 * STEP, the latest interval is
MAX_INTERVAL
+ // rather than INTERVAL + 9 * STEP
+ adaptiveExecutingInterval =
executor.getAdaptiveExecutingInterval(tableRuntime);
+ Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval);
+
+ dropTable();
+ dropDatabase();
+ }
+
+ @Test
+ public void testGetNextExecutingTime() {
+ createDatabase();
+ createTable();
+
+ // Get the original table runtime
+ DefaultTableRuntime baseRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL,
MAX_PENDING_PARTITIONS);
+
+ // Create a tableRuntime instance with adaptive interval enabled
+ OptimizingConfig configWithAdaptiveEnabled =
createOptimizingConfig(MAX_INTERVAL, STEP);
+ TestTableRuntime tableRuntime =
+ new TestTableRuntime(baseRuntime.store(), configWithAdaptiveEnabled);
+
+ // Test 1: getNextExecutingTime with adaptive interval enabled and positive
+ // latestRefreshInterval
+ // Set a positive latestRefreshInterval to enable adaptive behavior
+ tableRuntime.setLatestRefreshInterval(MAX_INTERVAL);
+ long nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+ Assert.assertEquals(MAX_INTERVAL, nextExecutingTime);
+
+ // Test 2: getNextExecutingTime with adaptive interval enabled but
latestRefreshInterval is 0
+ // Should fall back to min(minorLeastInterval * 4/5, INTERVAL)
+ tableRuntime.setLatestRefreshInterval(0);
+ nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+ long expectedFallbackInterval = Math.min(MINOR_LEAST_INTERVAL * 4L / 5,
INTERVAL);
+ Assert.assertEquals(expectedFallbackInterval, nextExecutingTime);
+
+ // Test3: MaxInterval should be greater than minInterval
+ // If maxInterval <= minInterval, getNextExecutingTime with adaptive
interval disabled and
+ // fallback to min(minorLeastInterval * 4/5, INTERVAL)
+ long maxInterval = INTERVAL - 1000;
+ tableRuntime.updateOptimizingConfig(createOptimizingConfig(maxInterval,
STEP));
+ tableRuntime.setLatestRefreshInterval(INTERVAL + 1000);
+ nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+ Assert.assertEquals(expectedFallbackInterval, nextExecutingTime);
+
+ dropTable();
+ dropDatabase();
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
index 0c743ac6b..39afbdbb5 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
@@ -97,6 +97,12 @@ public class OptimizingConfig {
// self-optimizing.evaluation.fallback-interval
private long evaluationFallbackInterval;
+ // self-optimizing.refresh-table.adaptive.max-interval-ms
+ private long refreshTableAdaptiveMaxIntervalMs;
+
+ // self-optimizing.refresh-table.adaptive.increase-step-ms
+ private long refreshTableAdaptiveIncreaseStepMs;
+
public OptimizingConfig() {}
public boolean isEnabled() {
@@ -318,6 +324,36 @@ public class OptimizingConfig {
return this;
}
+ /**
+ * Adaptive refresh is enabled only when max interval is valid and greater
than min interval.
+ *
+ * @param minInterval the minimum interval
+ * @return true if the adaptive refresh is enabled
+ */
+ public boolean isRefreshTableAdaptiveEnabled(long minInterval) {
+ return refreshTableAdaptiveMaxIntervalMs > 0 &&
refreshTableAdaptiveMaxIntervalMs > minInterval;
+ }
+
+ public long getRefreshTableAdaptiveMaxIntervalMs() {
+ return refreshTableAdaptiveMaxIntervalMs;
+ }
+
+ public OptimizingConfig setRefreshTableAdaptiveMaxIntervalMs(
+ long refreshTableAdaptiveMaxIntervalMs) {
+ this.refreshTableAdaptiveMaxIntervalMs = refreshTableAdaptiveMaxIntervalMs;
+ return this;
+ }
+
+ public long getRefreshTableAdaptiveIncreaseStepMs() {
+ return refreshTableAdaptiveIncreaseStepMs;
+ }
+
+ public OptimizingConfig setRefreshTableAdaptiveIncreaseStepMs(
+ long refreshTableAdaptiveIncreaseStepMs) {
+ this.refreshTableAdaptiveIncreaseStepMs =
refreshTableAdaptiveIncreaseStepMs;
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -350,7 +386,10 @@ public class OptimizingConfig {
&& Objects.equal(optimizerGroup, that.optimizerGroup)
&& Objects.equal(minPlanInterval, that.minPlanInterval)
&& Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance)
- && Objects.equal(evaluationFallbackInterval,
that.evaluationFallbackInterval);
+ && Objects.equal(evaluationFallbackInterval,
that.evaluationFallbackInterval)
+ && Objects.equal(refreshTableAdaptiveMaxIntervalMs,
that.refreshTableAdaptiveMaxIntervalMs)
+ && Objects.equal(
+ refreshTableAdaptiveIncreaseStepMs,
that.refreshTableAdaptiveIncreaseStepMs);
}
@Override
@@ -379,7 +418,9 @@ public class OptimizingConfig {
hiveRefreshInterval,
minPlanInterval,
evaluationMseTolerance,
- evaluationFallbackInterval);
+ evaluationFallbackInterval,
+ refreshTableAdaptiveMaxIntervalMs,
+ refreshTableAdaptiveIncreaseStepMs);
}
@Override
@@ -407,6 +448,8 @@ public class OptimizingConfig {
.add("hiveRefreshInterval", hiveRefreshInterval)
.add("evaluationMseTolerance", evaluationMseTolerance)
.add("evaluationFallbackInterval", evaluationFallbackInterval)
+ .add("refreshTableAdaptiveMaxIntervalMs",
refreshTableAdaptiveMaxIntervalMs)
+ .add("refreshTableAdaptiveIncreaseStepMs",
refreshTableAdaptiveIncreaseStepMs)
.toString();
}
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 0d804cdec..015c68d48 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -160,6 +160,15 @@ public class TableProperties {
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ public static final String
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS =
+ "self-optimizing.refresh-table.adaptive.max-interval-ms";
+ public static final long
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT =
+ 0; // disabled
+ public static final String
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS =
+ "self-optimizing.refresh-table.adaptive.increase-step-ms";
+ public static final long
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT =
+ 30000; // 30s
+
/**
* The retention period for snapshots created by Flink checkpoints.
Snapshots older than this
* duration may be cleaned up. Avoid keeping the last flink checkpoint
snapshot for too long, as
diff --git a/docs/user-guides/configurations.md
b/docs/user-guides/configurations.md
index 1895812ce..e25c52811 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -43,25 +43,27 @@ modified through [Alter
Table](../using-tables/#modify-table) operations.
Self-optimizing configurations are applicable to both Iceberg Format and Mixed
streaming Format.
-| Key | Default |
Description
|
-|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled | true | Enables
Self-optimizing
|
-| self-optimizing.allow-partial-commit | false | Whether
to allow partial commit when self-optimizing fails or process is cancelled
|
-| self-optimizing.group | default | Optimizer
group for Self-optimizing
|
-| self-optimizing.quota | 0.5 | Quota for
Self-optimizing, indicating the optimizer resources the table can take up
|
-| self-optimizing.execute.num-retries | 5 | Number of
retries after failure of Self-optimizing
|
-| self-optimizing.target-size | 134217728(128MB) | Target
size for Self-optimizing
|
-| self-optimizing.max-file-count | 10000 | Maximum
number of files processed by a Self-optimizing process
|
-| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum
file size bytes in a single task for splitting tasks
|
-| self-optimizing.fragment-ratio | 8 | The
fragment file size threshold. We could divide self-optimizing.target-size by
this ratio to get the actual fragment file size
|
-| self-optimizing.min-target-size-ratio | 0.75 | The
undersized segment file size threshold. Segment files under this threshold will
be considered for rewriting
|
-| self-optimizing.minor.trigger.file-count | 12 | The
minimum number of files to trigger minor optimizing is determined by the sum of
fragment file count and equality delete file count
|
-| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time
interval in milliseconds to trigger minor optimizing
|
-| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio
of duplicate data of segment files to trigger major optimizing
|
-| self-optimizing.full.trigger.interval | -1(closed) | The time
interval in milliseconds to trigger full optimizing
|
-| self-optimizing.full.rewrite-all-files | true | Whether
full optimizing rewrites all files or skips files that do not need to be
optimized
|
-| self-optimizing.min-plan-interval | 60000 | The
minimum time interval between two self-optimizing planning action
|
-| self-optimizing.filter | NULL | Filter
conditions for self-optimizing, using SQL conditional expressions, without
supporting any functions. For the timestamp column condition, the ISO date-time
formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| Key | Default |
Description
|
+|---------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled | true |
Enables Self-optimizing
|
+| self-optimizing.allow-partial-commit | false |
Whether to allow partial commit when self-optimizing fails or process is
cancelled
|
+| self-optimizing.group | default |
Optimizer group for Self-optimizing
|
+| self-optimizing.quota | 0.5 |
Quota for Self-optimizing, indicating the optimizer resources the table can
take up
|
+| self-optimizing.execute.num-retries | 5 |
Number of retries after failure of Self-optimizing
|
+| self-optimizing.target-size | 134217728(128MB) |
Target size for Self-optimizing
|
+| self-optimizing.max-file-count | 10000 |
Maximum number of files processed by a Self-optimizing process
|
+| self-optimizing.max-task-size-bytes | 134217728(128MB) |
Maximum file size bytes in a single task for splitting tasks
|
+| self-optimizing.fragment-ratio | 8 |
The fragment file size threshold. We could divide self-optimizing.target-size
by this ratio to get the actual fragment file size
|
+| self-optimizing.min-target-size-ratio | 0.75 |
The undersized segment file size threshold. Segment files under this threshold
will be considered for rewriting
|
+| self-optimizing.minor.trigger.file-count | 12 |
The minimum number of files to trigger minor optimizing is determined by the
sum of fragment file count and equality delete file count
|
+| self-optimizing.minor.trigger.interval | 3600000(1 hour) |
The time interval in milliseconds to trigger minor optimizing
|
+| self-optimizing.major.trigger.duplicate-ratio | 0.1 |
The ratio of duplicate data of segment files to trigger major optimizing
|
+| self-optimizing.full.trigger.interval | -1(closed) |
The time interval in milliseconds to trigger full optimizing
|
+| self-optimizing.full.rewrite-all-files | true |
Whether full optimizing rewrites all files or skips files that do not need to
be optimized
|
+| self-optimizing.min-plan-interval | 60000 |
The minimum time interval between two self-optimizing planning action
|
+| self-optimizing.filter | NULL |
Filter conditions for self-optimizing, using SQL conditional expressions,
without supporting any functions. For the timestamp column condition, the ISO
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'.
|
+| self-optimizing.refresh-table.adaptive.max-interval-ms | 0 |
The maximum time interval in milliseconds to refresh table metadata. 0 means
disable adaptive refresh. When enabled, the value must be greater than
'refresh-tables.interval' and may exceed
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will
be automatically disabled. |
+| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) |
The time interval increase step in milliseconds to refresh table metadata
|
## Data-cleaning configurations