This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af9f36e2 [AMORO-4037] Support dynamic refresh interval for table 
metadata refreshing in TableRuntimeRefreshExecutor (#4038)
1af9f36e2 is described below

commit 1af9f36e24d5f0df115eae6edb1ddfeb46ac159e
Author: zhangwl9 <[email protected]>
AuthorDate: Tue Mar 3 10:29:47 2026 +0800

    [AMORO-4037] Support dynamic refresh interval for table metadata refreshing 
in TableRuntimeRefreshExecutor (#4038)
    
    This commit adds a dynamic refresh interval for table metadata refreshing 
support.
    
    This is useful for a catalog that contains real-time tables and some static 
tables.
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
---
 .../inline/TableRuntimeRefreshExecutor.java        | 152 ++++++++++++++-
 .../amoro/server/table/DefaultTableRuntime.java    |  18 ++
 .../amoro/server/table/TableConfigurations.java    |  12 +-
 .../inline/TestTableRuntimeRefreshExecutor.java    | 215 +++++++++++++++++++++
 .../org/apache/amoro/config/OptimizingConfig.java  |  47 ++++-
 .../org/apache/amoro/table/TableProperties.java    |   9 +
 docs/user-guides/configurations.md                 |  40 ++--
 7 files changed, 465 insertions(+), 28 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
index 18071a2cd..b6b7e4ba6 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java
@@ -31,6 +31,7 @@ import 
org.apache.amoro.server.scheduler.PeriodicTableScheduler;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.server.utils.IcebergTableUtil;
+import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.table.MixedTable;
 
@@ -56,15 +57,23 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
   @Override
   protected long getNextExecutingTime(TableRuntime tableRuntime) {
     DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) 
tableRuntime;
+
+    if 
(defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval))
 {
+      long newInterval = defaultTableRuntime.getLatestRefreshInterval();
+      if (newInterval > 0) {
+        return newInterval;
+      }
+    }
+
     return Math.min(
         defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L 
/ 5, interval);
   }
 
-  private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, 
MixedTable table) {
+  private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, 
MixedTable table) {
     // only evaluate pending input when optimizing is enabled and in idle state
     OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
-    if (optimizingConfig.isEnabled()
-        && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+    boolean optimizingEnabled = optimizingConfig.isEnabled();
+    if (optimizingEnabled && 
tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
 
       if (optimizingConfig.isMetadataBasedTriggerEnabled()
           && !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
@@ -72,12 +81,14 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
         logger.debug(
             "{} optimizing is not necessary due to metadata based trigger",
             tableRuntime.getTableIdentifier());
-        return;
+        // indicates no optimization demand now
+        return false;
       }
 
       AbstractOptimizingEvaluator evaluator =
           IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, 
maxPendingPartitions);
-      if (evaluator.isNecessary()) {
+      boolean evaluatorIsNecessary = evaluator.isNecessary();
+      if (evaluatorIsNecessary) {
         AbstractOptimizingEvaluator.PendingInput pendingInput =
             evaluator.getOptimizingPendingInput();
         logger.debug(
@@ -88,7 +99,21 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
       } else {
         tableRuntime.optimizingNotNecessary();
       }
+
       tableRuntime.setTableSummary(evaluator.getPendingInput());
+      return evaluatorIsNecessary;
+    } else if (!optimizingEnabled) {
+      logger.debug(
+          "{} optimizing is not enabled, skip evaluating pending input",
+          tableRuntime.getTableIdentifier());
+      // indicates no optimization demand now
+      return false;
+    } else {
+      logger.debug(
+          "{} optimizing is processing or is in preparation", 
tableRuntime.getTableIdentifier());
+      // indicates optimization demand exists (preparation or processing),
+      // even though we don't trigger a new evaluation in this loop.
+      return true;
     }
   }
 
@@ -122,16 +147,131 @@ public class TableRuntimeRefreshExecutor extends 
PeriodicTableScheduler {
       AmoroTable<?> table = loadTable(tableRuntime);
       defaultTableRuntime.refresh(table);
       MixedTable mixedTable = (MixedTable) table.originalTable();
+      // Check if there is any optimizing demand now.
+      boolean hasOptimizingDemand = false;
       if ((mixedTable.isKeyedTable()
               && (lastOptimizedSnapshotId != 
defaultTableRuntime.getCurrentSnapshotId()
                   || lastOptimizedChangeSnapshotId
                       != defaultTableRuntime.getCurrentChangeSnapshotId()))
           || (mixedTable.isUnkeyedTable()
               && lastOptimizedSnapshotId != 
defaultTableRuntime.getCurrentSnapshotId())) {
-        tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+        hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, 
mixedTable);
+      } else {
+        logger.debug("{} optimizing is not necessary", 
defaultTableRuntime.getTableIdentifier());
+      }
+
+      // Update adaptive interval according to evaluated result.
+      if 
(defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval))
 {
+        
defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand);
+        long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
+        defaultTableRuntime.setLatestRefreshInterval(newInterval);
       }
     } catch (Throwable throwable) {
       logger.error("Refreshing table {} failed.", 
tableRuntime.getTableIdentifier(), throwable);
     }
   }
+
+  /**
+   * Calculate adaptive execution interval based on table optimization status.
+   *
+   * <p>Uses AIMD (Additive Increase Multiplicative Decrease) algorithm 
inspired by TCP congestion
+   * control:
+   *
+   * <ul>
+   *   <li>If table does not need to be optimized: additive increase - 
gradually extend interval to
+   *       reduce resource consumption
+   *   <li>If table needs optimization: multiplicative decrease - rapidly 
reduce interval for quick
+   *       response
+   * </ul>
+   *
+   * <p>Interval is bounded by [interval_min, interval_max] and kept in memory 
only (resets to
+   * interval_min on restart).
+   *
+   * @param tableRuntime The table runtime information containing current 
status and configuration
+   * @return The next execution interval in milliseconds
+   */
+  @VisibleForTesting
+  public long getAdaptiveExecutingInterval(DefaultTableRuntime tableRuntime) {
+    final long minInterval = interval;
+    final long maxInterval =
+        
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs();
+    long currentInterval = tableRuntime.getLatestRefreshInterval();
+
+    // Initialize interval on first run or after restart
+    if (currentInterval == 0) {
+      currentInterval = minInterval;
+    }
+
+    // Determine whether table needs optimization
+    boolean needOptimizing = tableRuntime.getLatestEvaluatedNeedOptimizing();
+
+    long nextInterval;
+    if (needOptimizing) {
+      nextInterval = decreaseInterval(currentInterval, minInterval);
+      logger.debug(
+          "Table {} needs optimization, decreasing interval from {}ms to {}ms",
+          tableRuntime.getTableIdentifier(),
+          currentInterval,
+          nextInterval);
+    } else {
+      nextInterval = increaseInterval(tableRuntime, currentInterval, 
maxInterval);
+      logger.debug(
+          "Table {} does not need optimization, increasing interval from {}ms 
to {}ms",
+          tableRuntime.getTableIdentifier(),
+          currentInterval,
+          nextInterval);
+    }
+
+    return nextInterval;
+  }
+
+  /**
+   * Decrease interval when table needs optimization.
+   *
+   * <p>Uses multiplicative decrease (halving) inspired by TCP Fast Recovery 
algorithm for rapid
+   * response to table health issues.
+   *
+   * @param currentInterval Current refresh interval in milliseconds
+   * @param minInterval Minimum allowed interval in milliseconds
+   * @return New interval after decrease.
+   */
+  private long decreaseInterval(long currentInterval, long minInterval) {
+    long newInterval = currentInterval / 2;
+    long boundedInterval = Math.max(newInterval, minInterval);
+    if (newInterval < minInterval) {
+      logger.debug(
+          "Interval reached minimum boundary: attempted {}ms, capped at {}ms",
+          newInterval,
+          minInterval);
+    }
+
+    return boundedInterval;
+  }
+
+  /**
+   * Increase interval when table does not need optimization.
+   *
+   * <p>Uses additive increase inspired by TCP Congestion Avoidance algorithm 
for gradual and stable
+   * growth.
+   *
+   * @param tableRuntime The table runtime information containing configuration
+   * @param currentInterval Current refresh interval in milliseconds
+   * @param maxInterval Maximum allowed interval in milliseconds
+   * @return New interval after increase.
+   */
+  private long increaseInterval(
+      DefaultTableRuntime tableRuntime, long currentInterval, long 
maxInterval) {
+    long step = 
tableRuntime.getOptimizingConfig().getRefreshTableAdaptiveIncreaseStepMs();
+    long newInterval = currentInterval + step;
+    long boundedInterval = Math.min(newInterval, maxInterval);
+    if (newInterval > maxInterval) {
+      logger.debug(
+          "Interval reached maximum boundary: currentInterval is {}ms, 
attempted {}ms, capped at {}ms",
+          currentInterval,
+          newInterval,
+          maxInterval);
+    }
+
+    return boundedInterval;
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 8a337451c..648532c23 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -88,6 +88,8 @@ public class DefaultTableRuntime extends AbstractTableRuntime 
{
   private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics;
   private final TableSummaryMetrics tableSummaryMetrics;
   private volatile long lastPlanTime;
+  private volatile long latestRefreshInterval = 
AmoroServiceConstants.INVALID_TIME;
+  private volatile boolean latestEvaluatedNeedOptimizing = true;
   private volatile OptimizingProcess optimizingProcess;
   private final List<TaskRuntime.TaskQuota> taskQuotas = new 
CopyOnWriteArrayList<>();
 
@@ -139,6 +141,22 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
     this.lastPlanTime = lastPlanTime;
   }
 
+  public long getLatestRefreshInterval() {
+    return latestRefreshInterval;
+  }
+
+  public void setLatestRefreshInterval(long latestRefreshInterval) {
+    this.latestRefreshInterval = latestRefreshInterval;
+  }
+
+  public boolean getLatestEvaluatedNeedOptimizing() {
+    return this.latestEvaluatedNeedOptimizing;
+  }
+
+  public void setLatestEvaluatedNeedOptimizing(boolean 
latestEvaluatedNeedOptimizing) {
+    this.latestEvaluatedNeedOptimizing = latestEvaluatedNeedOptimizing;
+  }
+
   public OptimizingStatus getOptimizingStatus() {
     return OptimizingStatus.ofCode(getStatusCode());
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index c891a24f4..1601166db 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -341,7 +341,17 @@ public class TableConfigurations {
             PropertyUtil.propertyAsLong(
                 properties,
                 
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE,
-                
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT));
+                
TableProperties.SELF_OPTIMIZING_EVALUATION_FILE_SIZE_MSE_TOLERANCE_DEFAULT))
+        .setRefreshTableAdaptiveMaxIntervalMs(
+            PropertyUtil.propertyAsLong(
+                properties,
+                
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS,
+                
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT))
+        .setRefreshTableAdaptiveIncreaseStepMs(
+            PropertyUtil.propertyAsLong(
+                properties,
+                
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS,
+                
TableProperties.SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT));
   }
 
   /**
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
new file mode 100644
index 000000000..e75d9e16d
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.table.TableRuntimeStore;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {
+        TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(true, true)
+      },
+      {
+        TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(false, true)
+      },
+      {
+        TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(false, false)
+      },
+      {
+        TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(true, false)
+      },
+      {
+        new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+        new HiveTableTestHelper(true, true)
+      },
+      {
+        new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+        new HiveTableTestHelper(false, true)
+      },
+      {
+        new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+        new HiveTableTestHelper(false, false)
+      },
+      {
+        new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+        new HiveTableTestHelper(true, false)
+      },
+    };
+  }
+
+  public TestTableRuntimeRefreshExecutor(
+      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper);
+  }
+
+  private static final long INTERVAL = 60000L; // 1 minute
+  private static final long MAX_INTERVAL = 300000L; // 5 minutes
+  private static final long STEP = 30000L; // 30s
+  private static final int MAX_PENDING_PARTITIONS = 1;
+  private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+  /**
+   * A test helper class that allows configuration updates. Reuses the same 
TableRuntime instance
+   * across different test scenarios.
+   */
+  private static class TestTableRuntime extends DefaultTableRuntime {
+    private OptimizingConfig testOptimizingConfig;
+
+    TestTableRuntime(TableRuntimeStore store, OptimizingConfig 
optimizingConfig) {
+      super(store);
+      this.testOptimizingConfig = optimizingConfig;
+    }
+
+    /** Update the optimizing config without creating a new instance */
+    void updateOptimizingConfig(OptimizingConfig newConfig) {
+      this.testOptimizingConfig = newConfig;
+    }
+
+    @Override
+    public OptimizingConfig getOptimizingConfig() {
+      return this.testOptimizingConfig;
+    }
+  }
+
+  /** Create OptimizingConfig with specified parameters */
+  private OptimizingConfig createOptimizingConfig(long maxInterval, long step) 
{
+    OptimizingConfig config = new OptimizingConfig();
+    config.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+    config.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+    config.setRefreshTableAdaptiveIncreaseStepMs(step);
+    return config;
+  }
+
+  @Test
+  public void testAdaptiveRefreshIntervalScenarios() {
+    createDatabase();
+    createTable();
+
+    // Get the original table runtime
+    DefaultTableRuntime baseRuntime =
+        (DefaultTableRuntime) 
tableService().getRuntime(serverTableIdentifier().getId());
+    TableRuntimeRefreshExecutor executor =
+        new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, 
MAX_PENDING_PARTITIONS);
+
+    // Create a tableRuntime instance with initial config
+    OptimizingConfig initialConfig = createOptimizingConfig(MAX_INTERVAL, 
STEP);
+    TestTableRuntime tableRuntime = new TestTableRuntime(baseRuntime.store(), 
initialConfig);
+
+    // Test 1: Healthy table (not need optimizing) - interval should increase 
by STEP
+    // Initial state: needOptimizing=false, latestRefreshInterval=0 (will use 
default INTERVAL)
+    tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+    tableRuntime.setLatestRefreshInterval(0);
+    long adaptiveExecutingInterval = 
executor.getAdaptiveExecutingInterval(tableRuntime);
+    long expectedInterval = INTERVAL + STEP;
+    Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+    // Test 2: Test minimum boundary - interval should not below INTERVAL
+    // Unhealthy table (need optimizing) - interval should decrease to half
+    // current interval is INTERVAL + STEP, the latest interval is INTERVAL
+    // not (INTERVAL + STEP) / 2
+    tableRuntime.setLatestEvaluatedNeedOptimizing(true);
+    tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+    adaptiveExecutingInterval = 
executor.getAdaptiveExecutingInterval(tableRuntime);
+    expectedInterval = INTERVAL;
+    Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+    // Test 3: Healthy table with larger step value - interval should increase 
by 8 * STEP
+    tableRuntime.setLatestEvaluatedNeedOptimizing(false);
+    tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+    tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, 8 
* STEP));
+    adaptiveExecutingInterval = 
executor.getAdaptiveExecutingInterval(tableRuntime);
+    expectedInterval = expectedInterval + 8 * STEP;
+    Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+    // Test 4: Maximum boundary - interval should not exceed MAX_INTERVAL
+    tableRuntime.setLatestRefreshInterval(adaptiveExecutingInterval);
+    tableRuntime.updateOptimizingConfig(createOptimizingConfig(MAX_INTERVAL, 
STEP));
+    // current interval is INTERVAL + 8 * STEP, the latest interval is 
MAX_INTERVAL
+    // rather than INTERVAL + 9 * STEP
+    adaptiveExecutingInterval = 
executor.getAdaptiveExecutingInterval(tableRuntime);
+    Assert.assertEquals(MAX_INTERVAL, adaptiveExecutingInterval);
+
+    dropTable();
+    dropDatabase();
+  }
+
+  @Test
+  public void testGetNextExecutingTime() {
+    createDatabase();
+    createTable();
+
+    // Get the original table runtime
+    DefaultTableRuntime baseRuntime =
+        (DefaultTableRuntime) 
tableService().getRuntime(serverTableIdentifier().getId());
+    TableRuntimeRefreshExecutor executor =
+        new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL, 
MAX_PENDING_PARTITIONS);
+
+    // Create a tableRuntime instance with adaptive interval enabled
+    OptimizingConfig configWithAdaptiveEnabled = 
createOptimizingConfig(MAX_INTERVAL, STEP);
+    TestTableRuntime tableRuntime =
+        new TestTableRuntime(baseRuntime.store(), configWithAdaptiveEnabled);
+
+    // Test 1: getNextExecutingTime with adaptive interval enabled and positive
+    // latestRefreshInterval
+    // Set a positive latestRefreshInterval to enable adaptive behavior
+    tableRuntime.setLatestRefreshInterval(MAX_INTERVAL);
+    long nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+    Assert.assertEquals(MAX_INTERVAL, nextExecutingTime);
+
+    // Test 2: getNextExecutingTime with adaptive interval enabled but 
latestRefreshInterval is 0
+    // Should fall back to min(minorLeastInterval * 4/5, INTERVAL)
+    tableRuntime.setLatestRefreshInterval(0);
+    nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+    long expectedFallbackInterval = Math.min(MINOR_LEAST_INTERVAL * 4L / 5, 
INTERVAL);
+    Assert.assertEquals(expectedFallbackInterval, nextExecutingTime);
+
+    // Test3: MaxInterval should be greater than minInterval
+    // If maxInterval <= minInterval, getNextExecutingTime with adaptive 
interval disabled and
+    // fallback to min(minorLeastInterval * 4/5, INTERVAL)
+    long maxInterval = INTERVAL - 1000;
+    tableRuntime.updateOptimizingConfig(createOptimizingConfig(maxInterval, 
STEP));
+    tableRuntime.setLatestRefreshInterval(INTERVAL + 1000);
+    nextExecutingTime = executor.getNextExecutingTime(tableRuntime);
+    Assert.assertEquals(expectedFallbackInterval, nextExecutingTime);
+
+    dropTable();
+    dropDatabase();
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java 
b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
index 0c743ac6b..39afbdbb5 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java
@@ -97,6 +97,12 @@ public class OptimizingConfig {
   // self-optimizing.evaluation.fallback-interval
   private long evaluationFallbackInterval;
 
+  // self-optimizing.refresh-table.adaptive.max-interval-ms
+  private long refreshTableAdaptiveMaxIntervalMs;
+
+  // self-optimizing.refresh-table.adaptive.increase-step-ms
+  private long refreshTableAdaptiveIncreaseStepMs;
+
   public OptimizingConfig() {}
 
   public boolean isEnabled() {
@@ -318,6 +324,36 @@ public class OptimizingConfig {
     return this;
   }
 
+  /**
+   * Adaptive refresh is enabled only when max interval is valid and greater 
than min interval.
+   *
+   * @param minInterval the minimum interval
+   * @return true if the adaptive refresh is enabled
+   */
+  public boolean isRefreshTableAdaptiveEnabled(long minInterval) {
+    return refreshTableAdaptiveMaxIntervalMs > 0 && 
refreshTableAdaptiveMaxIntervalMs > minInterval;
+  }
+
+  public long getRefreshTableAdaptiveMaxIntervalMs() {
+    return refreshTableAdaptiveMaxIntervalMs;
+  }
+
+  public OptimizingConfig setRefreshTableAdaptiveMaxIntervalMs(
+      long refreshTableAdaptiveMaxIntervalMs) {
+    this.refreshTableAdaptiveMaxIntervalMs = refreshTableAdaptiveMaxIntervalMs;
+    return this;
+  }
+
+  public long getRefreshTableAdaptiveIncreaseStepMs() {
+    return refreshTableAdaptiveIncreaseStepMs;
+  }
+
+  public OptimizingConfig setRefreshTableAdaptiveIncreaseStepMs(
+      long refreshTableAdaptiveIncreaseStepMs) {
+    this.refreshTableAdaptiveIncreaseStepMs = 
refreshTableAdaptiveIncreaseStepMs;
+    return this;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -350,7 +386,10 @@ public class OptimizingConfig {
         && Objects.equal(optimizerGroup, that.optimizerGroup)
         && Objects.equal(minPlanInterval, that.minPlanInterval)
         && Objects.equal(evaluationMseTolerance, that.evaluationMseTolerance)
-        && Objects.equal(evaluationFallbackInterval, 
that.evaluationFallbackInterval);
+        && Objects.equal(evaluationFallbackInterval, 
that.evaluationFallbackInterval)
+        && Objects.equal(refreshTableAdaptiveMaxIntervalMs, 
that.refreshTableAdaptiveMaxIntervalMs)
+        && Objects.equal(
+            refreshTableAdaptiveIncreaseStepMs, 
that.refreshTableAdaptiveIncreaseStepMs);
   }
 
   @Override
@@ -379,7 +418,9 @@ public class OptimizingConfig {
         hiveRefreshInterval,
         minPlanInterval,
         evaluationMseTolerance,
-        evaluationFallbackInterval);
+        evaluationFallbackInterval,
+        refreshTableAdaptiveMaxIntervalMs,
+        refreshTableAdaptiveIncreaseStepMs);
   }
 
   @Override
@@ -407,6 +448,8 @@ public class OptimizingConfig {
         .add("hiveRefreshInterval", hiveRefreshInterval)
         .add("evaluationMseTolerance", evaluationMseTolerance)
         .add("evaluationFallbackInterval", evaluationFallbackInterval)
+        .add("refreshTableAdaptiveMaxIntervalMs", 
refreshTableAdaptiveMaxIntervalMs)
+        .add("refreshTableAdaptiveIncreaseStepMs", 
refreshTableAdaptiveIncreaseStepMs)
         .toString();
   }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 0d804cdec..015c68d48 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -160,6 +160,15 @@ public class TableProperties {
   public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
   public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
 
+  public static final String 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS =
+      "self-optimizing.refresh-table.adaptive.max-interval-ms";
+  public static final long 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT =
+      0; // disabled
+  public static final String 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS =
+      "self-optimizing.refresh-table.adaptive.increase-step-ms";
+  public static final long 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT =
+      30000; // 30s
+
   /**
    * The retention period for snapshots created by Flink checkpoints. 
Snapshots older than this
    * duration may be cleaned up. Avoid keeping the last flink checkpoint 
snapshot for too long, as
diff --git a/docs/user-guides/configurations.md 
b/docs/user-guides/configurations.md
index 1895812ce..e25c52811 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -43,25 +43,27 @@ modified through [Alter 
Table](../using-tables/#modify-table) operations.
 
 Self-optimizing configurations are applicable to both Iceberg Format and Mixed 
streaming Format.
 
-| Key                                           | Default          | 
Description                                                                     
                                                                                
                                                                        |
-|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled                       | true             | Enables 
Self-optimizing                                                                 
                                                                                
                                                                |
-| self-optimizing.allow-partial-commit          | false            | Whether 
to allow partial commit when self-optimizing fails or process is cancelled      
                                                                                
                                                                                
    |
-| self-optimizing.group                         | default          | Optimizer 
group for Self-optimizing                                                       
                                                                                
                                                              |
-| self-optimizing.quota                         | 0.5              | Quota for 
Self-optimizing, indicating the optimizer resources the table can take up       
                                                                                
                                                                     |
-| self-optimizing.execute.num-retries           | 5                | Number of 
retries after failure of Self-optimizing                                        
                                                                                
                                                              |
-| self-optimizing.target-size                   | 134217728(128MB) | Target 
size for Self-optimizing                                                        
                                                                                
                                                                 |
-| self-optimizing.max-file-count                | 10000            | Maximum 
number of files processed by a Self-optimizing process                          
                                                                                
                                                                |
-| self-optimizing.max-task-size-bytes           | 134217728(128MB) | Maximum 
file size bytes in a single task for splitting tasks                            
                                                                                
                                                                |
-| self-optimizing.fragment-ratio                | 8                | The 
fragment file size threshold. We could divide self-optimizing.target-size by 
this ratio to get the actual fragment file size                                 
                                                                       |
-| self-optimizing.min-target-size-ratio         | 0.75             | The 
undersized segment file size threshold. Segment files under this threshold will 
be considered for rewriting                                                     
                                                                    |
-| self-optimizing.minor.trigger.file-count      | 12               | The 
minimum number of files to trigger minor optimizing is determined by the sum of 
fragment file count and equality delete file count                              
                                                                    |
-| self-optimizing.minor.trigger.interval        | 3600000(1 hour)  | The time 
interval in milliseconds to trigger minor optimizing                            
                                                                                
                                                               |
-| self-optimizing.major.trigger.duplicate-ratio | 0.1              | The ratio 
of duplicate data of segment files to trigger major optimizing                  
                                                                                
                                                              |
-| self-optimizing.full.trigger.interval         | -1(closed)       | The time 
interval in milliseconds to trigger full optimizing                             
                                                                                
                                                               |
-| self-optimizing.full.rewrite-all-files        | true             | Whether 
full optimizing rewrites all files or skips files that do not need to be 
optimized                                                                       
                                                                       |
-| self-optimizing.min-plan-interval             | 60000            | The 
minimum time interval between two self-optimizing planning action               
                                                                                
                                                                    |
-| self-optimizing.filter                        | NULL             | Filter 
conditions for self-optimizing, using SQL conditional expressions, without 
supporting any functions. For the timestamp column condition, the ISO date-time 
formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| Key                                                     | Default          | 
Description                                                                     
                                                                                
                                                                                
                                                      |
+|---------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled                                 | true             | 
Enables Self-optimizing                                                         
                                                                                
                                                                                
                                                      |
+| self-optimizing.allow-partial-commit                    | false            | 
Whether to allow partial commit when self-optimizing fails or process is 
cancelled                                                                       
                                                                                
                                                             |
+| self-optimizing.group                                   | default          | 
Optimizer group for Self-optimizing                                             
                                                                                
                                                                                
                                                      |
+| self-optimizing.quota                                   | 0.5              | 
Quota for Self-optimizing, indicating the optimizer resources the table can 
take up                                                                         
                                                                                
                                                          |
+| self-optimizing.execute.num-retries                     | 5                | 
Number of retries after failure of Self-optimizing                              
                                                                                
                                                                                
                                                      |
+| self-optimizing.target-size                             | 134217728(128MB) | 
Target size for Self-optimizing                                                 
                                                                                
                                                                                
                                                      |
+| self-optimizing.max-file-count                          | 10000            | 
Maximum number of files processed by a Self-optimizing process                  
                                                                                
                                                                                
                                                      |
+| self-optimizing.max-task-size-bytes                     | 134217728(128MB) | 
Maximum file size bytes in a single task for splitting tasks                    
                                                                                
                                                                                
                                                      |
+| self-optimizing.fragment-ratio                          | 8                | 
The fragment file size threshold. We could divide self-optimizing.target-size 
by this ratio to get the actual fragment file size                              
                                                                                
                                                        |
+| self-optimizing.min-target-size-ratio                   | 0.75             | 
The undersized segment file size threshold. Segment files under this threshold 
will be considered for rewriting                                                
                                                                                
                                                       |
+| self-optimizing.minor.trigger.file-count                | 12               | 
The minimum number of files to trigger minor optimizing is determined by the 
sum of fragment file count and equality delete file count                       
                                                                                
                                                         |
+| self-optimizing.minor.trigger.interval                  | 3600000(1 hour)  | 
The time interval in milliseconds to trigger minor optimizing                   
                                                                                
                                                                                
                                                      |
+| self-optimizing.major.trigger.duplicate-ratio           | 0.1              | 
The ratio of duplicate data of segment files to trigger major optimizing        
                                                                                
                                                                                
                                                      |
+| self-optimizing.full.trigger.interval                   | -1(closed)       | 
The time interval in milliseconds to trigger full optimizing                    
                                                                                
                                                                                
                                                      |
+| self-optimizing.full.rewrite-all-files                  | true             | 
Whether full optimizing rewrites all files or skips files that do not need to 
be optimized                                                                    
                                                                                
                                                        |
+| self-optimizing.min-plan-interval                       | 60000            | 
The minimum time interval between two self-optimizing planning action           
                                                                                
                                                                                
                                                      |
+| self-optimizing.filter                                  | NULL             | 
Filter conditions for self-optimizing, using SQL conditional expressions, 
without supporting any functions. For the timestamp column condition, the ISO 
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. 
                                                              |
+| self-optimizing.refresh-table.adaptive.max-interval-ms  | 0                | 
The maximum time interval in milliseconds to refresh table metadata. 0 means 
disable adaptive refresh. When enabled, the value must be greater than 
'refresh-tables.interval' and may exceed 
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will 
be automatically disabled. |
+| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s)       | 
The time interval increase step in milliseconds to refresh table metadata       
                                                                                
                                                                                
                                                      |
 
 ## Data-cleaning configurations
 


Reply via email to