klion26 commented on code in PR #4038:
URL: https://github.com/apache/amoro/pull/4038#discussion_r2731561968
##########
amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java:
##########
@@ -160,6 +160,18 @@ private TableProperties() {}
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED =
+ "self-optimizing.refresh-table.adaptive.enabled";
Review Comment:
If we need to set both `self-optimizing.refresh-table.adaptive.enabled` and
`self-optimizing.refresh-table.adaptive.max-interval`, how about using only
`self-optimizing.refresh-table.adaptive.max-interval`? Some values (like -1 or
zero) would mean it is disabled, and any other value would mean it is enabled.
##########
amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java:
##########
@@ -160,6 +160,18 @@ private TableProperties() {}
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED =
+ "self-optimizing.refresh-table.adaptive.enabled";
+ public static final boolean
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT = false;
+ public static final String
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL =
+ "self-optimizing.refresh-table.adaptive.max-interval";
+ public static final long
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT =
Review Comment:
How about adding the time unit to the name:
`SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT` ->
`SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_MS_DEFAULT`?
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -129,9 +148,124 @@ public void execute(TableRuntime tableRuntime) {
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId !=
defaultTableRuntime.getCurrentSnapshotId())) {
tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ } else {
+ logger.debug("{} optimizing is not necessary",
defaultTableRuntime.getTableIdentifier());
+ defaultTableRuntime.setLatestEvaluatedNeedOptimizing(false);
+ }
+
+ // Update adaptive interval according to evaluated result.
+ if
(defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) {
+ long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
+ defaultTableRuntime.setLatestRefreshInterval(newInterval);
Review Comment:
Maybe we can call both `tableRuntime.setLatestEvaluatedNeedOptimizing(true);` and
`defaultTableRuntime.setLatestRefreshInterval(newInterval);` in this if
block, so that we wouldn't miss either of them.
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import static org.mockito.Mockito.verify;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+ private static final long INITIAL_SNAPSHOTID = 0L;
+
+ // Create mock DefaultTableRuntime for adaptive interval tests
+ private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ boolean adaptiveEnabled,
+ long currentInterval,
+ long step) {
+ DefaultTableRuntime mockTableRuntime =
Mockito.mock(DefaultTableRuntime.class);
+ Mockito.when(mockTableRuntime.getTableIdentifier())
+ .thenReturn(tableRuntime.getTableIdentifier());
+ Mockito.when(mockTableRuntime.getLastOptimizedSnapshotId())
+ .thenReturn(tableRuntime.getLastOptimizedSnapshotId());
+ Mockito.when(mockTableRuntime.getLastOptimizedChangeSnapshotId())
+ .thenReturn(tableRuntime.getLastOptimizedChangeSnapshotId());
+
Mockito.when(mockTableRuntime.getCurrentSnapshotId()).thenReturn(INITIAL_SNAPSHOTID);
+
Mockito.when(mockTableRuntime.getCurrentChangeSnapshotId()).thenReturn(INITIAL_SNAPSHOTID);
+
Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(currentInterval);
+
Mockito.when(mockTableRuntime.getLatestEvaluatedNeedOptimizing()).thenReturn(needOptimizing);
+
+ OptimizingConfig mockOptimizingConfig =
Mockito.mock(OptimizingConfig.class);
+
Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveEnabled()).thenReturn(adaptiveEnabled);
+ Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveMaxInterval())
+ .thenReturn(MAX_INTERVAL);
+
Mockito.when(mockOptimizingConfig.getMinorLeastInterval()).thenReturn(MINOR_LEAST_INTERVAL);
+
Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveIncreaseStep()).thenReturn(step);
+
Mockito.when(mockTableRuntime.getOptimizingConfig()).thenReturn(mockOptimizingConfig);
+
+ return mockTableRuntime;
+ }
+
+ // Overloaded method with default step factor
+ private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ boolean adaptiveEnabled,
+ long currentInterval) {
+ return getMockTableRuntimeWithAdaptiveInterval(
+ tableRuntime, needOptimizing, adaptiveEnabled, currentInterval, STEP);
+ }
+
+ @Test
+ public void testAdaptiveIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ DefaultTableRuntime tableRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL,
MAX_PENDING_PARTITIONS);
+
+ // Test healthy table (not need optimizing) - interval should increase
+ DefaultTableRuntime mockTableRuntime =
+ getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true,
INTERVAL);
+
+ // Initial interval is INTERVAL, should increase by 30000
+ long expectedInterval = INTERVAL + STEP;
+ executor.execute(mockTableRuntime);
+
+ // Verify that setLatestRefreshInterval was called with the expected value
+ verify(mockTableRuntime,
Mockito.times(1)).setLatestRefreshInterval(expectedInterval);
Review Comment:
Instead of using `verify` here to validate, is there any way we can validate
the value of the `true interval` after `executor.execute` in L147?
The `true interval` is the value kept in the table runtime.
##########
docs/user-guides/configurations.md:
##########
@@ -43,25 +43,28 @@ modified through [Alter
Table](../using-tables/#modify-table) operations.
Self-optimizing configurations are applicable to both Iceberg Format and Mixed
streaming Format.
-| Key | Default |
Description
|
-|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled | true | Enables
Self-optimizing
|
-| self-optimizing.allow-partial-commit | false | Whether
to allow partial commit when self-optimizing fails or process is cancelled
|
-| self-optimizing.group | default | Optimizer
group for Self-optimizing
|
-| self-optimizing.quota | 0.5 | Quota for
Self-optimizing, indicating the optimizer resources the table can take up
|
-| self-optimizing.execute.num-retries | 5 | Number of
retries after failure of Self-optimizing
|
-| self-optimizing.target-size | 134217728(128MB) | Target
size for Self-optimizing
|
-| self-optimizing.max-file-count | 10000 | Maximum
number of files processed by a Self-optimizing process
|
-| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum
file size bytes in a single task for splitting tasks
|
-| self-optimizing.fragment-ratio | 8 | The
fragment file size threshold. We could divide self-optimizing.target-size by
this ratio to get the actual fragment file size
|
-| self-optimizing.min-target-size-ratio | 0.75 | The
undersized segment file size threshold. Segment files under this threshold will
be considered for rewriting
|
-| self-optimizing.minor.trigger.file-count | 12 | The
minimum number of files to trigger minor optimizing is determined by the sum of
fragment file count and equality delete file count
|
-| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time
interval in milliseconds to trigger minor optimizing
|
-| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio
of duplicate data of segment files to trigger major optimizing
|
-| self-optimizing.full.trigger.interval | -1(closed) | The time
interval in milliseconds to trigger full optimizing
|
-| self-optimizing.full.rewrite-all-files | true | Whether
full optimizing rewrites all files or skips files that do not need to be
optimized
|
-| self-optimizing.min-plan-interval | 60000 | The
minimum time interval between two self-optimizing planning action
|
-| self-optimizing.filter | NULL | Filter
conditions for self-optimizing, using SQL conditional expressions, without
supporting any functions. For the timestamp column condition, the ISO date-time
formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| Key | Default |
Description
|
+|------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled | true |
Enables Self-optimizing
|
+| self-optimizing.allow-partial-commit | false |
Whether to allow partial commit when self-optimizing fails or process is
cancelled
|
+| self-optimizing.group | default |
Optimizer group for Self-optimizing
|
+| self-optimizing.quota | 0.5 |
Quota for Self-optimizing, indicating the optimizer resources the table can
take up
|
+| self-optimizing.execute.num-retries | 5 |
Number of retries after failure of Self-optimizing
|
+| self-optimizing.target-size | 134217728(128MB) |
Target size for Self-optimizing
|
+| self-optimizing.max-file-count | 10000 |
Maximum number of files processed by a Self-optimizing process
|
+| self-optimizing.max-task-size-bytes | 134217728(128MB) |
Maximum file size bytes in a single task for splitting tasks
|
+| self-optimizing.fragment-ratio | 8 |
The fragment file size threshold. We could divide self-optimizing.target-size
by this ratio to get the actual fragment file size
|
+| self-optimizing.min-target-size-ratio | 0.75 |
The undersized segment file size threshold. Segment files under this threshold
will be considered for rewriting
|
+| self-optimizing.minor.trigger.file-count | 12 |
The minimum number of files to trigger minor optimizing is determined by the
sum of fragment file count and equality delete file count
|
+| self-optimizing.minor.trigger.interval | 3600000(1 hour) |
The time interval in milliseconds to trigger minor optimizing
|
+| self-optimizing.major.trigger.duplicate-ratio | 0.1 |
The ratio of duplicate data of segment files to trigger major optimizing
|
+| self-optimizing.full.trigger.interval | -1(closed) |
The time interval in milliseconds to trigger full optimizing
|
+| self-optimizing.full.rewrite-all-files | true |
Whether full optimizing rewrites all files or skips files that do not need to
be optimized
|
+| self-optimizing.min-plan-interval | 60000 |
The minimum time interval between two self-optimizing planning action
|
+| self-optimizing.filter | NULL |
Filter conditions for self-optimizing, using SQL conditional expressions,
without supporting any functions. For the timestamp column condition, the ISO
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'.
|
+| self-optimizing.refresh-table.adaptive.enabled | false |
Whether to enable adaptive refresh interval for refreshing table metadata
|
+| self-optimizing.refresh-table.adaptive.max-interval | 3600000(1 hour) |
The maximum time interval in milliseconds to refresh table metadata
|
Review Comment:
Maybe we can set the default max-interval to an unset value, so that the
behavior stays the same as before the current PR.
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -56,19 +56,29 @@ protected boolean enabled(TableRuntime tableRuntime) {
@Override
protected long getNextExecutingTime(TableRuntime tableRuntime) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime)
tableRuntime;
+
+ if
(defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveEnabled()) {
+ long newInterval = defaultTableRuntime.getLatestRefreshInterval();
+ if (newInterval > 0) {
+ return newInterval;
+ }
+ }
+
return Math.min(
defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L
/ 5, interval);
}
private void tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime,
MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
- if (optimizingConfig.isEnabled()
- && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
+ boolean optimizingEnabled = optimizingConfig.isEnabled();
+ if (optimizingEnabled &&
tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {
if (optimizingConfig.isMetadataBasedTriggerEnabled()
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
optimizingConfig, table, tableRuntime.getLastPlanTime())) {
+ tableRuntime.setLatestEvaluatedNeedOptimizing(false);
Review Comment:
Would it be better to change the return type of `tryEvaluatingPendingInput`
to `boolean`, indicating whether we need to optimize after evaluating?
Then we could call `tableRuntime.setLatestEvaluatedNeedOptimizing` in one
place in `execute` below.
Please let me know what you think about it.
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import static org.mockito.Mockito.verify;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+ private static final long INITIAL_SNAPSHOTID = 0L;
+
+ // Create mock DefaultTableRuntime for adaptive interval tests
+ private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ boolean adaptiveEnabled,
+ long currentInterval,
+ long step) {
+ DefaultTableRuntime mockTableRuntime =
Mockito.mock(DefaultTableRuntime.class);
+ Mockito.when(mockTableRuntime.getTableIdentifier())
+ .thenReturn(tableRuntime.getTableIdentifier());
+ Mockito.when(mockTableRuntime.getLastOptimizedSnapshotId())
+ .thenReturn(tableRuntime.getLastOptimizedSnapshotId());
+ Mockito.when(mockTableRuntime.getLastOptimizedChangeSnapshotId())
+ .thenReturn(tableRuntime.getLastOptimizedChangeSnapshotId());
+
Mockito.when(mockTableRuntime.getCurrentSnapshotId()).thenReturn(INITIAL_SNAPSHOTID);
+
Mockito.when(mockTableRuntime.getCurrentChangeSnapshotId()).thenReturn(INITIAL_SNAPSHOTID);
+
Mockito.when(mockTableRuntime.getLatestRefreshInterval()).thenReturn(currentInterval);
+
Mockito.when(mockTableRuntime.getLatestEvaluatedNeedOptimizing()).thenReturn(needOptimizing);
+
+ OptimizingConfig mockOptimizingConfig =
Mockito.mock(OptimizingConfig.class);
+
Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveEnabled()).thenReturn(adaptiveEnabled);
+ Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveMaxInterval())
+ .thenReturn(MAX_INTERVAL);
+
Mockito.when(mockOptimizingConfig.getMinorLeastInterval()).thenReturn(MINOR_LEAST_INTERVAL);
+
Mockito.when(mockOptimizingConfig.getRefreshTableAdaptiveIncreaseStep()).thenReturn(step);
+
Mockito.when(mockTableRuntime.getOptimizingConfig()).thenReturn(mockOptimizingConfig);
+
+ return mockTableRuntime;
+ }
+
+ // Overloaded method with default step factor
+ private DefaultTableRuntime getMockTableRuntimeWithAdaptiveInterval(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ boolean adaptiveEnabled,
+ long currentInterval) {
+ return getMockTableRuntimeWithAdaptiveInterval(
+ tableRuntime, needOptimizing, adaptiveEnabled, currentInterval, STEP);
+ }
+
+ @Test
+ public void testAdaptiveIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ DefaultTableRuntime tableRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL,
MAX_PENDING_PARTITIONS);
+
+ // Test healthy table (not need optimizing) - interval should increase
+ DefaultTableRuntime mockTableRuntime =
+ getMockTableRuntimeWithAdaptiveInterval(tableRuntime, false, true,
INTERVAL);
+
+ // Initial interval is INTERVAL, should increase by 30000
+ long expectedInterval = INTERVAL + STEP;
+ executor.execute(mockTableRuntime);
+
+ // Verify that setLatestRefreshInterval was called with the expected value
+ verify(mockTableRuntime,
Mockito.times(1)).setLatestRefreshInterval(expectedInterval);
+
+ // Test unhealthy table (need optimizing) - interval should decrease
+ mockTableRuntime =
+ getMockTableRuntimeWithAdaptiveInterval(tableRuntime, true, true,
INTERVAL * 2);
Review Comment:
Is there a way to reuse this `tableRuntime` so we can observe the change in
`interval` before and after two consecutive `execute` calls? Reusing a single
`tableRuntime` instance would also better reflect production scenarios.
##########
amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java:
##########
@@ -160,6 +160,18 @@ private TableProperties() {}
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ public static final String SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED =
+ "self-optimizing.refresh-table.adaptive.enabled";
+ public static final boolean
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_ENABLED_DEFAULT = false;
+ public static final String
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL =
+ "self-optimizing.refresh-table.adaptive.max-interval";
+ public static final long
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_MAX_INTERVAL_DEFAULT =
Review Comment:
Same as the other places
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]