This is an automated email from the ASF dual-hosted git repository.
klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 869860ec3 [AMORO-4040] Change default value of
optimizer.task-execute-timeout to Integer.MAX_VALUE s (#4042)
869860ec3 is described below
commit 869860ec357931d22a8a941781bf6f39814dc9c7
Author: davedwwang <[email protected]>
AuthorDate: Wed Jan 28 13:59:04 2026 +0800
[AMORO-4040] Change default value of optimizer.task-execute-timeout to
Integer.MAX_VALUE s (#4042)
In 0.8.1, we added `optimizer.task-execute-timeout` and set its default
value to 1h,
which broke compatibility with the behavior of previous versions.
In this commit, we change the default to Integer.MAX_VALUE s, so that it
behaves
the same as before.
---
.../apache/amoro/server/AmoroManagementConf.java | 6 +--
.../apache/amoro/server/AmoroServiceContainer.java | 6 ++-
.../amoro/server/DefaultOptimizingService.java | 22 ++++++----
.../apache/amoro/config/ConfigurationsTest.java | 30 +++++++++++++
.../amoro/config/ConfigurationException.java | 49 ++++++++++++++++++++++
.../org/apache/amoro/config/Configurations.java | 16 +++++++
docs/configuration/ams-config.md | 2 +-
7 files changed, 117 insertions(+), 14 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index e0a26fed5..7877ee76a 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -487,9 +487,9 @@ public class AmoroManagementConf {
public static final ConfigOption<Duration> OPTIMIZER_TASK_EXECUTE_TIMEOUT =
ConfigOptions.key("optimizer.task-execute-timeout")
.durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription("Timeout duration for task execution, default to 1
hour.");
-
+ .defaultValue(Duration.ofSeconds(Integer.MAX_VALUE))
+ .withDescription(
+ "Timeout duration for task execution, default to
Integer.MAX_VALUE seconds(about 24,855 days).");
public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM
=
ConfigOptions.key("optimizer.max-planning-parallelism")
.intType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index ebb4bf130..4fd3e5e9a 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -28,6 +28,7 @@ import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.api.AmoroTableMetastore;
import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.config.ConfigHelpers;
+import org.apache.amoro.config.ConfigurationException;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
@@ -149,6 +150,9 @@ public class AmoroServiceContainer {
service.transitionToLeader();
// Used to block AMS instances that have acquired leadership
service.waitFollowerShip();
+ } catch (ConfigurationException e) {
+ LOG.error("AMS will exit...", e);
+ System.exit(1);
} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
@@ -157,7 +161,7 @@ public class AmoroServiceContainer {
}
}
} catch (Throwable t) {
- LOG.error("AMS encountered an unknown exception, will exist", t);
+ LOG.error("AMS encountered an unknown exception, will exit...", t);
System.exit(1);
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 6cc05bc56..f3b13e69a 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -113,17 +113,17 @@ public class DefaultOptimizingService extends
StatedPersistentBase
OptimizerManager optimizerManager,
TableService tableService) {
this.optimizerTouchTimeout =
- serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
+
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
-
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
+
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.taskExecuteTimeout =
-
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT).toMillis();
+
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
this.refreshGroupInterval =
-
serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis();
+
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout =
-
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
+
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.breakQuotaLimit =
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
this.tableService = tableService;
@@ -511,8 +511,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
- if (task.getStatus() == TaskRuntime.Status.ACKED
- && task.getStartTime() + taskExecuteTimeout <
System.currentTimeMillis()) {
+ if (isTaskExecTimeout(task)) {
LOG.warn(
"Task {} has been suspended in ACK state for {} (start time: {}),
put it to retry queue, optimizer {}. (Note: The task may have finished
executing, but ams did not receive the COMPLETE message from the optimizer.)",
task.getTaskId(),
@@ -543,11 +542,16 @@ public class DefaultOptimizingService extends
StatedPersistentBase
&& task.getStatus() != TaskRuntime.Status.SUCCESS
|| task.getStatus() == TaskRuntime.Status.SCHEDULED
&& task.getStartTime() + taskAckTimeout <
System.currentTimeMillis()
- || task.getStatus() == TaskRuntime.Status.ACKED
- && task.getStartTime() + taskExecuteTimeout <
System.currentTimeMillis();
+ || isTaskExecTimeout(task);
}
}
+ private boolean isTaskExecTimeout(TaskRuntime<?> task) {
+ return task.getStatus() == TaskRuntime.Status.ACKED
+ && taskExecuteTimeout > 0
+ && task.getStartTime() + taskExecuteTimeout <
System.currentTimeMillis();
+ }
+
private class OptimizingConfigWatcher implements Runnable {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java
b/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java
index 4019aa490..1e9a9df41 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/config/ConfigurationsTest.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
/**
* End-to-end test cases for configuration documentation.
@@ -74,6 +75,35 @@ public class ConfigurationsTest {
generateConfigurationMarkdown("ams-config.md", "AMS Configuration", 100,
confInfoList);
}
+ @Test
+ public void testGetDurationInMillis() throws Exception {
+ Properties properties = new Properties();
+ properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(),
"1h");
+ Configurations configuration =
ConfigHelpers.createConfiguration(properties);
+ long durationInMillis =
+
configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
+ Assertions.assertEquals(3600000, durationInMillis);
+
+ // default value test
+ properties = new Properties();
+ configuration = ConfigHelpers.createConfiguration(properties);
+ durationInMillis =
+
configuration.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT);
+ Assertions.assertEquals(Integer.MAX_VALUE * 1000L, durationInMillis);
+
+ properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(),
Long.MAX_VALUE + "m");
+ final Configurations conf1 = ConfigHelpers.createConfiguration(properties);
+ Assertions.assertThrows(
+ ConfigurationException.class,
+ () ->
conf1.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT));
+
+ properties.put(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT.key(),
"-1m");
+ final Configurations conf2 = ConfigHelpers.createConfiguration(properties);
+ Assertions.assertThrows(
+ ConfigurationException.class,
+ () ->
conf2.getDurationInMillis(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT));
+ }
+
/**
* Generate configuration documentation for multiple configuration classes.
*
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java
b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java
new file mode 100644
index 000000000..9294b8e95
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/config/ConfigurationException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.config;
+
+/**
+ * Exception thrown when a configuration value causes an exception, typically
when converting
+ * Duration to milliseconds. This exception can be caught to trigger process
exit.
+ */
+public class ConfigurationException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String configKey;
+
+ public ConfigurationException(String configKey, String message) {
+ super(message);
+ this.configKey = configKey;
+ }
+
+ public ConfigurationException(String configKey, String message, Throwable
cause) {
+ super(message, cause);
+ this.configKey = configKey;
+ }
+
+ /**
+ * Returns the configuration key that caused the overflow.
+ *
+ * @return the configuration key
+ */
+ public String getConfigKey() {
+ return configKey;
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
index 1b1ace835..74ecf61a1 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
@@ -20,6 +20,7 @@ package org.apache.amoro.config;
import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkNotNull;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -532,6 +533,21 @@ public class Configurations implements
java.io.Serializable, Cloneable {
return getOptional(option).orElseGet(option::defaultValue);
}
+ public long getDurationInMillis(ConfigOption<Duration> option) {
+ long result;
+ try {
+ result = getOptional(option).orElseGet(option::defaultValue).toMillis();
+ } catch (Exception e) { // may be throw java.lang.ArithmeticException:
long overflow
+ throw new ConfigurationException(
+ option.key(),
+ String.format(
+ "Exception when converting duration to millis for config option
'%s': %s",
+ option.key(), e.getMessage()),
+ e);
+ }
+ return result;
+ }
+
public <T> Optional<T> getOptional(ConfigOption<T> option) {
Optional<Object> rawValue = getRawValueFromOption(option);
Class<?> clazz = option.getClazz();
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 147122e14..d976978c2 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -92,7 +92,7 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| optimizer.max-planning-parallelism | 1 | Max planning parallelism in one
optimizer group. |
| optimizer.polling-timeout | 3 s | Optimizer polling task timeout. |
| optimizer.task-ack-timeout | 30 s | Timeout duration for task
acknowledgment. |
-| optimizer.task-execute-timeout | 1 h | Timeout duration for task execution,
default to 1 hour. |
+| optimizer.task-execute-timeout | 2147483647 s | Timeout duration for task
execution, default to Integer.MAX_VALUE seconds(about 24,855 days). |
| overview-cache.max-size | 3360 | Max size of overview cache. |
| overview-cache.refresh-interval | 3 min | Interval for refreshing overview
cache. |
| refresh-external-catalogs.interval | 3 min | Interval to refresh the
external catalog. |