This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new a922fe8f6 [AMORO-3418] Optimize ams configuration to support parsing
of time and storage unit (#3423)
a922fe8f6 is described below
commit a922fe8f6707e36e546e0489fae22527b24078c5
Author: Jzjsnow <[email protected]>
AuthorDate: Mon Feb 10 15:47:13 2025 +0800
[AMORO-3418] Optimize ams configuration to support parsing of time and
storage unit (#3423)
* [AMORO-3418] Optimize ams configuration to support parsing of time
interval and storage related configuration items when both values and units are
specified
* fixup! [AMORO-3418] Optimize ams configuration to support parsing of time
interval and storage related configuration items when both values and units are
specified
* fixup! [AMORO-3418] Optimize ams configuration to support parsing of time
interval and storage related configuration items when both values and units are
specified
---------
Co-authored-by: jzjsnow <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 65 +++---
.../apache/amoro/server/AmoroServiceContainer.java | 2 +-
.../amoro/server/DefaultOptimizingService.java | 9 +-
.../amoro/server/table/DefaultTableManager.java | 2 +-
.../amoro/server/table/DefaultTableService.java | 2 +-
.../server/table/executor/AsyncTableExecutors.java | 4 +-
.../amoro/server/terminal/TerminalManager.java | 2 +-
.../apache/amoro/server/AMSServiceTestBase.java | 4 +-
.../org/apache/amoro/server/AmsEnvironment.java | 4 +-
.../amoro/server/TestAmoroManagementConf.java | 241 +++++++++++++++++++++
.../amoro/server/table/TestTableManager.java | 4 +-
.../src/test/resources/config-with-units.yaml | 200 +++++++++++++++++
.../src/test/resources/config-without-units.yaml | 41 ++++
.../org/apache/amoro/config/ConfigHelpers.java | 13 ++
.../org/apache/amoro/config/ConfigOptions.java | 10 +
15 files changed, 559 insertions(+), 44 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 02c6d938f..b4742fb47 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -20,6 +20,7 @@ package org.apache.amoro.server;
import org.apache.amoro.config.ConfigOption;
import org.apache.amoro.config.ConfigOptions;
+import org.apache.amoro.utils.MemorySize;
import java.time.Duration;
import java.util.Arrays;
@@ -81,10 +82,10 @@ public class AmoroManagementConf {
"Sets the size of the worker pool. The worker pool limits the
number of tasks concurrently processing "
+ "manifests in the base table implementation across all
concurrent commit operations.");
- public static final ConfigOption<Long> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
+ public static final ConfigOption<Duration>
REFRESH_EXTERNAL_CATALOGS_INTERVAL =
ConfigOptions.key("refresh-external-catalogs.interval")
- .longType()
- .defaultValue(3 * 60 * 1000L)
+ .durationType()
+ .defaultValue(Duration.ofMinutes(3))
.withDescription("Interval to refresh the external catalog.");
public static final ConfigOption<Integer>
REFRESH_EXTERNAL_CATALOGS_THREAD_COUNT =
@@ -172,29 +173,29 @@ public class AmoroManagementConf {
.defaultValue(3)
.withDescription("The number of threads used for creating tags.");
- public static final ConfigOption<Long> AUTO_CREATE_TAGS_INTERVAL =
+ public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
- .longType()
- .defaultValue(60000L)
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for creating tags.");
- public static final ConfigOption<Long> REFRESH_TABLES_INTERVAL =
+ public static final ConfigOption<Duration> REFRESH_TABLES_INTERVAL =
ConfigOptions.key("refresh-tables.interval")
- .longType()
- .defaultValue(60000L)
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for refreshing table metadata.");
public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
- .withDescription("Filters will not be used beyond that number of
partitions");
+ .withDescription("Filters will not be used beyond that number of
partitions.");
- public static final ConfigOption<Long> BLOCKER_TIMEOUT =
+ public static final ConfigOption<Duration> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
- .longType()
- .defaultValue(60000L)
- .withDescription("session timeout in Milliseconds");
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription("Session timeout. Default unit is milliseconds if
not specified.");
public static final ConfigOption<Boolean> HA_ENABLE =
ConfigOptions.key("ha.enabled")
@@ -226,11 +227,12 @@ public class AmoroManagementConf {
.defaultValue(1261)
.withDescription("Port that the optimizing service thrift server is
bound to.");
- public static final ConfigOption<Long> THRIFT_MAX_MESSAGE_SIZE =
+ public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE =
ConfigOptions.key("thrift-server.max-message-size")
- .longType()
- .defaultValue(100 * 1024 * 1024L)
- .withDescription("Maximum message size that the Thrift server can
accept.");
+ .memorySizeType()
+ .defaultValue(MemorySize.ofMebiBytes(100))
+ .withDescription(
+ "Maximum message size that the Thrift server can accept. Default
unit is bytes if not specified.");
public static final ConfigOption<Integer> THRIFT_WORKER_THREADS =
ConfigOptions.key("thrift-server.table-service.worker-thread-count")
@@ -354,16 +356,16 @@ public class AmoroManagementConf {
.defaultValue(30000L)
.withDescription("Max wait time before getting a connection
timeout.");
- public static final ConfigOption<Long> OPTIMIZER_HB_TIMEOUT =
+ public static final ConfigOption<Duration> OPTIMIZER_HB_TIMEOUT =
ConfigOptions.key("optimizer.heart-beat-timeout")
- .longType()
- .defaultValue(60000L)
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
.withDescription("Timeout duration for Optimizer heartbeat.");
- public static final ConfigOption<Long> OPTIMIZER_TASK_ACK_TIMEOUT =
+ public static final ConfigOption<Duration> OPTIMIZER_TASK_ACK_TIMEOUT =
ConfigOptions.key("optimizer.task-ack-timeout")
- .longType()
- .defaultValue(30000L)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
.withDescription("Timeout duration for task acknowledgment.");
public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM
=
@@ -372,10 +374,10 @@ public class AmoroManagementConf {
.defaultValue(1)
.withDescription("Max planning parallelism in one optimizer group.");
- public static final ConfigOption<Long> OPTIMIZER_POLLING_TIMEOUT =
+ public static final ConfigOption<Duration> OPTIMIZER_POLLING_TIMEOUT =
ConfigOptions.key("optimizer.polling-timeout")
- .longType()
- .defaultValue(3000L)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(3))
.withDescription("Optimizer polling task timeout.");
/** config key prefix of terminal */
@@ -408,11 +410,12 @@ public class AmoroManagementConf {
.withDescription(
"When a statement fails to execute, stop execution or continue
executing the remaining statements.");
- public static final ConfigOption<Integer> TERMINAL_SESSION_TIMEOUT =
+ public static final ConfigOption<Duration> TERMINAL_SESSION_TIMEOUT =
ConfigOptions.key("terminal.session.timeout")
- .intType()
- .defaultValue(30)
- .withDescription("Session timeout in minutes.");
+ .durationType()
+ .defaultValue(Duration.ofMinutes(30))
+ .withDescription(
+ "Session timeout. Default unit is milliseconds if not specified
(** Note: default units are minutes when version < 0.8).");
public static final ConfigOption<String> TERMINAL_SENSITIVE_CONF_KEYS =
ConfigOptions.key("terminal.sensitive-conf-keys")
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 208f70d3e..15c532d5d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -335,7 +335,7 @@ public class AmoroServiceContainer {
private void initThriftService() throws TTransportException {
LOG.info("Initializing thrift service...");
- long maxMessageSize =
serviceConfig.getLong(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE);
+ long maxMessageSize =
serviceConfig.get(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE).getBytes();
int selectorThreads =
serviceConfig.getInteger(AmoroManagementConf.THRIFT_SELECTOR_THREADS);
int workerThreads =
serviceConfig.getInteger(AmoroManagementConf.THRIFT_WORKER_THREADS);
int queueSizePerSelector =
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 49fad3491..a34e06421 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -113,11 +113,14 @@ public class DefaultOptimizingService extends
StatedPersistentBase
CatalogManager catalogManager,
MaintainedTableManager tableManager,
TableService tableService) {
- this.optimizerTouchTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
- this.taskAckTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
+ this.optimizerTouchTimeout =
+ serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
+ this.taskAckTimeout =
+
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
- this.pollingTimeout =
serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
+ this.pollingTimeout =
+
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableManager = tableManager;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
index 3051e56d1..8eaeb9db7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java
@@ -73,7 +73,7 @@ public class DefaultTableManager extends PersistentBase
implements TableManager
public DefaultTableManager(Configurations configuration, CatalogManager
catalogManager) {
this.catalogManager = catalogManager;
- this.blockerTimeout =
configuration.getLong(AmoroManagementConf.BLOCKER_TIMEOUT);
+ this.blockerTimeout =
configuration.get(AmoroManagementConf.BLOCKER_TIMEOUT).toMillis();
}
@Override
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 448c91169..a7ae6d96d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -88,7 +88,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
public DefaultTableService(Configurations configuration, CatalogManager
catalogManager) {
this.catalogManager = catalogManager;
this.externalCatalogRefreshingInterval =
-
configuration.getLong(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL);
+
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
this.serverConfiguration = configuration;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
index 29144b1dd..560f2bb4b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/AsyncTableExecutors.java
@@ -77,14 +77,14 @@ public class AsyncTableExecutors {
new TableRuntimeRefreshExecutor(
tableService,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
- conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
+ conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
tableService,
conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT),
- conf.getLong(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL));
+
conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis());
}
if (conf.getBoolean(AmoroManagementConf.DATA_EXPIRATION_ENABLED)) {
this.dataExpiringExecutor =
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
index f1feab8f5..1396bf5c4 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java
@@ -85,7 +85,7 @@ public class TerminalManager {
this.catalogManager = catalogManager;
this.resultLimits =
conf.getInteger(AmoroManagementConf.TERMINAL_RESULT_LIMIT);
this.stopOnError =
conf.getBoolean(AmoroManagementConf.TERMINAL_STOP_ON_ERROR);
- this.sessionTimeout =
conf.getInteger(AmoroManagementConf.TERMINAL_SESSION_TIMEOUT);
+ this.sessionTimeout = (int)
conf.get(AmoroManagementConf.TERMINAL_SESSION_TIMEOUT).toMinutes();
this.sessionFactory = loadTerminalSessionFactory(conf);
gcThread = new Thread(new SessionCleanTask());
gcThread.setName("terminal-session-gc");
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 7c9c574f1..b1f29861d 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -27,6 +27,8 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import java.time.Duration;
+
public abstract class AMSServiceTestBase extends AMSManagerTestBase {
private static DefaultTableService TABLE_SERVICE = null;
private static DefaultOptimizingService OPTIMIZING_SERVICE = null;
@@ -35,7 +37,7 @@ public abstract class AMSServiceTestBase extends
AMSManagerTestBase {
public static void initTableService() {
try {
Configurations configurations = new Configurations();
- configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 800L);
+ configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
Duration.ofMillis(800L));
TABLE_SERVICE = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
index b64c9db86..38d95adb0 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AmsEnvironment.java
@@ -52,6 +52,7 @@ import java.io.IOException;
import java.net.BindException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -346,7 +347,8 @@ public class AmsEnvironment {
AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
optimizingServiceBindPort);
serviceConfig.set(
-
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL, 1000L);
+ AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
+ Duration.ofMillis(1000L));
serviceContainer.startService();
break;
} catch (TTransportException e) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
new file mode 100644
index 000000000..a8bf95a7f
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmoroManagementConf.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.apache.amoro.server.AmoroServiceContainer.expandConfigMap;
+
+import org.apache.amoro.config.ConfigOption;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.io.Resources;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+
+public class TestAmoroManagementConf {
+ private static final ConfigOption<Duration>[] TIME_RELATED_CONFIG_OPTIONS =
+ new ConfigOption[] {
+ AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
+ AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL,
+ AmoroManagementConf.REFRESH_TABLES_INTERVAL,
+ AmoroManagementConf.BLOCKER_TIMEOUT,
+ AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
+ AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT,
+ AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT,
+ AmoroManagementConf.TERMINAL_SESSION_TIMEOUT
+ };
+
+ private static final Map<String, String> DEFAULT_TIME_UNIT_IN_OLD_VERSIONS =
+ ImmutableMap.<String, String>builder()
+ .put(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL.key(),
"ms")
+ .put(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL.key(), "ms")
+ .put(AmoroManagementConf.REFRESH_TABLES_INTERVAL.key(), "ms")
+ .put(AmoroManagementConf.BLOCKER_TIMEOUT.key(), "ms")
+ .put(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT.key(), "ms")
+ .put(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT.key(), "ms")
+ .put(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT.key(), "ms")
+ .put(AmoroManagementConf.TERMINAL_SESSION_TIMEOUT.key(), "min")
+ .build();
+
+ private static final ConfigOption<String>[] STORAGE_RELATED_CONFIG_OPTIONS =
+ new ConfigOption[] {AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE};
+
+ @Test
+ void testParsingDefaultTimeRelatedConfigs() {
+ Configurations serviceConfig = new Configurations();
+ Configurations expectedConfig =
+
Configurations.fromObjectMap(timeRelatedConfigMapInMillisSecondsWithoutTimeUnits);
+ assertTimeRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ @Test
+ void testParsingDefaultStorageRelatedConfigs() {
+ Configurations serviceConfig = new Configurations();
+ Configurations expectedConfig =
+ Configurations.fromObjectMap(storageRelatedConfigMapWithoutTimeUnits);
+ assertStorageRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ @Test
+ void testParsingAmoroManagementConfWithTimeUnits() throws Exception {
+ Configurations serviceConfig = getConfigurationsWithUnits();
+ Configurations expectedConfig =
+
Configurations.fromObjectMap(timeRelatedConfigMapInMillisSecondsWithoutTimeUnits);
+ assertTimeRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ @Test
+ void testParsingAmoroManagementConfWithStorageUnits() throws Exception {
+ Configurations serviceConfig = getConfigurationsWithUnits();
+ Configurations expectedConfig =
+ Configurations.fromObjectMap(storageRelatedConfigMapWithoutTimeUnits);
+ assertStorageRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ @Test
+ void testParsingAmoroManagementConfWithoutTimeUnits() throws Exception {
+ Configurations serviceConfig = getConfigurationsWithoutUnits();
+ Configurations expectedConfig =
Configurations.fromObjectMap(timeRelatedConfigMapWithTimeUnits);
+ assertTimeRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ @Test
+ void testParsingAmoroManagementConfWithoutStorageUnits() throws Exception {
+ Configurations serviceConfig = getConfigurationsWithoutUnits();
+ Configurations expectedConfig =
+ Configurations.fromObjectMap(storageRelatedConfigMapWithTimeUnits);
+ assertStorageRelatedConfigs(serviceConfig, expectedConfig);
+ }
+
+ /** Test for conflicts when parsing configuration files in older versions (<
0.8) */
+ @Test
+ void testConflictsForParsingAmoroManagementConfInOldVersions() throws
Exception {
+ Configurations serviceConfig = getConfigurationsWithoutUnits();
+ Configurations expectedConfig =
+
Configurations.fromObjectMap(timeRelatedConfigMapWithExpectedTimeUnitsInOldVersions);
+
+ // Checking the parsed time-related configuration items (should fail).
+ // As the default unit for `terminal.session.timeout` has changed from
minutes to milliseconds
+ // since version 0.8.
+ Assertions.assertThrows(
+ AssertionError.class, () -> assertTimeRelatedConfigs(serviceConfig,
expectedConfig));
+
+ // Updating the time-related items with the expected time units in older
versions
+ Configurations updatedServiceConfig =
updateConfigurationOfOldVersions(serviceConfig);
+
+ // Checking the parsed time-related configuration items after upgrading
(should pass).
+ assertTimeRelatedConfigs(updatedServiceConfig, expectedConfig);
+ }
+
+ private Configurations updateConfigurationOfOldVersions(Configurations
serviceConfig) {
+ Map<String, String> updatedServiceConfigMap = serviceConfig.toMap();
+ for (String key : DEFAULT_TIME_UNIT_IN_OLD_VERSIONS.keySet()) {
+ if (updatedServiceConfigMap.containsKey(key)) {
+ String value = updatedServiceConfigMap.get(key);
+ String newValue = value + DEFAULT_TIME_UNIT_IN_OLD_VERSIONS.get(key);
+ updatedServiceConfigMap.put(key, newValue);
+ }
+ }
+ return Configurations.fromMap(updatedServiceConfigMap);
+ }
+
+ private Configurations getConfigurationsWithUnits() throws
URISyntaxException, IOException {
+ URL resource = Resources.getResource("config-with-units.yaml");
+ JsonNode yamlConfig =
+ JacksonUtil.fromObjects(
+ new
Yaml().loadAs(Files.newInputStream(Paths.get(resource.toURI())), Map.class));
+ Map<String, Object> systemConfig =
+ JacksonUtil.getMap(
+ yamlConfig,
+ AmoroManagementConf.SYSTEM_CONFIG,
+ new TypeReference<Map<String, Object>>() {});
+ Map<String, Object> expandedConfigurationMap = Maps.newHashMap();
+ expandConfigMap(systemConfig, "", expandedConfigurationMap);
+ return Configurations.fromObjectMap(expandedConfigurationMap);
+ }
+
+ private Configurations getConfigurationsWithoutUnits() throws
URISyntaxException, IOException {
+ URL resource = Resources.getResource("config-without-units.yaml");
+ JsonNode yamlConfig =
+ JacksonUtil.fromObjects(
+ new
Yaml().loadAs(Files.newInputStream(Paths.get(resource.toURI())), Map.class));
+ Map<String, Object> systemConfig =
+ JacksonUtil.getMap(
+ yamlConfig,
+ AmoroManagementConf.SYSTEM_CONFIG,
+ new TypeReference<Map<String, Object>>() {});
+ Map<String, Object> expandedConfigurationMap = Maps.newHashMap();
+ expandConfigMap(systemConfig, "", expandedConfigurationMap);
+ return Configurations.fromObjectMap(expandedConfigurationMap);
+ }
+
+ private void assertTimeRelatedConfigs(
+ Configurations serviceConfig, Configurations expectedConfig) {
+ for (ConfigOption<Duration> configOption : TIME_RELATED_CONFIG_OPTIONS) {
+ Assertions.assertEquals(expectedConfig.get(configOption),
serviceConfig.get(configOption));
+ }
+ }
+
+ private void assertStorageRelatedConfigs(
+ Configurations serviceConfig, Configurations expectedConfig) {
+ for (ConfigOption<String> configOption : STORAGE_RELATED_CONFIG_OPTIONS) {
+ Assertions.assertEquals(
+ expectedConfig.get(configOption),
+ serviceConfig.get(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE));
+ }
+ }
+
+ private final Map<String, Object>
timeRelatedConfigMapInMillisSecondsWithoutTimeUnits =
+ ImmutableMap.<String, Object>builder()
+ .put("refresh-external-catalogs.interval", "180000")
+ .put("refresh-tables.interval", "60000")
+ .put("optimizer.heart-beat-timeout", "60000")
+ .put("optimizer.task-ack-timeout", "30000")
+ .put("optimizer.polling-timeout", "3000")
+ .put("blocker.timeout", "60000")
+ .put("auto-create-tags.interval", "60000")
+ .put("terminal.session.timeout", "1800000")
+ .build();
+
+ private final Map<String, Object> timeRelatedConfigMapWithTimeUnits =
+ ImmutableMap.<String, Object>builder()
+ .put("refresh-external-catalogs.interval", "5 min")
+ .put("refresh-tables.interval", "2 min")
+ .put("optimizer.heart-beat-timeout", "2 min")
+ .put("optimizer.task-ack-timeout", "60 s")
+ .put("optimizer.polling-timeout", "6 s")
+ .put("blocker.timeout", "2 min")
+ .put("auto-create-tags.interval", "2 min")
+ .put("terminal.session.timeout", "30 ms")
+ .build();
+
+ private final Map<String, Object>
timeRelatedConfigMapWithExpectedTimeUnitsInOldVersions =
+ ImmutableMap.<String, Object>builder()
+ .put("refresh-external-catalogs.interval", "5 min")
+ .put("refresh-tables.interval", "2 min")
+ .put("optimizer.heart-beat-timeout", "2 min")
+ .put("optimizer.task-ack-timeout", "60 s")
+ .put("optimizer.polling-timeout", "6 s")
+ .put("blocker.timeout", "2 min")
+ .put("auto-create-tags.interval", "2 min")
+ .put("terminal.session.timeout", "30 min")
+ .build();
+
+ private final Map<String, Object> storageRelatedConfigMapWithoutTimeUnits =
+ ImmutableMap.<String, Object>builder()
+ .put("thrift-server.max-message-size", "104857600")
+ .build();
+
+ private final Map<String, Object> storageRelatedConfigMapWithTimeUnits =
+ ImmutableMap.<String, Object>builder()
+ .put("thrift-server.max-message-size", "200 mb")
+ .build();
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
index 2396d0048..17c5b20fb 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManager.java
@@ -312,7 +312,7 @@ public class TestTableManager extends AMSTableTestBase {
Assert.assertEquals(getProperties().size() + 3,
block.getProperties().size());
getProperties()
.forEach((key, value) ->
Assert.assertEquals(block.getProperties().get(key), value));
- long timeout = AmoroManagementConf.BLOCKER_TIMEOUT.defaultValue();
+ long timeout =
AmoroManagementConf.BLOCKER_TIMEOUT.defaultValue().toMillis();
Assert.assertEquals(timeout + "",
block.getProperties().get(RenewableBlocker.BLOCKER_TIMEOUT));
Assert.assertEquals(
@@ -322,7 +322,7 @@ public class TestTableManager extends AMSTableTestBase {
}
private void assertBlockerRenewed(Blocker block) {
- long timeout = AmoroManagementConf.BLOCKER_TIMEOUT.defaultValue();
+ long timeout =
AmoroManagementConf.BLOCKER_TIMEOUT.defaultValue().toMillis();
long actualTimeout =
Long.parseLong(block.getProperties().get(RenewableBlocker.EXPIRATION_TIME_PROPERTY))
-
Long.parseLong(block.getProperties().get(RenewableBlocker.CREATE_TIME_PROPERTY));
diff --git a/amoro-ams/src/test/resources/config-with-units.yaml
b/amoro-ams/src/test/resources/config-with-units.yaml
new file mode 100644
index 000000000..c01751c78
--- /dev/null
+++ b/amoro-ams/src/test/resources/config-with-units.yaml
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams:
+ admin-username: admin
+ admin-password: admin
+ server-bind-host: "0.0.0.0"
+ server-expose-host: "127.0.0.1"
+
+ thrift-server:
+ max-message-size: 100MB
+ selector-thread-count: 2
+ selector-queue-size: 4
+ table-service:
+ bind-port: 1260
+ worker-thread-count: 20
+ optimizing-service:
+ bind-port: 1261
+
+ http-server:
+ session-timeout: 7d
+ bind-port: 1630
+ rest-auth-type: token
+
+ refresh-external-catalogs:
+ interval: 3min
+ thread-count: 10
+ queue-size: 1000000
+
+ refresh-tables:
+ thread-count: 10
+ interval: 1min
+ max-pending-partition-count: 100 # default 100
+
+ self-optimizing:
+ commit-thread-count: 10
+ runtime-data-keep-days: 30
+ runtime-data-expire-interval-hours: 1
+
+ optimizer:
+ heart-beat-timeout: 1min
+ task-ack-timeout: 30s
+ polling-timeout: 3s
+ max-planning-parallelism: 1 # default 1
+
+ blocker:
+ timeout: 1min
+
+ # optional features
+ expire-snapshots:
+ enabled: true
+ thread-count: 10
+
+ clean-orphan-files:
+ enabled: true
+ thread-count: 10
+ interval: 1d
+
+ clean-dangling-delete-files:
+ enabled: true
+ thread-count: 10
+
+ sync-hive-tables:
+ enabled: false
+ thread-count: 10
+
+ data-expiration:
+ enabled: true
+ thread-count: 10
+ interval: 1d
+
+ auto-create-tags:
+ enabled: true
+ thread-count: 3
+ interval: 1min
+
+ table-manifest-io:
+ thread-count: 20
+
+ catalog-meta-cache:
+ expiration-interval: 60s
+
+ database:
+ type: derby
+ jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver
+ url: jdbc:derby:/tmp/amoro/derby;create=true
+ connection-pool-max-total: 20
+ connection-pool-max-idle: 16
+ connection-pool-max-wait-millis: 1000
+
+# MySQL database configuration.
+# database:
+# type: mysql
+# jdbc-driver-class: com.mysql.cj.jdbc.Driver
+# url:
jdbc:mysql://127.0.0.1:3306/db?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&allowPublicKeyRetrieval=true&useSSL=false
+# username: root
+# password: root
+# auto-create-tables: true
+# connection-pool-max-total: 20
+# connection-pool-max-idle: 16
+# connection-pool-max-wait-millis: 1000
+
+# Postgres database configuration.
+# database:
+# type: postgres
+# jdbc-driver-class: org.postgresql.Driver
+# url: jdbc:postgresql://127.0.0.1:5432/db
+# username: user
+# password: passwd
+# auto-create-tables: true
+# connection-pool-max-total: 20
+# connection-pool-max-idle: 16
+# connection-pool-max-wait-millis: 1000
+
+ terminal:
+ backend: local
+ result:
+ limit: 1000
+ stop-on-error: false
+ session:
+ timeout: 30min
+ local:
+ using-session-catalog-for-hive: false
+ spark.sql.iceberg.handle-timestamp-without-timezone: false
+
+# Kyuubi terminal backend configuration.
+# terminal:
+# backend: kyuubi
+# kyuubi.jdbc.url: jdbc:hive2://127.0.0.1:10009/
+
+# High availability configuration.
+# ha:
+# enabled: true
+# cluster-name: default
+# zookeeper-address: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
+
+
+containers:
+ - name: localContainer
+ container-impl: org.apache.amoro.server.manager.LocalOptimizerContainer
+ properties:
+ export.JAVA_HOME: "/opt/java" # JDK environment
+
+#containers:
+
+# - name: KubernetesContainer
+# container-impl:
org.apache.amoro.server.manager.KubernetesOptimizerContainer
+# properties:
+# kube-config-path: ~/.kube/config
+# image: apache/amoro:{version}
+# namespace: default
+
+# - name: flinkContainer
+# container-impl: org.apache.amoro.server.manager.FlinkOptimizerContainer
+# properties:
+# flink-home: /opt/flink/ # Flink
install home
+# target: yarn-per-job # Flink run
target, (yarn-per-job, yarn-application, kubernetes-application)
+# export.JVM_ARGS: -Djava.security.krb5.conf=/opt/krb5.conf # Flink
launch jvm args, like kerberos config when ues kerberos
+# export.HADOOP_CONF_DIR: /etc/hadoop/conf/ # Hadoop
config dir
+# export.HADOOP_USER_NAME: hadoop # Hadoop
user submit on yarn
+# export.FLINK_CONF_DIR: /opt/flink/conf/ # Flink
config dir
+# # flink kubernetes application properties.
+# job-uri: "local:///opt/flink/usrlib/optimizer-job.jar" # Optimizer
job main jar for kubernetes application
+# flink-conf.kubernetes.container.image:
"apache/amoro-flink-optimizer:{version}" # Optimizer image ref
+# flink-conf.kubernetes.service-account: flink # Service
account that is used within kubernetes cluster.
+
+#containers:
+# - name: sparkContainer
+# container-impl: org.apache.amoro.server.manager.SparkOptimizerContainer
+# properties:
+# spark-home: /opt/spark/ # Spark
install home
+# master: yarn # The
cluster manager to connect to. See the list of
https://spark.apache.org/docs/latest/submitting-applications.html#master-urls.
+# deploy-mode: cluster # Spark
deploy mode, client or cluster
+# export.JVM_ARGS: -Djava.security.krb5.conf=/opt/krb5.conf # Spark
launch jvm args, like kerberos config when ues kerberos
+# export.HADOOP_CONF_DIR: /etc/hadoop/conf/ # Hadoop
config dir
+# export.HADOOP_USER_NAME: hadoop # Hadoop
user submit on yarn
+# export.SPARK_CONF_DIR: /opt/spark/conf/ # Spark
config dir
+# # spark kubernetes application properties.
+# job-uri: "local:///opt/spark/usrlib/optimizer-job.jar" # Optimizer
job main jar for kubernetes application
+# ams-optimizing-uri: thrift://ams.amoro.service.local:1261 # AMS
optimizing uri
+# spark-conf.spark.dynamicAllocation.enabled: "true" # Enabling
DRA feature can make full use of computing resources
+# spark-conf.spark.shuffle.service.enabled: "false" # If spark
DRA is used on kubernetes, we should set it false
+# spark-conf.spark.dynamicAllocation.shuffleTracking.enabled: "true"
# Enables shuffle file tracking for executors, which allows
dynamic allocation without the need for an external shuffle service
+# spark-conf.spark.kubernetes.container.image:
"apache/amoro-spark-optimizer:{version}" # Optimizer image ref
+# spark-conf.spark.kubernetes.namespace: <spark-namespace>
# Namespace that is used within kubernetes cluster
+# spark-conf.spark.kubernetes.authenticate.driver.serviceAccountName:
<spark-sa> # Service account that is used within kubernetes
cluster.
diff --git a/amoro-ams/src/test/resources/config-without-units.yaml
b/amoro-ams/src/test/resources/config-without-units.yaml
new file mode 100644
index 000000000..a78164f2c
--- /dev/null
+++ b/amoro-ams/src/test/resources/config-without-units.yaml
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams:
+ thrift-server:
+ max-message-size: 209715200 # 200MB
+
+ refresh-external-catalogs:
+ interval: 300000 # 5min
+
+ refresh-tables:
+ interval: 120000 # 2min
+
+ optimizer:
+ heart-beat-timeout: 120000 # 2min
+ task-ack-timeout: 60000 # 60s
+ polling-timeout: 6000 # 6s
+
+ blocker:
+ timeout: 120000 # 2min
+
+ auto-create-tags:
+ interval: 120000 # 2min
+
+ terminal:
+ session:
+ timeout: 30 # 30min when version < 0.8
\ No newline at end of file
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java
b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java
index d7a705e04..ec4395215 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigHelpers.java
@@ -21,6 +21,8 @@ package org.apache.amoro.config;
import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.amoro.utils.MemorySize;
+
import javax.annotation.Nonnull;
import java.io.File;
@@ -108,11 +110,22 @@ public class ConfigHelpers {
return (T) convertToDuration(rawValue);
} else if (clazz == Map.class) {
return (T) convertToProperties(rawValue);
+ } else if (clazz == MemorySize.class) {
+ return (T) convertToMemorySize(rawValue);
}
throw new IllegalArgumentException("Unsupported type: " + clazz);
}
+ @SuppressWarnings("unchecked")
+ public static <T> T convertToMemorySize(Object rawValue) {
+ if (rawValue instanceof MemorySize) {
+ return (T) rawValue;
+ } else {
+ return (T) MemorySize.parse(rawValue.toString());
+ }
+ }
+
@SuppressWarnings("unchecked")
public static <T> T convertToList(Object rawValue, Class<?> atomicClass) {
if (rawValue instanceof List) {
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/ConfigOptions.java
b/amoro-common/src/main/java/org/apache/amoro/config/ConfigOptions.java
index 4397ee4ce..29902c96d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/ConfigOptions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/ConfigOptions.java
@@ -20,6 +20,8 @@ package org.apache.amoro.config;
import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.amoro.utils.MemorySize;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -158,6 +160,14 @@ public class ConfigOptions {
return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
}
+ /**
+ * Defines that the value of the option should be of {@link
org.apache.amoro.utils.MemorySize}
+ * type.
+ */
+ public TypedConfigOptionBuilder<MemorySize> memorySizeType() {
+ return new TypedConfigOptionBuilder<>(key, MemorySize.class);
+ }
+
/**
* Creates a ConfigOption with the given default value.
*