This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b6d8fae54 [Improve] re-struct Zeta Engine config options (#8741)
6b6d8fae54 is described below
commit 6b6d8fae54a62fd36f79b1eaeb4fbc1afe45cefa
Author: Jarvis <[email protected]>
AuthorDate: Wed Feb 19 13:52:43 2025 +0800
[Improve] re-struct Zeta Engine config options (#8741)
---
.../engine/common/config/EngineConfig.java | 48 +-
.../config/YamlSeaTunnelDomConfigProcessor.java | 221 +++++---
.../common/config/server/CheckpointConfig.java | 12 +-
.../config/server/CheckpointStorageConfig.java | 6 +-
.../config/server/ConnectorJarHAStorageConfig.java | 4 +-
.../config/server/ConnectorJarStorageConfig.java | 17 +-
.../config/server/CoordinatorServiceConfig.java | 14 +-
.../engine/common/config/server/HttpConfig.java | 15 +-
.../common/config/server/ServerConfigOptions.java | 560 +++++++++++----------
.../common/config/server/SlotServiceConfig.java | 10 +-
.../server/service/jar/StorageStrategyFactory.java | 2 +-
.../server/utils/SystemLoadCalculateTest.java | 8 +-
12 files changed, 552 insertions(+), 365 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index dd043aba55..8e0be8b83e 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -41,35 +41,44 @@ import static
com.hazelcast.internal.util.Preconditions.checkPositive;
@Data
public class EngineConfig {
- private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
+ private int backupCount =
+
ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.defaultValue();
private int printExecutionInfoInterval =
- ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL
+ .defaultValue();
private int printJobMetricsInfoInterval =
- ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL
+ .defaultValue();
private int jobMetricsBackupInterval =
- ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL
+ .defaultValue();
private ThreadShareMode taskExecutionThreadShareMode =
-
ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE.defaultValue();
+
ServerConfigOptions.WorkerServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE
+ .defaultValue();
- private SlotServiceConfig slotServiceConfig =
ServerConfigOptions.SLOT_SERVICE.defaultValue();
+ private SlotServiceConfig slotServiceConfig =
+
ServerConfigOptions.WorkerServerConfigOptions.SLOT_SERVICE.defaultValue();
- private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
+ private CheckpointConfig checkpointConfig =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT.defaultValue();
private CoordinatorServiceConfig coordinatorServiceConfig =
- ServerConfigOptions.COORDINATOR_SERVICE.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.COORDINATOR_SERVICE.defaultValue();
private ConnectorJarStorageConfig connectorJarStorageConfig =
- ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG
+ .defaultValue();
private boolean classloaderCacheMode =
ServerConfigOptions.CLASSLOADER_CACHE_MODE.defaultValue();
- private QueueType queueType =
ServerConfigOptions.QUEUE_TYPE.defaultValue();
+ private QueueType queueType =
+
ServerConfigOptions.WorkerServerConfigOptions.QUEUE_TYPE.defaultValue();
private int historyJobExpireMinutes =
- ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue();
private ClusterRole clusterRole = ClusterRole.MASTER_AND_WORKER;
@@ -81,9 +90,10 @@ public class EngineConfig {
private TelemetryConfig telemetryConfig =
ServerConfigOptions.TELEMETRY.defaultValue();
private ScheduleStrategy scheduleStrategy =
- ServerConfigOptions.JOB_SCHEDULE_STRATEGY.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.JOB_SCHEDULE_STRATEGY.defaultValue();
- private HttpConfig httpConfig = ServerConfigOptions.HTTP.defaultValue();
+ private HttpConfig httpConfig =
+ ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue();
public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
@@ -97,21 +107,24 @@ public class EngineConfig {
public void setPrintExecutionInfoInterval(int printExecutionInfoInterval) {
checkPositive(
printExecutionInfoInterval,
- ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL + " must be
> 0");
+
ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL
+ + " must be > 0");
this.printExecutionInfoInterval = printExecutionInfoInterval;
}
public void setPrintJobMetricsInfoInterval(int
printJobMetricsInfoInterval) {
checkPositive(
printJobMetricsInfoInterval,
- ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must
be > 0");
+
ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL
+ + " must be > 0");
this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
}
public void setJobMetricsBackupInterval(int jobMetricsBackupInterval) {
checkPositive(
jobMetricsBackupInterval,
- ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL + " must be >
0");
+
ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL
+ + " must be > 0");
this.jobMetricsBackupInterval = jobMetricsBackupInterval;
}
@@ -123,7 +136,8 @@ public class EngineConfig {
public void setHistoryJobExpireMinutes(int historyJobExpireMinutes) {
checkPositive(
historyJobExpireMinutes,
- ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + " must be >
0");
+
ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES
+ + " must be > 0");
this.historyJobExpireMinutes = historyJobExpireMinutes;
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 4c4d62132c..7b4c968dd2 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -96,12 +96,16 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
SlotServiceConfig slotServiceConfig = new SlotServiceConfig();
for (Node node : childElements(slotServiceNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.DYNAMIC_SLOT.key().equals(name)) {
+ if
(ServerConfigOptions.WorkerServerConfigOptions.DYNAMIC_SLOT.key().equals(name))
{
slotServiceConfig.setDynamicSlot(getBooleanValue(getTextContent(node)));
- } else if (ServerConfigOptions.SLOT_NUM.key().equals(name)) {
+ } else if
(ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.key().equals(name)) {
slotServiceConfig.setSlotNum(
- getIntegerValue(ServerConfigOptions.SLOT_NUM.key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.key().equals(name)) {
+ getIntegerValue(
+
ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.key(),
+ getTextContent(node)));
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY
+ .key()
+ .equals(name)) {
slotServiceConfig.setAllocateStrategy(
AllocateStrategy.valueOf(getTextContent(node).toUpperCase()));
} else {
@@ -115,14 +119,18 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
CoordinatorServiceConfig coordinatorServiceConfig = new
CoordinatorServiceConfig();
for (Node node : childElements(coordinatorServiceNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.key().equals(name))
{
coordinatorServiceConfig.setMaxThreadNum(
getIntegerValue(
- ServerConfigOptions.MAX_THREAD_NUM.key(),
getTextContent(node)));
- } else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name))
{
+
ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.key(),
+ getTextContent(node)));
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM
+ .key()
+ .equals(name)) {
coordinatorServiceConfig.setCoreThreadNum(
getIntegerValue(
- ServerConfigOptions.CORE_THREAD_NUM.key(),
getTextContent(node)));
+
ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM.key(),
+ getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
@@ -134,56 +142,91 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.BACKUP_COUNT.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.key().equals(name))
{
engineConfig.setBackupCount(
getIntegerValue(
- ServerConfigOptions.BACKUP_COUNT.key(),
getTextContent(node)));
- } else if (ServerConfigOptions.QUEUE_TYPE.key().equals(name)) {
+
ServerConfigOptions.MasterServerConfigOptions.BACKUP_COUNT.key(),
+ getTextContent(node)));
+ } else if (ServerConfigOptions.WorkerServerConfigOptions.QUEUE_TYPE
+ .key()
+ .equals(name)) {
engineConfig.setQueueType(
QueueType.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
- } else if
(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL
+ .key()
+ .equals(name)) {
engineConfig.setPrintExecutionInfoInterval(
getIntegerValue(
-
ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .PRINT_EXECUTION_INFO_INTERVAL
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL
+ .key()
+ .equals(name)) {
engineConfig.setPrintJobMetricsInfoInterval(
getIntegerValue(
-
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .PRINT_JOB_METRICS_INFO_INTERVAL
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL
+ .key()
+ .equals(name)) {
engineConfig.setJobMetricsBackupInterval(
getIntegerValue(
-
ServerConfigOptions.JOB_METRICS_BACKUP_INTERVAL.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .JOB_METRICS_BACKUP_INTERVAL
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE.key().equals(name)) {
+ } else if (ServerConfigOptions.WorkerServerConfigOptions
+ .TASK_EXECUTION_THREAD_SHARE_MODE
+ .key()
+ .equals(name)) {
String mode = getTextContent(node).toUpperCase(Locale.ROOT);
if (!Arrays.asList("ALL", "OFF", "PART").contains(mode)) {
throw new IllegalArgumentException(
-
ServerConfigOptions.TASK_EXECUTION_THREAD_SHARE_MODE
+ ServerConfigOptions.WorkerServerConfigOptions
+ .TASK_EXECUTION_THREAD_SHARE_MODE
+ " must in [ALL, OFF, PART]");
}
engineConfig.setTaskExecutionThreadShareMode(ThreadShareMode.valueOf(mode));
- } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
+ } else if
(ServerConfigOptions.WorkerServerConfigOptions.SLOT_SERVICE
+ .key()
+ .equals(name)) {
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
- } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
+ } else if (ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT
+ .key()
+ .equals(name)) {
engineConfig.setCheckpointConfig(parseCheckpointConfig(node));
- } else if
(ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES
+ .key()
+ .equals(name)) {
engineConfig.setHistoryJobExpireMinutes(
getIntegerValue(
-
ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .HISTORY_JOB_EXPIRE_MINUTES
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG
+ .key()
+ .equals(name)) {
engineConfig.setConnectorJarStorageConfig(parseConnectorJarStorageConfig(node));
} else if
(ServerConfigOptions.CLASSLOADER_CACHE_MODE.key().equals(name)) {
engineConfig.setClassloaderCacheMode(getBooleanValue(getTextContent(node)));
- } else if
(ServerConfigOptions.EVENT_REPORT_HTTP.equalsIgnoreCase(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.EVENT_REPORT_HTTP
+ .equalsIgnoreCase(name)) {
NamedNodeMap attributes = node.getAttributes();
- Node urlNode =
attributes.getNamedItem(ServerConfigOptions.EVENT_REPORT_HTTP_URL);
+ Node urlNode =
+ attributes.getNamedItem(
+ ServerConfigOptions.MasterServerConfigOptions
+ .EVENT_REPORT_HTTP_URL);
if (urlNode != null) {
engineConfig.setEventReportHttpApi(getTextContent(urlNode));
Node headersNode =
-
attributes.getNamedItem(ServerConfigOptions.EVENT_REPORT_HTTP_HEADERS);
+ attributes.getNamedItem(
+
ServerConfigOptions.MasterServerConfigOptions
+ .EVENT_REPORT_HTTP_HEADERS);
if (headersNode != null) {
Map<String, String> headers = new LinkedHashMap<>();
NodeList nodeList = headersNode.getChildNodes();
@@ -196,12 +239,16 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
}
} else if (ServerConfigOptions.TELEMETRY.key().equals(name)) {
engineConfig.setTelemetryConfig(parseTelemetryConfig(node));
- } else if
(ServerConfigOptions.JOB_SCHEDULE_STRATEGY.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.JOB_SCHEDULE_STRATEGY
+ .key()
+ .equals(name)) {
engineConfig.setScheduleStrategy(
ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
- } else if (ServerConfigOptions.HTTP.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.HTTP.key().equals(name)) {
engineConfig.setHttpConfig(parseHttpConfig(node));
- } else if
(ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.COORDINATOR_SERVICE
+ .key()
+ .equals(name)) {
engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
@@ -219,22 +266,35 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
CheckpointConfig checkpointConfig = new CheckpointConfig();
for (Node node : childElements(checkpointNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.CHECKPOINT_INTERVAL.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL
+ .key()
+ .equals(name)) {
checkpointConfig.setCheckpointInterval(
getIntegerValue(
- ServerConfigOptions.CHECKPOINT_INTERVAL.key(),
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CHECKPOINT_TIMEOUT.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT
+ .key()
+ .equals(name)) {
checkpointConfig.setCheckpointTimeout(
getIntegerValue(
- ServerConfigOptions.CHECKPOINT_TIMEOUT.key(),
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key().equals(name)) {
+ } else if (ServerConfigOptions.MasterServerConfigOptions
+ .SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
+ .key()
+ .equals(name)) {
checkpointConfig.setSchemaChangeCheckpointTimeout(
getIntegerValue(
-
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE
+ .key()
+ .equals(name)) {
checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
@@ -248,14 +308,23 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
CheckpointStorageConfig checkpointStorageConfig = new
CheckpointStorageConfig();
for (Node node : childElements(checkpointStorageConfigNode)) {
String name = cleanNodeName(node);
- if
(ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_TYPE
+ .key()
+ .equals(name)) {
checkpointStorageConfig.setStorage(getTextContent(node));
- } else if
(ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED
+ .key()
+ .equals(name)) {
checkpointStorageConfig.setMaxRetainedCheckpoints(
getIntegerValue(
-
ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .CHECKPOINT_STORAGE_MAX_RETAINED
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CHECKPOINT_STORAGE_PLUGIN_CONFIG.key().equals(name)) {
+ } else if (ServerConfigOptions.MasterServerConfigOptions
+ .CHECKPOINT_STORAGE_PLUGIN_CONFIG
+ .key()
+ .equals(name)) {
Map<String, String> pluginConfig =
parseCheckpointPluginConfig(node);
checkpointStorageConfig.setStoragePluginConfig(pluginConfig);
} else {
@@ -285,30 +354,47 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
ConnectorJarStorageConfig connectorJarStorageConfig = new
ConnectorJarStorageConfig();
for (Node node : childElements(connectorJarStorageConfigNode)) {
String name = cleanNodeName(node);
- if
(ServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE
+ .key()
+ .equals(name)) {
connectorJarStorageConfig.setEnable(getBooleanValue(getTextContent(node)));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE
+ .key()
+ .equals(name)) {
String mode = getTextContent(node).toUpperCase();
if (StringUtils.isNotBlank(mode)
&& !Arrays.asList("SHARED",
"ISOLATED").contains(mode)) {
throw new IllegalArgumentException(
- ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE
+ " must in [SHARED, ISOLATED]");
}
connectorJarStorageConfig.setStorageMode(ConnectorJarStorageMode.valueOf(mode));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH
+ .key()
+ .equals(name)) {
connectorJarStorageConfig.setStoragePath(getTextContent(node));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.key().equals(name)) {
+ } else if (ServerConfigOptions.MasterServerConfigOptions
+ .CONNECTOR_JAR_CLEANUP_TASK_INTERVAL
+ .key()
+ .equals(name)) {
connectorJarStorageConfig.setCleanupTaskInterval(
getIntegerValue(
-
ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .CONNECTOR_JAR_CLEANUP_TASK_INTERVAL
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME
+ .key()
+ .equals(name)) {
connectorJarStorageConfig.setConnectorJarExpiryTime(
getIntegerValue(
-
ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.key(),
+ ServerConfigOptions.MasterServerConfigOptions
+ .CONNECTOR_JAR_EXPIRY_TIME
+ .key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG
+ .key()
+ .equals(name)) {
connectorJarStorageConfig.setConnectorJarHAStorageConfig(
parseConnectorJarHAStorageConfig(node));
} else {
@@ -323,16 +409,20 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
ConnectorJarHAStorageConfig connectorJarHAStorageConfig = new
ConnectorJarHAStorageConfig();
for (Node node : childElements(connectorJarHAStorageConfigNode)) {
String name = cleanNodeName(node);
- if
(ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE
+ .key()
+ .equals(name)) {
String type = getTextContent(node);
if (StringUtils.isNotBlank(type)
&& !Arrays.asList("localfile", "hdfs").contains(type))
{
throw new IllegalArgumentException(
- ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE
+ ServerConfigOptions.MasterServerConfigOptions
+ .CONNECTOR_JAR_HA_STORAGE_TYPE
+ " must in [localfile, hdfs]");
}
connectorJarHAStorageConfig.setType(type);
- } else if
(ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG
+ } else if (ServerConfigOptions.MasterServerConfigOptions
+ .CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG
.key()
.equals(name)) {
Map<String, String> connectorJarHAStoragePluginConfig =
@@ -404,19 +494,30 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
HttpConfig httpConfig = new HttpConfig();
for (Node node : childElements(httpNode)) {
String name = cleanNodeName(node);
- if (ServerConfigOptions.PORT.key().equals(name)) {
+ if
(ServerConfigOptions.MasterServerConfigOptions.PORT.key().equals(name)) {
httpConfig.setPort(
- getIntegerValue(ServerConfigOptions.PORT.key(),
getTextContent(node)));
- } else if (ServerConfigOptions.CONTEXT_PATH.key().equals(name)) {
+ getIntegerValue(
+
ServerConfigOptions.MasterServerConfigOptions.PORT.key(),
+ getTextContent(node)));
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.CONTEXT_PATH
+ .key()
+ .equals(name)) {
httpConfig.setContextPath(getTextContent(node));
- } else if (ServerConfigOptions.ENABLE_HTTP.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.ENABLE_HTTP
+ .key()
+ .equals(name)) {
httpConfig.setEnabled(getBooleanValue(getTextContent(node)));
- } else if
(ServerConfigOptions.ENABLE_DYNAMIC_PORT.key().equals(name)) {
+ } else if
(ServerConfigOptions.MasterServerConfigOptions.ENABLE_DYNAMIC_PORT
+ .key()
+ .equals(name)) {
httpConfig.setEnableDynamicPort(getBooleanValue(getTextContent(node)));
- } else if (ServerConfigOptions.PORT_RANGE.key().equals(name)) {
+ } else if (ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE
+ .key()
+ .equals(name)) {
httpConfig.setPortRange(
getIntegerValue(
- ServerConfigOptions.PORT_RANGE.key(),
getTextContent(node)));
+
ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE.key(),
+ getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 76fb99f4d8..2a1c959704 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -28,12 +28,16 @@ public class CheckpointConfig implements Serializable {
public static final long MINIMAL_CHECKPOINT_TIME = 10;
- private long checkpointInterval =
ServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
- private long checkpointTimeout =
ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
+ private long checkpointInterval =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_INTERVAL.defaultValue();
+ private long checkpointTimeout =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
private long schemaChangeCheckpointTimeout =
-
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT
+ .defaultValue();
- private CheckpointStorageConfig storage =
ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
+ private CheckpointStorageConfig storage =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
private boolean checkpointEnable = true;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
index b6b4983d85..61dde630c8 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointStorageConfig.java
@@ -25,10 +25,12 @@ import java.util.Map;
@Data
public class CheckpointStorageConfig {
- private String storage =
ServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
+ private String storage =
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_TYPE.defaultValue();
private int maxRetainedCheckpoints =
- ServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CHECKPOINT_STORAGE_MAX_RETAINED
+ .defaultValue();
/** Storage plugin instance configuration */
private Map<String, String> storagePluginConfig = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java
index 811d6cdc6a..86593e33c0 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarHAStorageConfig.java
@@ -25,7 +25,9 @@ import java.util.Map;
@Data
public class ConnectorJarHAStorageConfig {
- private String type =
ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE.defaultValue();
+ private String type =
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_TYPE
+ .defaultValue();
/** Storage plugin instance configuration */
private Map<String, String> storagePluginConfig = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java
index 4048ddb859..d83568195c 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ConnectorJarStorageConfig.java
@@ -23,21 +23,26 @@ import static
com.hazelcast.internal.util.Preconditions.checkNotNull;
@Data
public class ConnectorJarStorageConfig {
- private Boolean enable =
ServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE.defaultValue();
+ private Boolean enable =
+
ServerConfigOptions.MasterServerConfigOptions.ENABLE_CONNECTOR_JAR_STORAGE
+ .defaultValue();
private ConnectorJarStorageMode storageMode =
- ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE.defaultValue();
- private String storagePath =
ServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.defaultValue();
+ private String storagePath =
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_PATH.defaultValue();
private Integer cleanupTaskInterval =
-
ServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_CLEANUP_TASK_INTERVAL
+ .defaultValue();
private Integer connectorJarExpiryTime =
- ServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_EXPIRY_TIME.defaultValue();
private ConnectorJarHAStorageConfig connectorJarHAStorageConfig =
- ServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_HA_STORAGE_CONFIG
+ .defaultValue();
public ConnectorJarStorageConfig setStorageMode(ConnectorJarStorageMode
storageMode) {
checkNotNull(storageMode);
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
index 3ed455e373..fb8316f2f5 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
@@ -26,17 +26,23 @@ import static
com.hazelcast.internal.util.Preconditions.checkPositive;
@Data
public class CoordinatorServiceConfig implements Serializable {
- private int coreThreadNum =
ServerConfigOptions.CORE_THREAD_NUM.defaultValue();
+ private int coreThreadNum =
+
ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM.defaultValue();
- private int maxThreadNum =
ServerConfigOptions.MAX_THREAD_NUM.defaultValue();
+ private int maxThreadNum =
+
ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM.defaultValue();
public void setCoreThreadNum(int coreThreadNum) {
- checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + "
must be >= 0");
+ checkPositive(
+ coreThreadNum,
+ ServerConfigOptions.MasterServerConfigOptions.CORE_THREAD_NUM
+ " must be >= 0");
this.coreThreadNum = coreThreadNum;
}
public void setMaxThreadNum(int maxThreadNum) {
- checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + "
must be > 0");
+ checkPositive(
+ maxThreadNum,
+ ServerConfigOptions.MasterServerConfigOptions.MAX_THREAD_NUM +
" must be > 0");
this.maxThreadNum = maxThreadNum;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
index d0227263a5..4aaabc49b1 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/HttpConfig.java
@@ -26,18 +26,21 @@ import static
com.hazelcast.internal.util.Preconditions.checkPositive;
@Data
public class HttpConfig implements Serializable {
- private boolean enabled = ServerConfigOptions.ENABLE_HTTP.defaultValue();
+ private boolean enabled =
+
ServerConfigOptions.MasterServerConfigOptions.ENABLE_HTTP.defaultValue();
- private int port = ServerConfigOptions.PORT.defaultValue();
+ private int port =
ServerConfigOptions.MasterServerConfigOptions.PORT.defaultValue();
- private String contextPath =
ServerConfigOptions.CONTEXT_PATH.defaultValue();
+ private String contextPath =
+
ServerConfigOptions.MasterServerConfigOptions.CONTEXT_PATH.defaultValue();
- private boolean enableDynamicPort =
ServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue();
+ private boolean enableDynamicPort =
+
ServerConfigOptions.MasterServerConfigOptions.ENABLE_DYNAMIC_PORT.defaultValue();
- private int portRange = ServerConfigOptions.PORT_RANGE.defaultValue();
+ private int portRange =
ServerConfigOptions.MasterServerConfigOptions.PORT_RANGE.defaultValue();
public void setPort(int port) {
- checkPositive(port, ServerConfigOptions.HTTP + " must be > 0");
+ checkPositive(port, ServerConfigOptions.MasterServerConfigOptions.HTTP
+ " must be > 0");
this.port = port;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 119d8c9c61..6aab8dfa79 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -26,210 +26,6 @@ import java.util.Map;
public class ServerConfigOptions {
- public static final Option<Integer> BACKUP_COUNT =
- Options.key("backup-count")
- .intType()
- .defaultValue(1)
- .withDescription("The number of backup copies of each
partition.");
-
- public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL =
- Options.key("print-execution-info-interval")
- .intType()
- .defaultValue(60)
- .withDescription(
- "The interval (in seconds) between two consecutive
executions of the print execution info task.");
-
- public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL =
- Options.key("print-job-metrics-info-interval")
- .intType()
- .defaultValue(60)
- .withDescription("The interval (in seconds) of job print
metrics info");
-
- public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
- Options.key("job-metrics-backup-interval")
- .intType()
- .defaultValue(10)
- .withDescription("The interval (in seconds) of job metrics
backups");
-
- public static final Option<ThreadShareMode>
TASK_EXECUTION_THREAD_SHARE_MODE =
- Options.key("task_execution_thread_share_mode")
- .type(new TypeReference<ThreadShareMode>() {})
- .defaultValue(ThreadShareMode.OFF)
- .withDescription(
- "The thread sharing mode of TaskExecutionServer,
including ALL, OFF, PART. Default is OFF");
-
- public static final Option<Boolean> DYNAMIC_SLOT =
- Options.key("dynamic-slot")
- .booleanType()
- .defaultValue(true)
- .withDescription("Whether to use dynamic slot.");
-
- public static final Option<Integer> SLOT_NUM =
- Options.key("slot-num")
- .intType()
- .defaultValue(2)
- .withDescription(
- "The number of slots. Only valid when dynamic slot
is disabled.");
-
- public static final Option<Integer> CHECKPOINT_INTERVAL =
- Options.key("interval")
- .intType()
- .defaultValue(300000)
- .withDescription(
- "The interval (in milliseconds) between two
consecutive checkpoints.");
-
- public static final Option<Integer> CHECKPOINT_TIMEOUT =
- Options.key("timeout")
- .intType()
- .defaultValue(30000)
- .withDescription("The timeout (in milliseconds) for a
checkpoint.");
-
- public static final Option<Integer> SCHEMA_CHANGE_CHECKPOINT_TIMEOUT =
- Options.key("schema-change-timeout")
- .intType()
- .defaultValue(30000)
- .withDescription(
- "The timeout (in milliseconds) for a schema change
checkpoint.");
-
- public static final Option<String> CHECKPOINT_STORAGE_TYPE =
- Options.key("type")
- .stringType()
- .defaultValue("localfile")
- .withDescription("The checkpoint storage type.");
-
- public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED =
- Options.key("max-retained")
- .intType()
- .defaultValue(20)
- .withDescription("The maximum number of retained
checkpoints.");
-
- public static final Option<QueueType> QUEUE_TYPE =
- Options.key("queue-type")
- .type(new TypeReference<QueueType>() {})
- .defaultValue(QueueType.BLOCKINGQUEUE)
- .withDescription("The internal data cache queue type.");
-
- public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE =
- Options.key("storage")
- .type(new TypeReference<CheckpointStorageConfig>() {})
- .defaultValue(new CheckpointStorageConfig())
- .withDescription("The checkpoint storage configuration.");
-
- public static final Option<AllocateStrategy> SLOT_ALLOCATE_STRATEGY =
- Options.key("slot-allocate-strategy")
- .enumType(AllocateStrategy.class)
- .defaultValue(AllocateStrategy.RANDOM)
- .withDescription(
- "When the strategy is SLOT_RATIO, the system
allocates tasks based on the slot usage ratio, with priority given to workers
with low usage rates; When the strategy is SYSTEM_LOAD, the system allocates
tasks based on server load, with priority given to workers with lower load.");
-
- public static final Option<SlotServiceConfig> SLOT_SERVICE =
- Options.key("slot-service")
- .type(new TypeReference<SlotServiceConfig>() {})
- .defaultValue(new SlotServiceConfig())
- .withDescription("The slot service configuration.");
-
- public static final Option<CheckpointConfig> CHECKPOINT =
- Options.key("checkpoint")
- .type(new TypeReference<CheckpointConfig>() {})
- .defaultValue(new CheckpointConfig())
- .withDescription("The checkpoint configuration.");
-
- public static final Option<Map<String, String>>
CHECKPOINT_STORAGE_PLUGIN_CONFIG =
- Options.key("plugin-config")
- .type(new TypeReference<Map<String, String>>() {})
- .noDefaultValue()
- .withDescription("The checkpoint storage instance
configuration.");
-
- public static final Option<Integer> CORE_THREAD_NUM =
- Options.key("core-thread-num")
- .intType()
- .defaultValue(10)
- .withDescription("The core thread num of coordinator
service");
-
- public static final Option<Integer> MAX_THREAD_NUM =
- Options.key("max-thread-num")
- .intType()
- .defaultValue(Integer.MAX_VALUE)
- .withDescription("The max thread num of coordinator
service");
-
- public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE =
- Options.key("coordinator-service")
- .type(new TypeReference<CoordinatorServiceConfig>() {})
- .defaultValue(new CoordinatorServiceConfig())
- .withDescription("The coordinator service configuration.");
-
- public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
- Options.key("history-job-expire-minutes")
- .intType()
- .defaultValue(1440)
- .withDescription("The expire time of history jobs.time
unit minute");
-
- public static final Option<ScheduleStrategy> JOB_SCHEDULE_STRATEGY =
- Options.key("job-schedule-strategy")
- .enumType(ScheduleStrategy.class)
- .defaultValue(ScheduleStrategy.REJECT)
- .withDescription(
- "When the policy is REJECT, when the task queue is
full, the task will be rejected; when the policy is WAIT, when the task queue
is full, the task will wait");
-
- public static final Option<Boolean> ENABLE_CONNECTOR_JAR_STORAGE =
- Options.key("enable")
- .booleanType()
- .defaultValue(Boolean.FALSE)
- .withDescription(
- "Enable the engine server Jar package storage
service,"
- + " automatically upload connector Jar
packages and dependent third-party Jar packages"
- + " to the server before job execution."
- + " Enabling this configuration does not
require the server to hold all connector Jar packages");
-
- public static final Option<ConnectorJarStorageMode>
CONNECTOR_JAR_STORAGE_MODE =
- Options.key("connector-jar-storage-mode")
- .enumType(ConnectorJarStorageMode.class)
- .defaultValue(ConnectorJarStorageMode.SHARED)
- .withDescription(
- "The storage mode of the connector jar package,
including SHARED, ISOLATED. Default is SHARED");
-
- public static final Option<String> CONNECTOR_JAR_STORAGE_PATH =
- Options.key("connector-jar-storage-path")
- .stringType()
- .defaultValue("")
- .withDescription("The user defined connector jar storage
path.");
-
- public static final Option<Integer> CONNECTOR_JAR_CLEANUP_TASK_INTERVAL =
- Options.key("connector-jar-cleanup-task-interval")
- .intType()
- .defaultValue(3600)
- .withDescription("The user defined connector jar cleanup
task interval.");
-
- public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME =
- Options.key("connector-jar-expiry-time")
- .intType()
- .defaultValue(600)
- .withDescription("The user defined connector jar expiry
time.");
-
- public static final Option<String> CONNECTOR_JAR_HA_STORAGE_TYPE =
- Options.key("type")
- .stringType()
- .defaultValue("localfile")
- .withDescription("The connector jar HA storage type.");
-
- public static final Option<Map<String, String>>
CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG =
- Options.key("plugin-config")
- .mapType()
- .noDefaultValue()
- .withDescription("The connector jar HA storage instance
configuration.");
-
- public static final Option<ConnectorJarHAStorageConfig>
CONNECTOR_JAR_HA_STORAGE_CONFIG =
- Options.key("jar-ha-storage")
- .type(new TypeReference<ConnectorJarHAStorageConfig>() {})
- .defaultValue(new ConnectorJarHAStorageConfig())
- .withDescription("The connector jar ha storage
configuration.");
-
- public static final Option<ConnectorJarStorageConfig>
CONNECTOR_JAR_STORAGE_CONFIG =
- Options.key("jar-storage")
- .type(new TypeReference<ConnectorJarStorageConfig>() {})
- .defaultValue(new ConnectorJarStorageConfig())
- .withDescription("The connector jar storage
configuration.");
-
public static final Option<Boolean> CLASSLOADER_CACHE_MODE =
Options.key("classloader-cache-mode")
.booleanType()
@@ -237,6 +33,20 @@ public class ServerConfigOptions {
.withDescription(
"Whether to use classloader cache mode. With cache
mode, all jobs share the same classloader if the jars are the same");
+ /////////////////////////////////////////////////
+ // The options for metrics start
+ public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
+ Options.key("enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether open metrics export.");
+
+ public static final Option<TelemetryMetricConfig> TELEMETRY_METRIC =
+ Options.key("metric")
+ .type(new TypeReference<TelemetryMetricConfig>() {})
+ .defaultValue(new TelemetryMetricConfig())
+ .withDescription("The telemetry metric configuration.");
+
public static final Option<Boolean>
TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE =
Options.key("scheduled-deletion-enable")
.booleanType()
@@ -251,63 +61,297 @@ public class ServerConfigOptions {
.defaultValue(new TelemetryLogsConfig())
.withDescription("The telemetry logs configuration.");
- public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
- Options.key("enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether open metrics export.");
-
- public static final Option<TelemetryMetricConfig> TELEMETRY_METRIC =
- Options.key("metric")
- .type(new TypeReference<TelemetryMetricConfig>() {})
- .defaultValue(new TelemetryMetricConfig())
- .withDescription("The telemetry metric configuration.");
-
public static final Option<TelemetryConfig> TELEMETRY =
Options.key("telemetry")
.type(new TypeReference<TelemetryConfig>() {})
.defaultValue(new TelemetryConfig())
.withDescription("The telemetry configuration.");
-
- public static final Option<Integer> PORT =
- Options.key("port")
- .intType()
- .defaultValue(8080)
- .withDescription("The port of the http server.");
-
- public static final Option<Boolean> ENABLE_HTTP =
- Options.key("enable-http")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to enable the http server.");
-
- public static final Option<String> CONTEXT_PATH =
- Options.key("context-path")
- .stringType()
- .defaultValue("")
- .withDescription("The context path of the http server.");
-
- public static final Option<Boolean> ENABLE_DYNAMIC_PORT =
- Options.key("enable-dynamic-port")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to enable the dynamic port of the http
server. If true, We will use the unused port");
-
- public static final Option<Integer> PORT_RANGE =
- Options.key("port-range")
- .intType()
- .defaultValue(100)
- .withDescription(
- "The port range of the http server. If
enable-dynamic-port is true, We will use the unused port in the range");
-
- public static final Option<HttpConfig> HTTP =
- Options.key("http")
- .type(new TypeReference<HttpConfig>() {})
- .defaultValue(new HttpConfig())
- .withDescription("The http configuration.");
-
- public static final String EVENT_REPORT_HTTP = "event-report-http";
- public static final String EVENT_REPORT_HTTP_URL = "url";
- public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
+ // The options for metrics end
+ /////////////////////////////////////////////////
+
+ /** The options for master. */
+ public static class MasterServerConfigOptions {
+
+ public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL =
+ Options.key("print-execution-info-interval")
+ .intType()
+ .defaultValue(60)
+ .withDescription(
+ "The interval (in seconds) between two
consecutive executions of the print execution info task.");
+
+ public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL =
+ Options.key("print-job-metrics-info-interval")
+ .intType()
+ .defaultValue(60)
+ .withDescription("The interval (in seconds) of job
print metrics info");
+
+ public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
+ Options.key("job-metrics-backup-interval")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The interval (in seconds) of job
metrics backups");
+
+ /////////////////////////////////////////////////
+ // The options about Hazelcast IMAP store start
+ public static final Option<Integer> BACKUP_COUNT =
+ Options.key("backup-count")
+ .intType()
+ .defaultValue(1)
+ .withDescription("The number of backup copies of each
partition.");
+
+ public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
+ Options.key("history-job-expire-minutes")
+ .intType()
+ .defaultValue(1440)
+ .withDescription("The expire time of history jobs.time
unit minute");
+ // The options about Hazelcast IMAP store end
+ /////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////
+ // The options for checkpoint start
+ public static final Option<Integer> CHECKPOINT_INTERVAL =
+ Options.key("interval")
+ .intType()
+ .defaultValue(300000)
+ .withDescription(
+ "The interval (in milliseconds) between two
consecutive checkpoints.");
+
+ public static final Option<Integer> CHECKPOINT_TIMEOUT =
+ Options.key("timeout")
+ .intType()
+ .defaultValue(30000)
+ .withDescription("The timeout (in milliseconds) for a
checkpoint.");
+
+ public static final Option<String> CHECKPOINT_STORAGE_TYPE =
+ Options.key("type")
+ .stringType()
+ .defaultValue("localfile")
+ .withDescription("The checkpoint storage type.");
+
+ public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED =
+ Options.key("max-retained")
+ .intType()
+ .defaultValue(20)
+ .withDescription("The maximum number of retained
checkpoints.");
+
+ public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE
=
+ Options.key("storage")
+ .type(new TypeReference<CheckpointStorageConfig>() {})
+ .defaultValue(new CheckpointStorageConfig())
+ .withDescription("The checkpoint storage
configuration.");
+
+ public static final Option<Integer> SCHEMA_CHANGE_CHECKPOINT_TIMEOUT =
+ Options.key("schema-change-timeout")
+ .intType()
+ .defaultValue(30000)
+ .withDescription(
+ "The timeout (in milliseconds) for a schema
change checkpoint.");
+
+ public static final Option<Map<String, String>>
CHECKPOINT_STORAGE_PLUGIN_CONFIG =
+ Options.key("plugin-config")
+ .type(new TypeReference<Map<String, String>>() {})
+ .noDefaultValue()
+ .withDescription("The checkpoint storage instance
configuration.");
+
+ public static final Option<CheckpointConfig> CHECKPOINT =
+ Options.key("checkpoint")
+ .type(new TypeReference<CheckpointConfig>() {})
+ .defaultValue(new CheckpointConfig())
+ .withDescription("The checkpoint configuration.");
+ // The options for checkpoint end
+ /////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////////
+ // The options for job scheduler start
+ public static final Option<AllocateStrategy> SLOT_ALLOCATE_STRATEGY =
+ Options.key("slot-allocate-strategy")
+ .enumType(AllocateStrategy.class)
+ .defaultValue(AllocateStrategy.RANDOM)
+ .withDescription(
+ "When the strategy is SLOT_RATIO, the system
allocates tasks based on the slot usage ratio, with priority given to workers
with low usage rates; When the strategy is SYSTEM_LOAD, the system allocates
tasks based on server load, with priority given to workers with lower load.");
+
+ public static final Option<ScheduleStrategy> JOB_SCHEDULE_STRATEGY =
+ Options.key("job-schedule-strategy")
+ .enumType(ScheduleStrategy.class)
+ .defaultValue(ScheduleStrategy.REJECT)
+ .withDescription(
+ "When the policy is REJECT, when the task
queue is full, the task will be rejected; when the policy is WAIT, when the
task queue is full, the task will wait");
+ // The options for job scheduler end
+ /////////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////////
+ // The options for http server start
+ public static final Option<Integer> PORT =
+ Options.key("port")
+ .intType()
+ .defaultValue(8080)
+ .withDescription("The port of the http server.");
+
+ public static final Option<Boolean> ENABLE_HTTP =
+ Options.key("enable-http")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable the http server.");
+
+ public static final Option<String> CONTEXT_PATH =
+ Options.key("context-path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The context path of the http
server.");
+
+ public static final Option<Boolean> ENABLE_DYNAMIC_PORT =
+ Options.key("enable-dynamic-port")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable the dynamic port of the
http server. If true, We will use the unused port");
+
+ public static final Option<Integer> PORT_RANGE =
+ Options.key("port-range")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "The port range of the http server. If
enable-dynamic-port is true, We will use the unused port in the range");
+
+ public static final Option<HttpConfig> HTTP =
+ Options.key("http")
+ .type(new TypeReference<HttpConfig>() {})
+ .defaultValue(new HttpConfig())
+ .withDescription("The http configuration.");
+
+ public static final String EVENT_REPORT_HTTP = "event-report-http";
+ public static final String EVENT_REPORT_HTTP_URL = "url";
+ public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
+
+ // The options for http server end
+ /////////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////
+ // The options for connector jar storage start
+ public static final Option<Boolean> ENABLE_CONNECTOR_JAR_STORAGE =
+ Options.key("enable")
+ .booleanType()
+ .defaultValue(Boolean.FALSE)
+ .withDescription(
+ "Enable the engine server Jar package storage
service,"
+ + " automatically upload connector Jar
packages and dependent third-party Jar packages"
+ + " to the server before job
execution."
+ + " Enabling this configuration does
not require the server to hold all connector Jar packages");
+
+ public static final Option<ConnectorJarStorageMode>
CONNECTOR_JAR_STORAGE_MODE =
+ Options.key("connector-jar-storage-mode")
+ .enumType(ConnectorJarStorageMode.class)
+ .defaultValue(ConnectorJarStorageMode.SHARED)
+ .withDescription(
+ "The storage mode of the connector jar
package, including SHARED, ISOLATED. Default is SHARED");
+
+ public static final Option<String> CONNECTOR_JAR_STORAGE_PATH =
+ Options.key("connector-jar-storage-path")
+ .stringType()
+ .defaultValue("")
+ .withDescription("The user defined connector jar
storage path.");
+
+ public static final Option<Integer>
CONNECTOR_JAR_CLEANUP_TASK_INTERVAL =
+ Options.key("connector-jar-cleanup-task-interval")
+ .intType()
+ .defaultValue(3600)
+ .withDescription("The user defined connector jar
cleanup task interval.");
+
+ public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME =
+ Options.key("connector-jar-expiry-time")
+ .intType()
+ .defaultValue(600)
+ .withDescription("The user defined connector jar
expiry time.");
+
+ public static final Option<String> CONNECTOR_JAR_HA_STORAGE_TYPE =
+ Options.key("type")
+ .stringType()
+ .defaultValue("localfile")
+ .withDescription("The connector jar HA storage type.");
+
+ public static final Option<Map<String, String>>
CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG =
+ Options.key("plugin-config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The connector jar HA storage
instance configuration.");
+
+ public static final Option<ConnectorJarHAStorageConfig>
CONNECTOR_JAR_HA_STORAGE_CONFIG =
+ Options.key("jar-ha-storage")
+ .type(new TypeReference<ConnectorJarHAStorageConfig>()
{})
+ .defaultValue(new ConnectorJarHAStorageConfig())
+ .withDescription("The connector jar ha storage
configuration.");
+
+ public static final Option<ConnectorJarStorageConfig>
CONNECTOR_JAR_STORAGE_CONFIG =
+ Options.key("jar-storage")
+ .type(new TypeReference<ConnectorJarStorageConfig>()
{})
+ .defaultValue(new ConnectorJarStorageConfig())
+ .withDescription("The connector jar storage
configuration.");
+ // The options for connector jar storage end
+ /////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////
+ // The options for coordinator service start
+ public static final Option<Integer> CORE_THREAD_NUM =
+ Options.key("core-thread-num")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The core thread num of coordinator
service");
+
+ public static final Option<Integer> MAX_THREAD_NUM =
+ Options.key("max-thread-num")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription("The max thread num of coordinator
service");
+
+ public static final Option<CoordinatorServiceConfig>
COORDINATOR_SERVICE =
+ Options.key("coordinator-service")
+ .type(new TypeReference<CoordinatorServiceConfig>() {})
+ .defaultValue(new CoordinatorServiceConfig())
+ .withDescription("The coordinator service
configuration.");
+ // The options for coordinator service end
+ /////////////////////////////////////////////////
+
+ }
+
+ /** The options for worker. */
+ public static class WorkerServerConfigOptions {
+
+ public static final Option<ThreadShareMode>
TASK_EXECUTION_THREAD_SHARE_MODE =
+ Options.key("task_execution_thread_share_mode")
+ .type(new TypeReference<ThreadShareMode>() {})
+ .defaultValue(ThreadShareMode.OFF)
+ .withDescription(
+ "The thread sharing mode of
TaskExecutionServer, including ALL, OFF, PART. Default is OFF");
+
+ public static final Option<QueueType> QUEUE_TYPE =
+ Options.key("queue-type")
+ .type(new TypeReference<QueueType>() {})
+ .defaultValue(QueueType.BLOCKINGQUEUE)
+ .withDescription("The internal data cache queue
type.");
+
+ /////////////////////////////////////////////////
+ // The options for slot start
+ public static final Option<Boolean> DYNAMIC_SLOT =
+ Options.key("dynamic-slot")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to use dynamic slot.");
+
+ public static final Option<Integer> SLOT_NUM =
+ Options.key("slot-num")
+ .intType()
+ .defaultValue(2)
+ .withDescription(
+ "The number of slots. Only valid when dynamic
slot is disabled.");
+
+ public static final Option<SlotServiceConfig> SLOT_SERVICE =
+ Options.key("slot-service")
+ .type(new TypeReference<SlotServiceConfig>() {})
+ .defaultValue(new SlotServiceConfig())
+ .withDescription("The slot service configuration.");
+
+ // The options for slot end
+ /////////////////////////////////////////////////
+
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
index 92f70fda3b..158f2803e9 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/SlotServiceConfig.java
@@ -27,14 +27,16 @@ import static
com.hazelcast.internal.util.Preconditions.checkPositive;
public class SlotServiceConfig implements Serializable {
private AllocateStrategy allocateStrategy =
- ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue();
+
ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue();
- private boolean dynamicSlot =
ServerConfigOptions.DYNAMIC_SLOT.defaultValue();
+ private boolean dynamicSlot =
+
ServerConfigOptions.WorkerServerConfigOptions.DYNAMIC_SLOT.defaultValue();
- private int slotNum = ServerConfigOptions.SLOT_NUM.defaultValue();
+ private int slotNum =
ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM.defaultValue();
public void setSlotNum(int slotNum) {
- checkPositive(slotNum, ServerConfigOptions.SLOT_NUM + " must be > 0");
+ checkPositive(
+ slotNum,
ServerConfigOptions.WorkerServerConfigOptions.SLOT_NUM + " must be > 0");
this.slotNum = slotNum;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
index f0fe4d9b84..f64eb014d5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
@@ -39,7 +39,7 @@ public class StorageStrategyFactory {
connectorJarStorageConfig, seaTunnelServer);
default:
throw new IllegalArgumentException(
- ServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE
+
ServerConfigOptions.MasterServerConfigOptions.CONNECTOR_JAR_STORAGE_MODE
+ " must in [SHARED, ISOLATED]");
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java
index b98bad5f6c..e4e2ba2c45 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/SystemLoadCalculateTest.java
@@ -354,7 +354,9 @@ public class SystemLoadCalculateTest {
when(rm.getEngineConfig().getSlotServiceConfig())
.thenReturn(Mockito.mock(SlotServiceConfig.class));
when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy())
-
.thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue());
+ .thenReturn(
+
ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY
+ .defaultValue());
// Simulate ResourceRequestHandler to call calculateWeight to
calculate weight
SystemLoadStrategy systemLoadStrategy = new
SystemLoadStrategy(workerLoadMap);
systemLoadStrategy.calculateWeight(workerProfile2,
workerAssignedSlots2);
@@ -459,7 +461,9 @@ public class SystemLoadCalculateTest {
when(rm.getEngineConfig().getSlotServiceConfig())
.thenReturn(Mockito.mock(SlotServiceConfig.class));
when(rm.getEngineConfig().getSlotServiceConfig().getAllocateStrategy())
-
.thenReturn(ServerConfigOptions.SLOT_ALLOCATE_STRATEGY.defaultValue());
+ .thenReturn(
+
ServerConfigOptions.MasterServerConfigOptions.SLOT_ALLOCATE_STRATEGY
+ .defaultValue());
WorkerProfile workerProfile1 = Mockito.mock(WorkerProfile.class);
when(workerProfile1.getAssignedSlots()).thenReturn(new SlotProfile[0]);