klion26 commented on code in PR #4038:
URL: https://github.com/apache/amoro/pull/4038#discussion_r2745355589
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -122,16 +143,133 @@ public void execute(TableRuntime tableRuntime) {
AmoroTable<?> table = loadTable(tableRuntime);
defaultTableRuntime.refresh(table);
MixedTable mixedTable = (MixedTable) table.originalTable();
+ boolean needOptimizing = false;
if ((mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId !=
defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId !=
defaultTableRuntime.getCurrentSnapshotId())) {
- tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
+ needOptimizing = tryEvaluatingPendingInput(defaultTableRuntime,
mixedTable);
+ } else {
+ logger.debug("{} optimizing is not necessary",
defaultTableRuntime.getTableIdentifier());
+ }
+
+ // Update adaptive interval according to evaluated result.
+ if
(defaultTableRuntime.getOptimizingConfig().getRefreshTableAdaptiveMaxIntervalMs()
> 0) {
Review Comment:
Maybe we can add a function in `OptimizingConfig` which returns `true/false`
to show whether we enabled this feature
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -88,7 +98,18 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime
tableRuntime, MixedTa
} else {
tableRuntime.optimizingNotNecessary();
}
+
tableRuntime.setTableSummary(evaluator.getPendingInput());
+ return evaluatorIsNecessary;
+ } else if (!optimizingEnabled) {
+ logger.debug(
+ "{} optimizing is not enabled, skip evaluating pending input",
+ tableRuntime.getTableIdentifier());
+ return false;
+ } else {
+ logger.debug(
Review Comment:
From below, the name is `needOptimizing`, which looks like whether we need to do
an optimizing, but in this branch, we don't need to optimize in the current loop,
as there is an ongoing optimizing process?
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long DEFAULT_MAX_INTERVAL = -1L;
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+ // Create DefaultTableRuntime for adaptive interval tests
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveRefresh(
+ DefaultTableRuntime tableRuntime, boolean needOptimizing, long interval,
long step) {
+ return buildTableRuntimeWithConfig(tableRuntime, needOptimizing, interval,
MAX_INTERVAL, step);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveMaxInterval(
+ DefaultTableRuntime tableRuntime, long interval, long maxInterval) {
+ return buildTableRuntimeWithConfig(tableRuntime, true, interval,
maxInterval, STEP);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithConfig(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ long interval,
+ long maxInterval,
+ long step) {
+ OptimizingConfig optimizingConfig = new OptimizingConfig();
+ optimizingConfig.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+ optimizingConfig.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+ optimizingConfig.setRefreshTableAdaptiveIncreaseStepMs(step);
+
+ DefaultTableRuntime newRuntime =
+ new DefaultTableRuntime(tableRuntime.store()) {
+ @Override
+ public OptimizingConfig getOptimizingConfig() {
+ return optimizingConfig;
+ }
+ };
+ newRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing);
+ newRuntime.setLatestRefreshInterval(interval);
+
+ return newRuntime;
+ }
+
+ @Test
+ public void testAdaptiveRefreshIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ DefaultTableRuntime tableRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ TableRuntimeRefreshExecutor executor =
+ new TableRuntimeRefreshExecutor(tableService(), 1, INTERVAL,
MAX_PENDING_PARTITIONS);
+
+ // Test healthy table (not need optimizing) - interval should increase
+ DefaultTableRuntime newTableRuntime =
+ buildTableRuntimeWithAdaptiveRefresh(tableRuntime, false, 0, STEP);
+ long adaptiveExecutingInterval =
executor.getAdaptiveExecutingInterval(newTableRuntime);
+ long expectedInterval = INTERVAL + STEP;
+ Assert.assertEquals(expectedInterval, adaptiveExecutingInterval);
+
+ // Test minimum boundary - interval should not below INTERVAL
+ // The unhealthy table (need optimizing) - interval should decrease half
+ // The currentInterval is INTERVAL + STEP, the latest interval is INTERVAL
not (INTERVAL + STEP)
+ // / 2
Review Comment:
Placing `(INTERVAL + STEP) / 2` on the same line is better than splitting it across 2 lines
##########
amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java:
##########
@@ -88,7 +98,18 @@ private void tryEvaluatingPendingInput(DefaultTableRuntime
tableRuntime, MixedTa
} else {
tableRuntime.optimizingNotNecessary();
}
+
tableRuntime.setTableSummary(evaluator.getPendingInput());
+ return evaluatorIsNecessary;
+ } else if (!optimizingEnabled) {
+ logger.debug(
+ "{} optimizing is not enabled, skip evaluating pending input",
+ tableRuntime.getTableIdentifier());
+ return false;
+ } else {
+ logger.debug(
Review Comment:
Could we add some comments to describe the "semantics" of the return value,
and why this case would be `true`?
##########
docs/user-guides/configurations.md:
##########
@@ -43,25 +43,27 @@ modified through [Alter
Table](../using-tables/#modify-table) operations.
Self-optimizing configurations are applicable to both Iceberg Format and Mixed
streaming Format.
-| Key | Default |
Description
|
-|-----------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled | true | Enables
Self-optimizing
|
-| self-optimizing.allow-partial-commit | false | Whether
to allow partial commit when self-optimizing fails or process is cancelled
|
-| self-optimizing.group | default | Optimizer
group for Self-optimizing
|
-| self-optimizing.quota | 0.5 | Quota for
Self-optimizing, indicating the optimizer resources the table can take up
|
-| self-optimizing.execute.num-retries | 5 | Number of
retries after failure of Self-optimizing
|
-| self-optimizing.target-size | 134217728(128MB) | Target
size for Self-optimizing
|
-| self-optimizing.max-file-count | 10000 | Maximum
number of files processed by a Self-optimizing process
|
-| self-optimizing.max-task-size-bytes | 134217728(128MB) | Maximum
file size bytes in a single task for splitting tasks
|
-| self-optimizing.fragment-ratio | 8 | The
fragment file size threshold. We could divide self-optimizing.target-size by
this ratio to get the actual fragment file size
|
-| self-optimizing.min-target-size-ratio | 0.75 | The
undersized segment file size threshold. Segment files under this threshold will
be considered for rewriting
|
-| self-optimizing.minor.trigger.file-count | 12 | The
minimum number of files to trigger minor optimizing is determined by the sum of
fragment file count and equality delete file count
|
-| self-optimizing.minor.trigger.interval | 3600000(1 hour) | The time
interval in milliseconds to trigger minor optimizing
|
-| self-optimizing.major.trigger.duplicate-ratio | 0.1 | The ratio
of duplicate data of segment files to trigger major optimizing
|
-| self-optimizing.full.trigger.interval | -1(closed) | The time
interval in milliseconds to trigger full optimizing
|
-| self-optimizing.full.rewrite-all-files | true | Whether
full optimizing rewrites all files or skips files that do not need to be
optimized
|
-| self-optimizing.min-plan-interval | 60000 | The
minimum time interval between two self-optimizing planning action
|
-| self-optimizing.filter | NULL | Filter
conditions for self-optimizing, using SQL conditional expressions, without
supporting any functions. For the timestamp column condition, the ISO date-time
formatter must be used. For example: op_time > '2007-12-03T10:15:30'. |
+| Key | Default |
Description
|
+|---------------------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled | true |
Enables Self-optimizing
|
+| self-optimizing.allow-partial-commit | false |
Whether to allow partial commit when self-optimizing fails or process is
cancelled
|
+| self-optimizing.group | default |
Optimizer group for Self-optimizing
|
+| self-optimizing.quota | 0.5 |
Quota for Self-optimizing, indicating the optimizer resources the table can
take up
|
+| self-optimizing.execute.num-retries | 5 |
Number of retries after failure of Self-optimizing
|
+| self-optimizing.target-size | 134217728(128MB) |
Target size for Self-optimizing
|
+| self-optimizing.max-file-count | 10000 |
Maximum number of files processed by a Self-optimizing process
|
+| self-optimizing.max-task-size-bytes | 134217728(128MB) |
Maximum file size bytes in a single task for splitting tasks
|
+| self-optimizing.fragment-ratio | 8 |
The fragment file size threshold. We could divide self-optimizing.target-size
by this ratio to get the actual fragment file size
|
+| self-optimizing.min-target-size-ratio | 0.75 |
The undersized segment file size threshold. Segment files under this threshold
will be considered for rewriting
|
+| self-optimizing.minor.trigger.file-count | 12 |
The minimum number of files to trigger minor optimizing is determined by the
sum of fragment file count and equality delete file count
|
+| self-optimizing.minor.trigger.interval | 3600000(1 hour) |
The time interval in milliseconds to trigger minor optimizing
|
+| self-optimizing.major.trigger.duplicate-ratio | 0.1 |
The ratio of duplicate data of segment files to trigger major optimizing
|
+| self-optimizing.full.trigger.interval | -1(closed) |
The time interval in milliseconds to trigger full optimizing
|
+| self-optimizing.full.rewrite-all-files | true |
Whether full optimizing rewrites all files or skips files that do not need to
be optimized
|
+| self-optimizing.min-plan-interval | 60000 |
The minimum time interval between two self-optimizing planning action
|
+| self-optimizing.filter | NULL |
Filter conditions for self-optimizing, using SQL conditional expressions,
without supporting any functions. For the timestamp column condition, the ISO
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'.
|
+| self-optimizing.refresh-table.adaptive.max-interval-ms | -1 |
The maximum time interval in milliseconds to refresh table metadata. The
default value is -1, which disables the adaptive refresh.
|
Review Comment:
Maybe we need to add some description here: after this has been enabled, the
interval may become larger than
`defaultTableRuntime.getOptimizingConfig().getMinorLeastInterval() * 4L / 5`
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long DEFAULT_MAX_INTERVAL = -1L;
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+ // Create DefaultTableRuntime for adaptive interval tests
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveRefresh(
+ DefaultTableRuntime tableRuntime, boolean needOptimizing, long interval,
long step) {
+ return buildTableRuntimeWithConfig(tableRuntime, needOptimizing, interval,
MAX_INTERVAL, step);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveMaxInterval(
+ DefaultTableRuntime tableRuntime, long interval, long maxInterval) {
+ return buildTableRuntimeWithConfig(tableRuntime, true, interval,
maxInterval, STEP);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithConfig(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ long interval,
+ long maxInterval,
+ long step) {
+ OptimizingConfig optimizingConfig = new OptimizingConfig();
+ optimizingConfig.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+ optimizingConfig.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+ optimizingConfig.setRefreshTableAdaptiveIncreaseStepMs(step);
+
+ DefaultTableRuntime newRuntime =
+ new DefaultTableRuntime(tableRuntime.store()) {
+ @Override
+ public OptimizingConfig getOptimizingConfig() {
+ return optimizingConfig;
+ }
+ };
+ newRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing);
+ newRuntime.setLatestRefreshInterval(interval);
+
+ return newRuntime;
+ }
+
+ @Test
+ public void testAdaptiveRefreshIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ DefaultTableRuntime tableRuntime =
Review Comment:
Could you please reuse the _same_ table runtime? In the current test, we
create a _new_ table runtime every time in `buildTableRuntimeWithAdaptiveRefresh`
##########
amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.scheduler.inline;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
+import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTableRuntimeRefreshExecutor extends AMSTableTestBase {
+ @Parameterized.Parameters(name = "{0}, {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, true)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(false, false)
+ },
+ {
+ TestedCatalogs.hadoopCatalog(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(true, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, true)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(false, false)
+ },
+ {
+ new HiveCatalogTestHelper(TableFormat.ICEBERG, TEST_HMS.getHiveConf()),
+ new HiveTableTestHelper(true, false)
+ },
+ };
+ }
+
+ public TestTableRuntimeRefreshExecutor(
+ CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+ super(catalogTestHelper, tableTestHelper);
+ }
+
+ private static final long INTERVAL = 60000L; // 1 minute
+ private static final long MAX_INTERVAL = 300000L; // 5 minutes
+ private static final long DEFAULT_MAX_INTERVAL = -1L;
+ private static final long STEP = 30000L; // 30s
+ private static final int MAX_PENDING_PARTITIONS = 1;
+ private static final int MINOR_LEAST_INTERVAL = 3600000; // 1h
+
+ // Create DefaultTableRuntime for adaptive interval tests
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveRefresh(
+ DefaultTableRuntime tableRuntime, boolean needOptimizing, long interval,
long step) {
+ return buildTableRuntimeWithConfig(tableRuntime, needOptimizing, interval,
MAX_INTERVAL, step);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithAdaptiveMaxInterval(
+ DefaultTableRuntime tableRuntime, long interval, long maxInterval) {
+ return buildTableRuntimeWithConfig(tableRuntime, true, interval,
maxInterval, STEP);
+ }
+
+ private DefaultTableRuntime buildTableRuntimeWithConfig(
+ DefaultTableRuntime tableRuntime,
+ boolean needOptimizing,
+ long interval,
+ long maxInterval,
+ long step) {
+ OptimizingConfig optimizingConfig = new OptimizingConfig();
+ optimizingConfig.setRefreshTableAdaptiveMaxIntervalMs(maxInterval);
+ optimizingConfig.setMinorLeastInterval(MINOR_LEAST_INTERVAL);
+ optimizingConfig.setRefreshTableAdaptiveIncreaseStepMs(step);
+
+ DefaultTableRuntime newRuntime =
+ new DefaultTableRuntime(tableRuntime.store()) {
+ @Override
+ public OptimizingConfig getOptimizingConfig() {
+ return optimizingConfig;
+ }
+ };
+ newRuntime.setLatestEvaluatedNeedOptimizing(needOptimizing);
+ newRuntime.setLatestRefreshInterval(interval);
+
+ return newRuntime;
+ }
+
+ @Test
+ public void testAdaptiveRefreshIntervalScenarios() {
+ createDatabase();
+ createTable();
+
+ DefaultTableRuntime tableRuntime =
Review Comment:
If we need to update the `config` for an existing table runtime, maybe we
can add a new class that extends `DefaultTableRuntime` and overrides the
`getStore` function.
```
class TestTableRuntime extends DefaultTableRuntime {
TableRuntimeStore testStore;
TestTableRuntime(TableRuntimeStore store) {
...
this.testStore = store
}
void updateRuntimeStore(TableRuntimeStore newStore) {
...
this.testStore = newStore;
}
@Override
TableConfiguration getStore() {
this.testStore;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]