This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cbfebf163a [Feature][Zeta]Add datasource spi (#10586)
cbfebf163a is described below
commit cbfebf163a42eba22826e0e562656c5c51858af1
Author: 老王 <[email protected]>
AuthorDate: Tue Mar 24 20:38:17 2026 +0800
[Feature][Zeta]Add datasource spi (#10586)
---
.../common-options/sink-common-options.md | 7 +-
.../common-options/source-common-options.md | 1 +
docs/en/introduction/concepts/datasource-spi.md | 311 ++++++++++++++++++
.../common-options/sink-common-options.md | 5 +
.../common-options/source-common-options.md | 1 +
docs/zh/introduction/concepts/datasource-spi.md | 311 ++++++++++++++++++
.../api/common/SeaTunnelAPIErrorCode.java | 3 +-
.../api/datasource/DataSourceProvider.java | 102 ++++++
.../api/datasource/DataSourceProviderFactory.java | 133 ++++++++
.../exception/DataSourceProviderException.java | 43 +++
.../gravitino/GravitinoDataSourceProvider.java | 218 +++++++++++++
.../api/metalake/MetalakeConfigUtils.java | 1 +
.../api/metalake/gravitino/GravitinoClient.java | 15 +-
.../api/options/ConnectorCommonOptions.java | 9 +
.../connectors/seatunnel/jdbc/MetalakeIT.java | 16 +-
.../src/test/resources/config/seatunnel.yaml | 48 +++
...urce_to_assert_sink_with_datasource_enable.conf | 96 ++++++
.../seatunnel/engine/client/SeaTunnelClient.java | 2 +
.../client/job/ClientJobExecutionEnvironment.java | 5 +-
.../engine/common/config/EngineConfig.java | 3 +
.../config/YamlSeaTunnelDomConfigProcessor.java | 28 ++
.../common/config/server/DataSourceConfig.java | 50 +++
.../common/config/server/DataSourceOptions.java | 45 +++
.../common/config/server/ServerConfigOptions.java | 6 +
.../common/utils/DataSourceConfigResolver.java | 354 +++++++++++++++++++++
.../YamlSeaTunnelDomConfigProcessorTest.java | 56 ++++
.../common/utils/DataSourceConfigUtilTest.java | 214 +++++++++++++
.../resources/conf/datasource-nested-config.yaml | 50 +++
.../src/test/resources/conf/datasource-test.conf | 67 ++++
.../core/parse/MultipleTableJobConfigParser.java | 119 ++++++-
.../seatunnel/engine/server/SeaTunnelServer.java | 3 +
.../server/rest/RestJobExecutionEnvironment.java | 6 +-
32 files changed, 2314 insertions(+), 14 deletions(-)
diff --git a/docs/en/connectors/common-options/sink-common-options.md
b/docs/en/connectors/common-options/sink-common-options.md
index a0aeed7c6d..c3d8bb96e8 100644
--- a/docs/en/connectors/common-options/sink-common-options.md
+++ b/docs/en/connectors/common-options/sink-common-options.md
@@ -12,9 +12,10 @@ The old configuration name `source_table_name` is
deprecated, please migrate to
:::
-| Name | Type | Required | Default | Description
|
-|--------------|--------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| plugin_input | String | No | - | When `plugin_input` is not
specified, the current plug-in processes the data set `dataset` output by the
previous plugin in the configuration file <br/> When `plugin_input` is
specified, the current plug-in is processing the data set corresponding to this
parameter. |
+| Name | Type | Required | Default | Description
|
+|---------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| plugin_input | String | No | - | When `plugin_input` is not
specified, the current plug-in processes the data set `dataset` output by the
previous plugin in the configuration file <br/> When `plugin_input` is
specified, the current plug-in is processing the data set corresponding to this
parameter.
|
+| datasource_id | String | No | - | The data source ID for
retrieving connection configuration from DataSource Center. When specified, the
connector will fetch connection details (e.g., URL, username, password) from
the external metadata service instead of using direct configuration. See
[DataSource SPI](../../introduction/concepts/datasource-spi) for more
information. |
# Important note
diff --git a/docs/en/connectors/common-options/source-common-options.md
b/docs/en/connectors/common-options/source-common-options.md
index 9cc539da03..59d3b929c8 100644
--- a/docs/en/connectors/common-options/source-common-options.md
+++ b/docs/en/connectors/common-options/source-common-options.md
@@ -16,6 +16,7 @@ The old configuration name `result_table_name` is deprecated,
please migrate to
|---------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| plugin_output | String | No | - | When `plugin_output` is not
specified, the data processed by this plugin will not be registered as a data
set `(dataStream/dataset)` that can be directly accessed by other plugins, or
called a temporary table `(table)` <br/>When `plugin_output` is specified, the
data processed by this plugin will be registered as a data set
`(dataStream/dataset)` that can be directly accessed by other plugins, or
called a temporary table `(table)` . The dat [...]
| parallelism | Int | No | - | When `parallelism` is not
specified, the `parallelism` in env is used by default. <br/>When parallelism
is specified, it will override the parallelism in env.
[...]
+| datasource_id | String | No | - | The data source ID for
retrieving connection configuration from DataSource Center. When specified, the
connector will fetch connection details (e.g., URL, username, password) from
the external metadata service instead of using direct configuration. See
[DataSource SPI](../../introduction/concepts/datasource-spi) for more
information.
[...]
# Important note
diff --git a/docs/en/introduction/concepts/datasource-spi.md
b/docs/en/introduction/concepts/datasource-spi.md
new file mode 100644
index 0000000000..a43508deff
--- /dev/null
+++ b/docs/en/introduction/concepts/datasource-spi.md
@@ -0,0 +1,311 @@
+---
+title: DataSource SPI
+weight: 6
+---
+
+# DataSource SPI
+
+## Overview
+
+The DataSource SPI (Service Provider Interface) is an extension mechanism
introduced in SeaTunnel for centralized management of data source connection
configurations. It allows external metadata systems to manage data source
metadata, while SeaTunnel jobs reference these configurations via a simple
`datasource_id`.
+
+### Benefits
+
+- **Simplified Configuration**: Data source connection details (URL, username,
password, etc.) are managed externally instead of being duplicated across job
configs
+- **Enhanced Security**: Sensitive credentials are no longer stored in job
configuration files
+- **Centralized Management**: Changes to data source configurations only need
to be made once in the external system
+- **Backward Compatible**: Existing jobs without `datasource_id` continue to
work as before
+- **Extensible**: Custom metadata systems can be integrated by implementing
the `DataSourceProvider` interface
+
+## Using datasource_id
+
+`datasource_id` is a common parameter available to all SeaTunnel connectors.
When specified, the connector retrieves connection configuration from the
external metadata service instead of using direct configuration.
+
+### Usage Example
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ datasource_id = "mysql-source-01"
+ database = "test_db"
+ table = "users"
+ query = "select * from users where status = 'active'"
+ }
+}
+
+sink {
+ Jdbc {
+ datasource_id = "mysql-sink-01"
+ database = "reporting_db"
+ table = "user_summary"
+ }
+}
+```
+
+When `datasource_id` is specified, the connector will:
+1. Use the `datasource_id` to fetch connection details from the external
metadata service
+2. Merge the fetched configuration with any additional parameters in the job
config
+3. Job-level parameters take precedence over fetched configuration
+
+## DataSource SPI Specification
+
+This section defines the standard SPI interfaces that all DataSource providers
must implement.
+
+### DataSourceProvider Interface
+
+The `DataSourceProvider` interface is the contract for integrating external
metadata systems with SeaTunnel. Implementations are discovered via Java SPI
using the `@AutoService` annotation.
+
+**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProvider.java`
+
+```java
+public interface DataSourceProvider extends AutoCloseable {
+
+ /**
+ * Returns a unique identifier for this provider.
+ * Must match the "kind" value in seatunnel.yaml configuration.
+ * Examples: "gravitino", "datahub", "atlas", "custom"
+ */
+ String kind();
+
+ /**
+ * Initializes the provider with configuration from seatunnel.yaml.
+ * Called once during SeaTunnel startup.
+ *
+ * @param config Provider-specific configuration
+ */
+ void init(Config config);
+
+ /**
+ * Maps a datasource_id to connector configuration.
+ *
+ * @param connectorIdentifier The connector identifier (e.g., "Jdbc",
"Kafka")
+ * @param datasourceId The data source ID in the external system
+ * @return Configuration map for the connector, or null if mapping fails
+ */
+ Map<String, Object> datasourceMap(String connectorIdentifier, String
datasourceId);
+
+ /**
+ * Closes resources held by this provider.
+ * Called once during SeaTunnel shutdown.
+ */
+ @Override
+ void close();
+}
+```
+
+### Lifecycle
+
+1. **Discovery**: Provider instances are discovered via
`@AutoService(DataSourceProvider.class)` and cached
+2. **Initialization**: `init(Config)` is called with configuration from
`seatunnel.yaml`
+3. **Usage**: `datasourceMap(String, String)` is called to resolve
`datasource_id` for each connector
+4. **Cleanup**: `close()` is called during shutdown
+
+### Resource Management
+
+Providers are responsible for managing all resources needed for datasource
mapping:
+- HTTP clients for REST API calls
+- Connection pools for database access
+- Any other shared resources
+
+These resources should be created in `init()`, reused across `datasourceMap()`
calls, and cleaned up in `close()`.
+
+## Configuration
+
+The following configuration examples use **Gravitino as the default
provider**. For other providers, adjust the `kind` and provider-specific
options accordingly.
+
+### seatunnel.yaml Configuration
+
+To enable the DataSource Center, add the following configuration to
`seatunnel.yaml`:
+
+```yaml
+seatunnel:
+ engine:
+ datasource:
+ enabled: true
+ kind: gravitino
+ gravitino:
+ uri: http://127.0.0.1:8090
+ metalake: test_metalake
+```
+
+### Configuration Options
+
+| Option | Type | Default | Description
|
+|----------------------|---------|-------------|--------------------------------------------------------|
+| `enabled` | Boolean | `false` | Whether to enable DataSource
Center |
+| `kind` | String | `gravitino` | The DataSource provider type
to use |
+| `gravitino.uri` | String | - | Gravitino server URI
(required when kind=gravitino) |
+| `gravitino.metalake` | String | - | Gravitino metalake name
(required when kind=gravitino) |
+
+## Default Implementation: Gravitino
+
+Apache Gravitino is the default implementation of the DataSource SPI. To use
Gravitino as the DataSource Center, you must set `datasource.enabled` to `true`
and explicitly specify `kind` as `gravitino` along with the required
configuration parameters.
+
+### datasource_id Configuration
+
+When using Gravitino as the DataSource Center, the `datasource_id` value
should be configured as the **catalog name** in Gravitino.
+
+For example, if you have a catalog named `mysql-catalog` in Gravitino, use it
directly as the `datasource_id`:
+
+```hocon
+source {
+ Jdbc {
+ datasource_id = "mysql-catalog"
+ database = "test_db"
+ table = "users"
+ }
+}
+```
+
+### Property Mapping
+
+The Gravitino provider performs **limited property name mapping** from
Gravitino catalog properties to SeaTunnel connector configuration. **Only the
following four property mappings are supported**:
+
+| Gravitino Property | SeaTunnel Property |
+|--------------------|--------------------|
+| `jdbc-url` | `url` |
+| `jdbc-user` | `username` |
+| `jdbc-password` | `password` |
+| `jdbc-driver` | `driver` |
+
+> **Note**: Any other properties in the Gravitino catalog are not passed
through to the connector. If you need additional property mappings, consider
implementing a custom `DataSourceProvider`.
+
+### Connector Support
+
+The Gravitino provider currently supports:
+- **Jdbc** connector (fully supported)
+
+### Example
+
+#### Gravitino Catalog Response
+
+```json
+{
+ "code": 0,
+ "catalog": {
+ "name": "mysql-catalog",
+ "type": "relational",
+ "provider": "jdbc-mysql",
+ "properties": {
+ "jdbc-url": "jdbc:mysql://localhost:3306/",
+ "jdbc-user": "root",
+ "jdbc-password": "secret",
+ "jdbc-driver": "com.mysql.cj.jdbc.Driver"
+ }
+ }
+}
+```
+
+#### Mapped SeaTunnel Configuration
+
+```hocon
+{
+ url = "jdbc:mysql://localhost:3306/"
+ username = "root"
+ password = "secret"
+ driver = "com.mysql.cj.jdbc.Driver"
+}
+```
+
+## Implementing a Custom Provider
+
+To integrate a custom metadata system with SeaTunnel, implement the
`DataSourceProvider` interface.
+
+### Step 1: Add Dependency
+
+Add the `seatunnel-api` dependency to your project's `pom.xml`:
+
+```xml
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+```
+
+> **Note**: Use `<scope>provided</scope>` since SeaTunnel already includes
this dependency at runtime.
+
+### Step 2: Create a Provider Class
+
+```java
+@AutoService(DataSourceProvider.class)
+public class MyDataSourceProvider implements DataSourceProvider {
+
+ private HttpClient httpClient;
+
+ @Override
+ public String kind() {
+ return "my-provider";
+ }
+
+ @Override
+ public void init(Config config) {
+ // Initialize your client, connection pool, etc.
+ this.httpClient = HttpClient.newBuilder().build();
+ }
+
+ @Override
+ public Map<String, Object> datasourceMap(String connectorIdentifier,
String datasourceId) {
+ // Fetch from your metadata service based on connector type
+ // Return SeaTunnel-compatible configuration
+ switch (connectorIdentifier.toLowerCase()) {
+ case "jdbc":
+ return fetchJdbcConfig(datasourceId);
+ case "kafka":
+ return fetchKafkaConfig(datasourceId);
+ default:
+ return Collections.emptyMap();
+ }
+ }
+
+ @Override
+ public void close() {
+ // Clean up resources
+ if (httpClient != null) {
+ // Clean up HTTP client
+ }
+ }
+}
+```
+
+### Step 3: Configure seatunnel.yaml
+
+```yaml
+seatunnel:
+ engine:
+ datasource:
+ enabled: true
+ kind: my-provider
+ my-provider:
+ endpoint: https://my-metadata-service.com
+ api-key: your-api-key
+```
+
+### Step 4: Package and Deploy
+
+- Include your implementation in SeaTunnel's classpath
+- The `@AutoService` annotation will register it automatically via Java SPI
+
+## Runtime Flow
+
+1. **SeaTunnel Startup**
+ - Loads the configured `DataSourceProvider` based on `seatunnel.yaml`
+ - Calls `init()` with provider-specific configuration
+
+2. **Job Submission**
+ - Parses job configuration
+ - Detects presence of `datasource_id` in connector configs
+
+3. **Configuration Fetching**
+ - Calls `provider.datasourceMap(connectorIdentifier, datasourceId)` to
retrieve configuration from external system
+ - The provider queries the metadata service and returns connector
configuration
+
+4. **Configuration Merge**
+ - Merges fetched configuration with job-level parameters
+ - Job-level parameters take precedence
diff --git a/docs/zh/connectors/common-options/sink-common-options.md
b/docs/zh/connectors/common-options/sink-common-options.md
index a9f49a508b..ec4ef2fe73 100644
--- a/docs/zh/connectors/common-options/sink-common-options.md
+++ b/docs/zh/connectors/common-options/sink-common-options.md
@@ -16,6 +16,11 @@ sidebar_position: 4
|--------------|--------|------|-----|
| plugin_input | string | 否 | - |
| parallelism | int | 否 | - |
+| datasource_id | string | 否 | - |
+
+### datasource_id [string]
+
+用于从数据源中心获取连接配置的数据源 ID。当指定此参数时,连接器将从外部元数据服务获取连接详细信息(如 URL、用户名、密码),而不是使用直接配置。详见
[数据源 SPI](../../introduction/concepts/datasource-spi)。
### plugin_input [string]
diff --git a/docs/zh/connectors/common-options/source-common-options.md
b/docs/zh/connectors/common-options/source-common-options.md
index 4189c07591..558e169781 100644
--- a/docs/zh/connectors/common-options/source-common-options.md
+++ b/docs/zh/connectors/common-options/source-common-options.md
@@ -16,6 +16,7 @@ sidebar_position: 3
|---------------|--------|----|-----|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| plugin_output | String | 否 | - | 当未指定 `plugin_output`
时,此插件处理的数据将不会被注册为可由其他插件直接访问的数据集 `(dataStream/dataset)`,或称为临时表
`(table)`。<br/>当指定了 `plugin_output` 时,此插件处理的数据将被注册为可由其他插件直接访问的数据集
`(dataStream/dataset)`,或称为临时表 `(table)`。此处注册的数据集 `(dataStream/dataset)` 可通过指定
`plugin_input` 直接被其他插件访问。 |
| parallelism | Int | 否 | - | 当未指定 `parallelism` 时,默认使用环境中的
`parallelism`。<br/>当指定了 `parallelism` 时,将覆盖环境中的 `parallelism` 设置。
|
+| datasource_id | String | 否 | - | 用于从数据源中心获取连接配置的数据源
ID。当指定此参数时,连接器将从外部元数据服务获取连接详细信息(如 URL、用户名、密码),而不是使用直接配置。详见 [数据源
SPI](../../introduction/concepts/datasource-spi)。
|
# 重要提示
diff --git a/docs/zh/introduction/concepts/datasource-spi.md
b/docs/zh/introduction/concepts/datasource-spi.md
new file mode 100644
index 0000000000..91cea00a80
--- /dev/null
+++ b/docs/zh/introduction/concepts/datasource-spi.md
@@ -0,0 +1,311 @@
+---
+title: 数据源 SPI
+weight: 6
+---
+
+# 数据源 SPI
+
+## 概述
+
+数据源 SPI(Service Provider Interface)是 SeaTunnel
引入的扩展机制,用于集中管理数据源连接配置。它允许外部元数据系统管理数据源元数据,而 SeaTunnel 作业通过简单的 `datasource_id`
引用这些配置。
+
+### 优势
+
+- **简化配置**:数据源连接信息(URL、用户名、密码等)在外部管理,无需在多个作业配置中重复
+- **增强安全性**:敏感凭据不再存储在作业配置文件中
+- **集中管理**:对数据源配置的修改只需在外部系统中进行一次
+- **向后兼容**:不使用 `datasource_id` 的现有作业可以继续正常工作
+- **可扩展**:通过实现 `DataSourceProvider` 接口可以集成自定义元数据系统
+
+## 使用 datasource_id
+
+`datasource_id` 是所有 SeaTunnel 连接器都可用的通用参数。当指定此参数时,连接器将从外部元数据服务获取连接配置,而不是使用直接配置。
+
+### 使用示例
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ datasource_id = "mysql-source-01"
+ database = "test_db"
+ table = "users"
+ query = "select * from users where status = 'active'"
+ }
+}
+
+sink {
+ Jdbc {
+ datasource_id = "mysql-sink-01"
+ database = "reporting_db"
+ table = "user_summary"
+ }
+}
+```
+
+当指定 `datasource_id` 时,连接器将:
+1. 使用 `datasource_id` 从外部元数据服务获取连接详细信息
+2. 将获取的配置与作业配置中的其他参数合并
+3. 作业级别的参数优先于获取的配置
+
+## 数据源 SPI 规范
+
+本节定义所有数据源提供者必须实现的标准 SPI 接口。
+
+### DataSourceProvider 接口
+
+`DataSourceProvider` 接口是将外部元数据系统与 SeaTunnel 集成的契约。实现通过使用 `@AutoService` 注解的
Java SPI 机制被发现。
+
+**位置**:`seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProvider.java`
+
+```java
+public interface DataSourceProvider extends AutoCloseable {
+
+ /**
+ * 返回此提供者的唯一标识符。
+ * 必须与 seatunnel.yaml 配置中的 "kind" 值匹配。
+ * 示例:"gravitino"、"datahub"、"atlas"、"custom"
+ */
+ String kind();
+
+ /**
+ * 使用来自 seatunnel.yaml 的配置初始化提供者。
+ * 在 SeaTunnel 启动期间调用一次。
+ *
+ * @param config 提供者特定的配置
+ */
+ void init(Config config);
+
+ /**
+ * 将 datasource_id 映射到连接器配置。
+ *
+ * @param connectorIdentifier 连接器标识符(例如:"Jdbc"、"Kafka")
+ * @param datasourceId 外部系统中的数据源 ID
+ * @return 连接器的配置映射,如果映射失败则返回 null
+ */
+ Map<String, Object> datasourceMap(String connectorIdentifier, String
datasourceId);
+
+ /**
+ * 关闭此提供者持有的资源。
+ * 在 SeaTunnel 关闭期间调用。
+ */
+ @Override
+ void close();
+}
+```
+
+### 生命周期
+
+1. **发现**:提供者实例通过 `@AutoService(DataSourceProvider.class)` 被发现并缓存
+2. **初始化**:使用来自 `seatunnel.yaml` 的配置调用 `init(Config)`
+3. **使用**:调用 `datasourceMap(String, String)` 来解析每个连接器的 `datasource_id`
+4. **清理**:关闭期间调用 `close()`
+
+### 资源管理
+
+提供者负责管理数据源映射所需的所有资源:
+- 用于 REST API 调用的 HTTP 客户端
+- 用于数据库访问的连接池
+- 任何其他共享资源
+
+这些资源应在 `init()` 中创建,在多次 `datasourceMap()` 调用中重用,并在 `close()` 中清理。
+
+## 配置
+
+以下配置示例以 **Gravitino 作为默认提供者**为例。如需使用其他提供者,请相应调整 `kind` 和提供者特定的选项。
+
+### seatunnel.yaml 配置
+
+要启用数据源中心,请在 `seatunnel.yaml` 中添加以下配置:
+
+```yaml
+seatunnel:
+ engine:
+ datasource:
+ enabled: true
+ kind: gravitino
+ gravitino:
+ uri: http://127.0.0.1:8090
+ metalake: test_metalake
+```
+
+### 配置选项
+
+| 选项 | 类型 | 默认值 | 描述
|
+|----------------------|---------|-------------|---------------------------------------------|
+| `enabled` | Boolean | `false` | 是否启用数据源中心
|
+| `kind` | String | `gravitino` | 要使用的数据源提供者类型
|
+| `gravitino.uri` | String | - | Gravitino 服务器 URI(当
kind=gravitino 时必填) |
+| `gravitino.metalake` | String | - | Gravitino metalake 名称(当
kind=gravitino 时必填) |
+
+## 默认实现:Gravitino
+
+Apache Gravitino 是数据源 SPI 的默认实现。要使用 Gravitino 作为数据源中心,必须将 `datasource.enabled`
设置为 `true`,并明确指定 `kind` 为 `gravitino`,同时配置所需的参数。
+
+### datasource_id 配置
+
+当使用 Gravitino 作为数据源中心时,`datasource_id` 的值应配置为 Gravitino 中 **catalog 的名称**。
+
+例如,如果 Gravitino 中有一个名为 `mysql-catalog` 的 catalog,则直接将其作为 `datasource_id` 使用:
+
+```hocon
+source {
+ Jdbc {
+ datasource_id = "mysql-catalog"
+ database = "test_db"
+ table = "users"
+ }
+}
+```
+
+### 属性映射
+
+Gravitino 提供者执行**有限的属性名映射**,从 Gravitino 目录属性映射到 SeaTunnel
连接器配置。**仅支持以下四种属性映射**:
+
+| Gravitino 属性 | SeaTunnel 属性 |
+|-----------------|--------------|
+| `jdbc-url` | `url` |
+| `jdbc-user` | `username` |
+| `jdbc-password` | `password` |
+| `jdbc-driver` | `driver` |
+
+> **注意**:Gravitino 目录中的任何其他属性不会传递给连接器。如果您需要额外的属性映射,请考虑实现自定义的 `DataSourceProvider`。
+
+### 连接器支持
+
+Gravitino 提供者目前支持:
+- **Jdbc** 连接器(完全支持)
+
+### 示例
+
+#### Gravitino 目录响应
+
+```json
+{
+ "code": 0,
+ "catalog": {
+ "name": "mysql-catalog",
+ "type": "relational",
+ "provider": "jdbc-mysql",
+ "properties": {
+ "jdbc-url": "jdbc:mysql://localhost:3306/",
+ "jdbc-user": "root",
+ "jdbc-password": "secret",
+ "jdbc-driver": "com.mysql.cj.jdbc.Driver"
+ }
+ }
+}
+```
+
+#### 映射后的 SeaTunnel 配置
+
+```hocon
+{
+ url = "jdbc:mysql://localhost:3306/"
+ username = "root"
+ password = "secret"
+ driver = "com.mysql.cj.jdbc.Driver"
+}
+```
+
+## 实现自定义提供者
+
+要将自定义元数据系统与 SeaTunnel 集成,请实现 `DataSourceProvider` 接口。
+
+### 步骤 1:添加依赖
+
+将 `seatunnel-api` 依赖添加到项目的 `pom.xml` 中:
+
+```xml
+<dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${seatunnel.version}</version>
+ <scope>provided</scope>
+</dependency>
+```
+
+> **注意**:使用 `<scope>provided</scope>`,因为 SeaTunnel 在运行时已包含此依赖。
+
+### 步骤 2:创建提供者类
+
+```java
+@AutoService(DataSourceProvider.class)
+public class MyDataSourceProvider implements DataSourceProvider {
+
+ private HttpClient httpClient;
+
+ @Override
+ public String kind() {
+ return "my-provider";
+ }
+
+ @Override
+ public void init(Config config) {
+ // 初始化客户端、连接池等
+ this.httpClient = HttpClient.newBuilder().build();
+ }
+
+ @Override
+ public Map<String, Object> datasourceMap(String connectorIdentifier,
String datasourceId) {
+ // 根据连接器类型从元数据服务获取
+ // 返回 SeaTunnel 兼容的配置
+ switch (connectorIdentifier.toLowerCase()) {
+ case "jdbc":
+ return fetchJdbcConfig(datasourceId);
+ case "kafka":
+ return fetchKafkaConfig(datasourceId);
+ default:
+ return Collections.emptyMap();
+ }
+ }
+
+ @Override
+ public void close() {
+ // 清理资源
+ if (httpClient != null) {
+ // 清理 HTTP 客户端
+ }
+ }
+}
+```
+
+### 步骤 3:配置 seatunnel.yaml
+
+```yaml
+seatunnel:
+ engine:
+ datasource:
+ enabled: true
+ kind: my-provider
+ my-provider:
+ endpoint: https://my-metadata-service.com
+ api-key: your-api-key
+```
+
+### 步骤 4:打包和部署
+
+- 将实现包含在 SeaTunnel 的类路径中
+- `@AutoService` 注解将通过 Java SPI 自动注册
+
+## 运行时流程
+
+1. **SeaTunnel 启动**
+ - 根据 `seatunnel.yaml` 加载配置的 `DataSourceProvider`
+ - 使用提供者特定的配置调用 `init()`
+
+2. **作业提交**
+ - 解析作业配置
+ - 检测连接器配置中是否存在 `datasource_id`
+
+3. **配置获取**
+ - 调用 `provider.datasourceMap(connectorIdentifier, datasourceId)` 从外部系统检索配置
+ - 提供者查询元数据服务并返回连接器配置
+
+4. **配置合并**
+ - 将获取的配置与作业级别的参数合并
+ - 作业级别的参数优先
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index 7c550b3cc3..6ad333127c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -33,7 +33,8 @@ public enum SeaTunnelAPIErrorCode implements
SeaTunnelErrorCode {
SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"),
LIST_DATABASES_FAILED("API-12", "List databases failed"),
LIST_TABLES_FAILED("API-13", "List tables failed"),
- GET_PRIMARY_KEY_FAILED("API-14", "Get primary key failed");
+ GET_PRIMARY_KEY_FAILED("API-14", "Get primary key failed"),
+ DATASOURCE_PROVIDER_INITIALIZE_FAILED("API-15", "DataSource provider
initialize failed");
private final String code;
private final String description;
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProvider.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProvider.java
new file mode 100644
index 0000000000..f0a72f56f5
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.datasource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.Map;
+
+/**
+ * SPI interface for external data source metadata providers.
+ *
+ * <p>Implementations of this interface are discovered via Java SPI and
provide integration with
+ * external metadata services (e.g., Gravitino, DataHub, Atlas).
+ *
+ * <p>The provider acts as an entry point for discovering and mapping data
sources to SeaTunnel
+ * connectors.
+ *
+ * <h2>Lifecycle</h2>
+ *
+ * <ol>
+ * <li>Provider instances are discovered via {@code @AutoService} and cached
for the lifetime of
+ * the SeaTunnel client process
+ * <li>{@link #init(Config)} is called once during startup with
configuration from {@code
+ * seatunnel.yaml}
+ * <li>{@link #datasourceMap(String, String)} is called to resolve {@code
datasourceId} in job
+ * configs
+ * <li>{@link #close()} is called once during client shutdown
+ * </ol>
+ *
+ * <h2>Resource Management</h2>
+ *
+ * <p>Providers are responsible for managing all resources needed for
datasource mapping:
+ *
+ * <ul>
+ * <li>HTTP clients for REST API calls
+ * <li>Connection pools for JDBC/Redis access
+ * <li>Any other shared resources
+ * </ul>
+ *
+ * <p>This ensures:
+ *
+ * <ul>
+ * <li>Resources are created once in {@link #init(Config)}
+ * <li>Resources are reused across multiple {@link #datasourceMap(String,
String)} calls
+ * <li>Resources are cleaned up in {@link #close()}
+ * </ul>
+ *
+ * <h2>Thread Safety</h2>
+ *
+ * <p>Provider instances may be accessed concurrently by multiple threads.
Implementations must be
+ * thread-safe.
+ */
+public interface DataSourceProvider extends AutoCloseable {
+
+ /**
+ * Returns a unique identifier for this data source provider.
+ *
+ * <p>The identifier should match the kind specified in the configuration
file (e.g.,
+ * "gravitino", "datahub", "atlas"). Use lower case for consistency.
+ *
+ * @return unique provider identifier
+ */
+ String kind();
+
+ /**
+ * Initializes the provider with the given configuration.
+ *
+ * @param config the configuration for this provider
+ */
+ void init(Config config);
+
+ /**
+ * Maps the given data source ID to connector configuration.
+ *
+ * <p>This method retrieves metadata from the external system for the
specified data source and
+ * converts it into a configuration map compatible with the target
connector.
+ *
+ * @param connectorIdentifier the connector identifier (e.g., "Jdbc",
"MySQL-CDC", "Kafka")
+ * @param datasourceId the data source ID in the external metadata system
+ * @return configuration map for the connector, or null if mapping fails
+ */
+ Map<String, Object> datasourceMap(String connectorIdentifier, String
datasourceId);
+
+ /** Closes resources held by this provider. */
+ @Override
+ void close();
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProviderFactory.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProviderFactory.java
new file mode 100644
index 0000000000..3a89623877
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/DataSourceProviderFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.datasource;
+
+import
org.apache.seatunnel.api.datasource.exception.DataSourceProviderException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+
+/**
+ * Utility class for discovering and loading {@link DataSourceProvider}
implementations via Java
+ * SPI.
+ *
+ * <p>This class provides methods to:
+ *
+ * <ul>
+ * <li>Find a specific provider by kind
+ * <li>Handle provider loading errors gracefully
+ * </ul>
+ */
+@Slf4j
+public final class DataSourceProviderFactory {
+
+ /**
+ * Finds a {@link DataSourceProvider} by its kind identifier.
+ *
+ * @param kind the kind identifier of the provider to find
+ * @return the provider
+ * @throws DataSourceProviderException if provider is not found or
multiple providers with the
+ * same kind are found
+ */
+ public static DataSourceProvider getProvider(String kind) {
+ List<DataSourceProvider> providers = loadProviders();
+
+ DataSourceProvider matchedProvider = null;
+ List<String> matchedKinds = new ArrayList<>();
+
+ for (DataSourceProvider provider : providers) {
+ if (provider.kind().equalsIgnoreCase(kind)) {
+ if (matchedProvider != null) {
+ log.error(
+ "Multiple DataSourceProvider implementations found
for kind '{}': {}",
+ kind,
+ matchedKinds);
+ throw new DataSourceProviderException(
+ String.format(
+ "Multiple DataSourceProvider
implementations found for kind '%s'.\n\n"
+ + "Ambiguous provider classes
are:\n\n%s",
+ kind, String.join("\n", matchedKinds)));
+ }
+ matchedProvider = provider;
+ matchedKinds.add(provider.getClass().getName());
+ }
+ }
+
+ if (matchedProvider == null) {
+ List<String> availableKinds = new ArrayList<>();
+ for (DataSourceProvider provider : providers) {
+ availableKinds.add(provider.kind());
+ }
+ log.debug("No DataSourceProvider found for kind: {}", kind);
+ throw new DataSourceProviderException(
+ String.format(
+ "No DataSourceProvider found for kind '%s'.\n\n"
+ + "Available provider kinds are:\n\n%s",
+ kind, String.join("\n", availableKinds)));
+ }
+
+ return matchedProvider;
+ }
+
+ /**
+ * Clears the provider cache.
+ *
+ * <p>This method is primarily intended for testing purposes. Currently,
this method does
+ * nothing as providers are loaded on-demand via SPI without caching.
+ */
+ public static void clearCache() {
+ // No-op for testing compatibility
+ }
+
+ /**
+ * Loads all providers via ServiceLoader.
+ *
+ * @return list of all discovered providers
+ */
+ private static List<DataSourceProvider> loadProviders() {
+ try {
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ List<DataSourceProvider> providers = new ArrayList<>();
+ ServiceLoader.load(DataSourceProvider.class, classLoader)
+ .iterator()
+ .forEachRemaining(providers::add);
+
+ if (providers.isEmpty()) {
+ log.info("No DataSourceProvider implementations found");
+ } else {
+ log.info(
+ "Loaded {} DataSourceProvider: {}",
+ providers.size(),
+ providers.stream()
+ .map(DataSourceProvider::kind)
+ .reduce((a, b) -> a + ", " + b)
+ .orElse(""));
+ }
+
+ return providers;
+ } catch (ServiceConfigurationError e) {
+ log.error("Could not load service provider for
DataSourceProvider.", e);
+ throw new DataSourceProviderException(
+ "Could not load service provider for DataSourceProvider.",
e);
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/exception/DataSourceProviderException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/exception/DataSourceProviderException.java
new file mode 100644
index 0000000000..143f48779b
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/exception/DataSourceProviderException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.datasource.exception;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/** A DataSourceProvider-related, runtime exception. */
+public class DataSourceProviderException extends SeaTunnelRuntimeException {
+
+ /** @param message the detail message. */
+ public DataSourceProviderException(String message) {
+ super(SeaTunnelAPIErrorCode.DATASOURCE_PROVIDER_INITIALIZE_FAILED,
message);
+ }
+
+ /** @param cause the cause. */
+ public DataSourceProviderException(Throwable cause) {
+ super(SeaTunnelAPIErrorCode.DATASOURCE_PROVIDER_INITIALIZE_FAILED,
cause);
+ }
+
+ /**
+ * @param message the detail message.
+ * @param cause the cause.
+ */
+ public DataSourceProviderException(String message, Throwable cause) {
+ super(SeaTunnelAPIErrorCode.DATASOURCE_PROVIDER_INITIALIZE_FAILED,
message, cause);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoDataSourceProvider.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoDataSourceProvider.java
new file mode 100644
index 0000000000..8f5e43d4a7
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoDataSourceProvider.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.datasource.gravitino;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.datasource.DataSourceProvider;
+import org.apache.seatunnel.api.metalake.gravitino.GravitinoClient;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Gravitino implementation of {@link DataSourceProvider}.
+ *
+ * <p>This provider integrates with Apache Gravitino for centralized data
source metadata
+ * management.
+ *
+ * <p>Configuration (from seatunnel.yaml under seatunnel.engine.datasource):
+ *
+ * <pre>
+ * datasource:
+ * enabled: true
+ * kind: gravitino
+ * uri: <a href="http://localhost:8090">...</a> # Gravitino server
URI
+ * metalake: seatunnel # Metalake name
+ * </pre>
+ *
+ * <p>Gravitino response example:
+ *
+ * <pre>
+ * {
+ * "code": 0,
+ * "catalog": {
+ * "name": "local-mysql",
+ * "type": "relational",
+ * "provider": "jdbc-mysql",
+ * "properties": {
+ * "jdbc-url": "jdbc:mysql://localhost:3306/",
+ * "jdbc-user": "root",
+ * "jdbc-driver": "com.mysql.cj.jdbc.Driver",
+ * "jdbc-password": "123456"
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p>Maps to SeaTunnel JDBC config:
+ *
+ * <pre>
+ * {
+ * "url": "jdbc:mysql://localhost:3306/",
+ * "username": "root",
+ * "password": "123456",
+ * "driver": "com.mysql.cj.jdbc.Driver"
+ * }
+ * </pre>
+ */
+@Slf4j
+@AutoService(DataSourceProvider.class)
+public class GravitinoDataSourceProvider implements DataSourceProvider {
+
+ private String uri;
+ private String metalake;
+ private GravitinoClient client;
+
+ private static final String METALAKE_API_PATH = "/api/metalakes/";
+ private static final String CATALOGS_PATH = "/catalogs/";
+
+ // Gravitino JDBC property names
+ private static final String GRAVITINO_JDBC_URL = "jdbc-url";
+ private static final String GRAVITINO_JDBC_USER = "jdbc-user";
+ private static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
+ private static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
+
+ // SeaTunnel JDBC config names
+ private static final String SEATUNNEL_URL = "url";
+ private static final String SEATUNNEL_USERNAME = "username";
+ private static final String SEATUNNEL_PASSWORD = "password";
+ private static final String SEATUNNEL_DRIVER = "driver";
+
+ // Supported connector identifier
+ private static final String JDBC_CONNECTOR = "Jdbc";
+
+ public static final Option<String> URI =
+ Options.key("uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Gravitino server URI, e.g.,
http://localhost:8090");
+
+ public static final Option<String> METALAKE =
+ Options.key("metalake")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Gravitino metalake name to use for data
source metadata");
+
+ @Override
+ public String kind() {
+ return "gravitino";
+ }
+
+ @Override
+ public void init(Config config) {
+ // Extract Gravitino-specific configuration
+ String uri = config.getString(URI.key());
+ String metalake = config.getString(METALAKE.key());
+ log.info("Gravitino server URI: {}", uri);
+ log.info("Gravitino metalake name: {}", metalake);
+ // Validate required parameters
+ if (uri == null || uri.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Gravitino URI is required. Please configure
'seatunnel.engine.datasource.uri' in seatunnel.yaml");
+ }
+ if (metalake == null || metalake.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Gravitino metalake is required. Please configure
'seatunnel.engine.datasource.metalake' in seatunnel.yaml");
+ }
+ this.uri = uri;
+ this.metalake = metalake;
+ this.client = new GravitinoClient();
+ }
+
+ @Override
+ public Map<String, Object> datasourceMap(String connectorIdentifier,
String datasourceId) {
+ if (!JDBC_CONNECTOR.equalsIgnoreCase(connectorIdentifier)) {
+ log.warn(
+ "Unsupported connector '{}' for Gravitino provider, only
'{}' is supported",
+ connectorIdentifier,
+ JDBC_CONNECTOR);
+ return Collections.emptyMap();
+ }
+
+ try {
+ String catalogBaseUrl = buildMetalakeUrl();
+ JsonNode propertiesNode = client.getMetaInfo(datasourceId,
catalogBaseUrl);
+ return convertToJdbcConfig(propertiesNode);
+ } catch (IOException e) {
+ throw new SeaTunnelException(
+ String.format(
+ "Failed to fetch metadata from Gravitino for
datasource: %s",
+ datasourceId),
+ e);
+ }
+ }
+
+ /**
+ * Builds the metalake URL for Gravitino API calls.
+ *
+ * @return complete metalake URL
+ */
+ private String buildMetalakeUrl() {
+ String baseUri = uri.endsWith("/") ? uri : uri + "/";
+ return baseUri + METALAKE_API_PATH + metalake + CATALOGS_PATH;
+ }
+
+ /**
+ * Converts Gravitino properties to SeaTunnel JDBC connector configuration.
+ *
+ * <p>Mapping:
+ *
+ * <ul>
+ * <li>jdbc-url → url
+ * <li>jdbc-user → username
+ * <li>jdbc-password → password
+ * <li>jdbc-driver → driver
+ * </ul>
+ *
+ * @param propertiesNode Gravitino properties JSON node
+ * @return SeaTunnel JDBC configuration map
+ */
+ private Map<String, Object> convertToJdbcConfig(JsonNode propertiesNode) {
+ Map<String, Object> config = new HashMap<>();
+ if (propertiesNode.has(GRAVITINO_JDBC_URL)) {
+ config.put(SEATUNNEL_URL,
propertiesNode.get(GRAVITINO_JDBC_URL).asText());
+ }
+ if (propertiesNode.has(GRAVITINO_JDBC_USER)) {
+ config.put(SEATUNNEL_USERNAME,
propertiesNode.get(GRAVITINO_JDBC_USER).asText());
+ }
+ if (propertiesNode.has(GRAVITINO_JDBC_PASSWORD)) {
+ config.put(SEATUNNEL_PASSWORD,
propertiesNode.get(GRAVITINO_JDBC_PASSWORD).asText());
+ }
+ if (propertiesNode.has(GRAVITINO_JDBC_DRIVER)) {
+ config.put(SEATUNNEL_DRIVER,
propertiesNode.get(GRAVITINO_JDBC_DRIVER).asText());
+ }
+ return config;
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ client.close();
+ }
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
index adc2fb0896..ed38a5ba60 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/MetalakeConfigUtils.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+@Deprecated
@Slf4j
public class MetalakeConfigUtils {
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
index 975bb94bc7..0f531fe826 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -60,7 +61,19 @@ public class GravitinoClient implements MetalakeClient {
private final CloseableHttpClient httpClient;
public GravitinoClient() {
- this.httpClient = HttpClients.createDefault();
+ RequestConfig config =
+ RequestConfig.custom()
+ .setConnectTimeout(5000)
+ .setConnectionRequestTimeout(5000)
+ .setSocketTimeout(30000)
+ .build();
+
+ this.httpClient =
+ HttpClients.custom()
+ .setDefaultRequestConfig(config)
+ .setMaxConnTotal(50)
+ .setMaxConnPerRoute(20)
+ .build();
}
@VisibleForTesting
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
index 49b89b45a6..d91cee5b1f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/options/ConnectorCommonOptions.java
@@ -72,4 +72,13 @@ public class ConnectorCommonOptions
"When plugin_input is not specified, "
+ "the current plug-in processes the data
set dataset output by the previous plugin in the configuration file. "
+ "When plugin_input is specified, the
current plug-in is processing the data set corresponding to this parameter.");
+
+ public static Option<String> DATASOURCE_ID =
+ Options.key("datasource_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The data source ID for retrieving connection
configuration from DataSource Center. "
+ + "When specified, the connector will
fetch connection details (e.g., URL, username, password) "
+ + "from the external metadata service
instead of using direct configuration.");
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
index 62338e55f3..56394abeba 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/MetalakeIT.java
@@ -130,6 +130,13 @@ public class MetalakeIT extends SeaTunnelContainer {
+
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
Paths.get(SEATUNNEL_HOME, "config").toString());
+ // Copy datasource-enabled seatunnel.yaml
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/config/seatunnel.yaml"),
+ Paths.get(SEATUNNEL_HOME, "config",
"seatunnel.yaml").toString());
+
server.withCopyFileToContainer(
MountableFile.forHostPath(
PROJECT_ROOT_PATH
@@ -151,7 +158,7 @@ public class MetalakeIT extends SeaTunnelContainer {
"bash",
"-c",
"sleep 60 && curl -L 'http://127.0.0.1:8090/api/metalakes' -H
'Content-Type: application/json' -H 'Accept: application/vnd.gravitino.v1+json'
-d '{\"name\":\"test_metalake\",\"comment\":\"for metalake
test\",\"properties\":{}}'"
- + "&& curl -L
'http://127.0.0.1:8090/api/metalakes/test_metalake/catalogs' -H 'Content-Type:
application/json' -H 'Accept: application/vnd.gravitino.v1+json' -d
'{\"name\":\"test_catalog\",\"type\":\"relational\",\"provider\":\"jdbc-mysql\",\"comment\":\"for
metalake
test\",\"properties\":{\"jdbc-driver\":\"com.mysql.cj.jdbc.Driver\",\"jdbc-url\":\"not
used\",\"jdbc-user\":\"root\",\"jdbc-password\":\"Abc!@#135_seatunnel\"}}'");
+ + "&& curl -L
'http://127.0.0.1:8090/api/metalakes/test_metalake/catalogs' -H 'Content-Type:
application/json' -H 'Accept: application/vnd.gravitino.v1+json' -d
'{\"name\":\"test_catalog\",\"type\":\"relational\",\"provider\":\"jdbc-mysql\",\"comment\":\"for
metalake
test\",\"properties\":{\"jdbc-driver\":\"com.mysql.cj.jdbc.Driver\",\"jdbc-url\":\"jdbc:mysql://mysql-e2e:3306/seatunnel?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true\",\"jdbc-user\":\"
[...]
dbServer =
initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
@@ -190,6 +197,13 @@ public class MetalakeIT extends SeaTunnelContainer {
Assertions.assertEquals(0, execResult.getExitCode());
}
    /**
     * Verifies a job whose Jdbc source declares only {@code datasource_id} and a query: the
     * connection details are expected to be resolved through the datasource-enabled
     * seatunnel.yaml (Gravitino provider), so the job must exit with code 0.
     */
    @Test
    public void testDataSource() throws IOException, InterruptedException {
        Container.ExecResult execResult =
                executeJob("/jdbc_mysql_source_to_assert_sink_with_datasource_enable.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
    }

+
    /** Maven Central URL of the MySQL JDBC driver jar installed into the test container. */
    String driverUrl() {
        return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
    }
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/config/seatunnel.yaml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/config/seatunnel.yaml
new file mode 100644
index 0000000000..57e9601167
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/config/seatunnel.yaml
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1
+ backup-count: 2
+ queue-type: blockingqueue
+ print-execution-info-interval: 10
+ classloader-cache-mode: false
+ slot-service:
+ dynamic-slot: true
+ checkpoint:
+ interval: 300000
+ timeout: 100000
+ storage:
+ type: localfile
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot/
+ http:
+ enable-http: true
+ port: 8080
+ telemetry:
+ metric:
+ enabled: false
+ logs:
+ scheduled-deletion-enable: false
+ datasource:
+ enabled: true
+ kind: gravitino
+ gravitino:
+ uri: http://127.0.0.1:8090
+ metalake: test_metalake
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_datasource_enable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_datasource_enable.conf
new file mode 100644
index 0000000000..4f53d3da07
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_mysql_source_to_assert_sink_with_datasource_enable.conf
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ datasource_id = "test_catalog"
+ query = "select * from source"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 101
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 99
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_bit_8
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_bit_16
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_bit_32
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_bit_64
+ field_type = bytes
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_bigint_30
+ field_type = "decimal(20,0)"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index c38d4adc92..b84cb92380 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.client.job.JobClient;
import
org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.DataSourceConfigResolver;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
@@ -106,6 +107,7 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance, AutoCloseable {
@Override
public void close() {
hazelcastClient.getHazelcastInstance().shutdown();
+ DataSourceConfigResolver.closeProviders();
}
public ILogger getLogger() {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 383b8bcb2c..b43f71fa1b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
@@ -106,6 +107,7 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
jobClient.getCheckpointData(
Long.parseLong(jobConfig.getJobContext().getJobId()));
}
+ DataSourceConfig dataSourceConfig =
seaTunnelConfig.getEngineConfig().getDataSourceConfig();
return new MultipleTableJobConfigParser(
jobFilePath,
variables,
@@ -113,7 +115,8 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
jobConfig,
commonPluginJars,
isStartWithSavePoint,
- pipelineCheckpoints);
+ pipelineCheckpoints,
+ dataSourceConfig);
}
@VisibleForTesting
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 79a4d0ace0..5edb6f83f8 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.common.config;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import
org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -99,6 +100,8 @@ public class EngineConfig {
private HttpConfig httpConfig =
ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue();
+ private DataSourceConfig dataSourceConfig =
ServerConfigOptions.DATASOURCE.defaultValue();
+
public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 2794862a5a..d6aec92f99 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -26,6 +26,8 @@ import
org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageCon
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
import
org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceOptions;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -255,6 +257,8 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
} else if
(ServerConfigOptions.MasterServerConfigOptions.HTTP.key().equals(name)) {
engineConfig.setHttpConfig(parseHttpConfig(node));
+ } else if (ServerConfigOptions.DATASOURCE.key().equals(name)) {
+ engineConfig.setDataSourceConfig(parseDataSourceConfig(node));
} else if
(ServerConfigOptions.MasterServerConfigOptions.COORDINATOR_SERVICE
.key()
.equals(name)) {
@@ -584,4 +588,28 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
}
return httpConfig;
}
+
+ private DataSourceConfig parseDataSourceConfig(Node dataSourceNode) {
+ DataSourceConfig dataSourceConfig = new DataSourceConfig();
+ String providerKind = null;
+
+ for (Node node : childElements(dataSourceNode)) {
+ String name = cleanNodeName(node);
+ if (DataSourceOptions.ENABLED.key().equals(name)) {
+
dataSourceConfig.setEnabled(getBooleanValue(getTextContent(node)));
+ } else if (DataSourceOptions.KIND.key().equals(name)) {
+ providerKind = getTextContent(node);
+ dataSourceConfig.setKind(providerKind);
+ } else if (providerKind != null &&
providerKind.equalsIgnoreCase(name)) {
+ // Parse nested provider properties (e.g., gravitino.uri,
gravitino.metalake)
+ for (Node propertyNode : childElements(node)) {
+ String propertyName = cleanNodeName(propertyNode);
+ dataSourceConfig
+ .getProperties()
+ .put(propertyName, getTextContent(propertyNode));
+ }
+ }
+ }
+ return dataSourceConfig;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceConfig.java
new file mode 100644
index 0000000000..e1172585d0
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration for DataSource Center which manages external metadata
providers.
+ *
+ * <p>This config contains only common properties (enabled, kind) and a
properties map for
+ * provider-specific settings. Provider implementations should extract their
own configuration from
+ * the properties map.
+ */
+@Data
+public class DataSourceConfig implements Serializable {
+
+ /** Whether to enable DataSource Center. */
+ private boolean enabled = DataSourceOptions.ENABLED.defaultValue();
+
+ /**
+ * The kind of DataSource provider to use. Supported values: "gravitino",
"datahub", "atlas",
+ * etc.
+ */
+ private String kind = DataSourceOptions.KIND.defaultValue();
+
+ /**
+ * Provider-specific properties. Each provider (e.g., Gravitino) should
extract its own
+ * configuration from this map.
+ */
+ private Map<String, String> properties = new HashMap<>();
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceOptions.java
new file mode 100644
index 0000000000..eee5c15ad6
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/DataSourceOptions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/** Configuration options for DataSource Center. */
+public class DataSourceOptions {
+
+ /** Whether to enable DataSource Center. */
+ public static final Option<Boolean> ENABLED =
+ Options.key("enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable DataSource Center for
centralized data source metadata management. "
+ + "When enabled, data source connection
details can be referenced via datasourceId instead of being directly specified
in job configs.");
+
+ /**
+ * The kind of DataSource provider to use. Supported values: "gravitino",
"datahub", "atlas",
+ * etc.
+ */
+ public static final Option<String> KIND =
+ Options.key("kind")
+ .stringType()
+ .defaultValue("gravitino")
+ .withDescription(
+ "The kind of DataSource provider to use. Supported
values: gravitino, datahub, atlas, etc.");
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 631267c7e3..d4dd206023 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -66,6 +66,12 @@ public class ServerConfigOptions {
.type(new TypeReference<TelemetryConfig>() {})
.defaultValue(new TelemetryConfig())
.withDescription("The telemetry configuration.");
+
+ public static final Option<DataSourceConfig> DATASOURCE =
+ Options.key("datasource")
+ .type(new TypeReference<DataSourceConfig>() {})
+ .defaultValue(new DataSourceConfig())
+ .withDescription("The DataSource Center configuration.");
// The options for metrics end
/////////////////////////////////////////////////
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigResolver.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigResolver.java
new file mode 100644
index 0000000000..dbba701b7b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigResolver.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
+
+import org.apache.seatunnel.api.datasource.DataSourceProvider;
+import org.apache.seatunnel.api.datasource.DataSourceProviderFactory;
+import
org.apache.seatunnel.api.datasource.exception.DataSourceProviderException;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Utility class for resolving data source configurations from DataSource
Center.
+ *
+ * <p>This utility provides methods to merge connection configurations
retrieved from external
+ * metadata services (via {@link DataSourceProvider}) into SeaTunnel connector
configurations.
+ */
+@Slf4j
+public final class DataSourceConfigResolver {
+
+ /** Cache for initialized DataSourceProvider instance. */
+ private static volatile DataSourceProvider cachedProvider = null;
+
+ /**
+ * Resolves and merges data source configurations for a SeaTunnel job
config.
+ *
+ * @param seaTunnelJobConfig the SeaTunnel job configuration (Hocon/Config
format)
+ * @param dataSourceConfig the DataSource configuration containing
provider kind and properties
+ * @return a new Config with datasource configurations merged
+ */
+ public static Config resolveDataSourceConfigs(
+ Config seaTunnelJobConfig, DataSourceConfig dataSourceConfig) {
+ if (!dataSourceConfig.isEnabled()) {
+ log.debug("DataSource Center is disabled, returning original
config");
+ return seaTunnelJobConfig;
+ }
+
+ String providerKind = dataSourceConfig.getKind();
+ log.info("Starting datasource config resolution with provider: {}",
providerKind);
+
+ // Get or create initialized provider instance (cached with lazy
loading)
+ DataSourceProvider provider =
+ getOrCreateProvider(
+ providerKind,
ConfigFactory.parseMap(dataSourceConfig.getProperties()));
+
+ // Get original config as unwrapped map
+ Map<String, Object> originalMap =
seaTunnelJobConfig.root().unwrapped();
+ Map<String, Object> resultMap = new HashMap<>(originalMap);
+
+ // Resolve source configs
+ List<? extends Config> sourceConfigs =
+ TypesafeConfigUtils.getConfigList(
+ seaTunnelJobConfig, PluginType.SOURCE.getType(),
Collections.emptyList());
+ List<Object> resolvedSources = new ArrayList<>();
+ for (Config sourceConfig : sourceConfigs) {
+ Config resolved = resolveConnectorConfig(sourceConfig, provider,
providerKind);
+ resolvedSources.add(resolved.root().unwrapped());
+ }
+ if (!resolvedSources.isEmpty()) {
+ resultMap.put(PluginType.SOURCE.getType(), resolvedSources);
+ }
+
+ // Resolve sink configs
+ List<? extends Config> sinkConfigs =
+ TypesafeConfigUtils.getConfigList(
+ seaTunnelJobConfig, PluginType.SINK.getType(),
Collections.emptyList());
+ List<Object> resolvedSinks = new ArrayList<>();
+ for (Config sinkConfig : sinkConfigs) {
+ Config resolved = resolveConnectorConfig(sinkConfig, provider,
providerKind);
+ resolvedSinks.add(resolved.root().unwrapped());
+ }
+ if (!resolvedSinks.isEmpty()) {
+ resultMap.put(PluginType.SINK.getType(), resolvedSinks);
+ }
+
+ return ConfigFactory.parseMap(resultMap);
+ }
+
+ /**
+ * Gets or creates an initialized DataSourceProvider instance with lazy
loading caching.
+ *
+ * @param kind the provider kind (e.g., "gravitino", "datahub")
+ * @param config the configuration for the provider
+ * @return initialized DataSourceProvider instance
+ */
+ private static DataSourceProvider getOrCreateProvider(String kind, Config
config) {
+ DataSourceProvider provider = cachedProvider;
+ if (provider == null) {
+ synchronized (DataSourceConfigResolver.class) {
+ provider = cachedProvider;
+ if (provider == null) {
+ provider = DataSourceProviderFactory.getProvider(kind);
+ provider.init(config);
+ cachedProvider = provider;
+ log.info("Created and cached new DataSourceProvider: {}",
kind);
+ }
+ }
+ }
+ return provider;
+ }
+
+ /**
+ * Resolves and merges data source configuration for a single connector
config.
+ *
+ * <p>If the config contains a {@code datasource_id}, this method will:
+ *
+ * <ol>
+ * <li>Use the provided {@link DataSourceProvider} (already initialized)
+ * <li>Fetch the connection config from the metadata service using the
datasource_id
+ * <li>Merge the fetched config into the original config
+ * </ol>
+ *
+ * @param connectorConfig the connector configuration
+ * @param provider the initialized DataSourceProvider instance
+ * @param providerKind the kind of DataSourceProvider (e.g., "gravitino",
"datahub")
+ * @return a new Config with datasource configuration merged, or the
original config if no
+ * datasource_id is present
+ */
+ private static Config resolveConnectorConfig(
+ Config connectorConfig, DataSourceProvider provider, String
providerKind) {
+ Optional<String> datasourceIdOptional =
getDatasourceId(connectorConfig);
+
+ if (!datasourceIdOptional.isPresent()) {
+ log.debug(
+ "No datasource_id found in connector config at path: {},
returning original config",
+ connectorConfig.origin().description());
+ return connectorConfig;
+ }
+
+ String datasourceId = datasourceIdOptional.get();
+ String connectorIdentifier = getConnectorIdentifier(connectorConfig);
+
+ log.info(
+ "Resolving datasource config for connector: {}, datasource_id:
{}, provider: {}",
+ connectorIdentifier,
+ datasourceId,
+ providerKind);
+
+ try {
+ // Fetch connection config from metadata service via provider
+ Map<String, Object> datasourceConfig =
+ provider.datasourceMap(connectorIdentifier, datasourceId);
+
+ if (datasourceConfig == null || datasourceConfig.isEmpty()) {
+ log.warn(
+ "Received empty or null config from DataSourceProvider
for datasource_id: {}",
+ datasourceId);
+ return connectorConfig;
+ }
+
+ // Merge the fetched config into the original config
+ return mergeConfig(connectorConfig, datasourceConfig,
datasourceId);
+
+ } catch (DataSourceProviderException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new DataSourceProviderException(
+ String.format(
+ "Failed to resolve datasource config for
connector: %s, datasource_id: %s, provider: %s",
+ connectorIdentifier, datasourceId, providerKind),
+ e);
+ }
+ }
+
+ /**
+ * Gets the datasource_id from a connector config.
+ *
+ * <p>This method first checks at the root level, then looks inside the
nested connector config.
+ *
+ * @param config the connector configuration
+ * @return Optional containing the datasource_id if present, empty
otherwise
+ */
+ private static Optional<String> getDatasourceId(Config config) {
+ try {
+ // First check at root level (for configs like { Jdbc: {...},
datasource_id: "ds-123" })
+ if (config.hasPath(ConnectorCommonOptions.DATASOURCE_ID.key())) {
+ return
Optional.of(config.getString(ConnectorCommonOptions.DATASOURCE_ID.key()));
+ }
+
+ // If not found at root, check inside the nested connector config
+ // (for configs like { Jdbc: { datasource_id: "ds-123", ... } })
+ String connectorIdentifier = getConnectorIdentifier(config);
+ if (!"unknown".equals(connectorIdentifier)) {
+ Config nestedConfig = config.getConfig(connectorIdentifier);
+ if
(nestedConfig.hasPath(ConnectorCommonOptions.DATASOURCE_ID.key())) {
+ return Optional.of(
+
nestedConfig.getString(ConnectorCommonOptions.DATASOURCE_ID.key()));
+ }
+ }
+ } catch (ConfigException e) {
+ log.debug("Failed to get datasource_id from config", e);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Gets the connector identifier (plugin name) from a connector config.
+ *
+ * <p>This method first checks for plugin_name in the config (from
TypesafeConfigUtils
+ * processing), then looks for a nested object structure.
+ *
+ * @param config the connector configuration
+ * @return the connector identifier or "unknown" if not found
+ */
+ private static String getConnectorIdentifier(Config config) {
+ // First check if plugin_name is present (added by TypesafeConfigUtils)
+ try {
+ if (config.hasPath(ConnectorCommonOptions.PLUGIN_NAME.key())) {
+ return
config.getString(ConnectorCommonOptions.PLUGIN_NAME.key());
+ }
+ } catch (ConfigException e) {
+ // Ignore, try the nested structure approach
+ }
+
+ // Fallback: look for nested object structure (original config format)
+ for (Map.Entry<String, ConfigValue> entry : config.root().entrySet()) {
+ if (entry.getValue().valueType() == ConfigValueType.OBJECT) {
+ return entry.getKey();
+ }
+ }
+ return "unknown";
+ }
+
+ /**
+ * Merges the datasource configuration into the original connector config.
+ *
+ * <p>The datasource config values will override values in the original
config if keys conflict.
+ *
+ * @param connectorConfig the original connector configuration
+ * @param datasourceConfig the configuration fetched from DataSource Center
+ * @param datasourceId the datasource ID (for logging purposes)
+ * @return a new Config with merged configurations
+ */
+ private static Config mergeConfig(
+ Config connectorConfig, Map<String, Object> datasourceConfig,
String datasourceId) {
+ // Get the connector identifier (plugin name)
+ String connectorIdentifier = getConnectorIdentifier(connectorConfig);
+
+ // Check if this is the flat structure from TypesafeConfigUtils (has
plugin_name field)
+ boolean isFlatStructure =
connectorConfig.hasPath(ConnectorCommonOptions.PLUGIN_NAME.key());
+
+ Map<String, Object> originalMap;
+ if (isFlatStructure) {
+ // Flat structure: directly use the config's root map
+ originalMap = new HashMap<>(connectorConfig.root().unwrapped());
+ } else {
+ // Nested structure: get the nested config inside the plugin
+ Config originalNestedConfig =
connectorConfig.getConfig(connectorIdentifier);
+ originalMap = new
HashMap<>(originalNestedConfig.root().unwrapped());
+ // Also include the plugin_name as the root key
+ Map<String, Object> wrapperMap = new HashMap<>();
+ wrapperMap.put(connectorIdentifier, originalMap);
+ originalMap = wrapperMap;
+ }
+
+ // Create merged map
+ Map<String, Object> mergedMap = new HashMap<>(originalMap);
+
+ // Merge datasource config - values from datasourceConfig will override
+ for (Map.Entry<String, Object> entry : datasourceConfig.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ log.debug("Merging datasource config: key={}, datasource_id={}",
key, datasourceId);
+
+ if (isFlatStructure) {
+ // Flat structure: merge directly into the map
+ mergedMap.put(key, value);
+ } else {
+ // Nested structure: merge into the nested map
+ @SuppressWarnings("unchecked")
+ Map<String, Object> nestedMap =
+ (Map<String, Object>)
mergedMap.get(connectorIdentifier);
+ if (nestedMap != null) {
+ nestedMap.put(key, value);
+ }
+ }
+ }
+
+ Config mergedConfig = ConfigFactory.parseMap(mergedMap);
+
+ log.info(
+ "Successfully merged datasource config for datasource_id: {},
connector: {}, merged keys count: {}",
+ datasourceId,
+ connectorIdentifier,
+ datasourceConfig.size());
+
+ return mergedConfig;
+ }
+
+ /**
+ * Closes the cached DataSourceProvider instance.
+ *
+ * <p>This method should be called when the application shuts down (e.g.,
when the SeaTunnel
+ * Server or Client is stopping) to properly release all resources held by
the provider.
+ *
+ * <p>This method is idempotent and can be safely called multiple times.
+ */
+ public static void closeProviders() {
+ DataSourceProvider provider = cachedProvider;
+ if (provider != null) {
+ try {
+ log.info("Closing cached DataSourceProvider");
+ provider.close();
+ } catch (Exception e) {
+ log.warn("Failed to close DataSourceProvider", e);
+ }
+ cachedProvider = null;
+ }
+ log.info("DataSourceProvider closed");
+ }
+
+ /**
+ * Clears the provider cache.
+ *
+ * <p>This method is primarily intended for testing purposes. It closes
the cached provider and
+ * clears the cache.
+ */
+ @VisibleForTesting
+ public static void clearCache() {
+ closeProviders();
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessorTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessorTest.java
new file mode 100644
index 0000000000..b6d9c35e72
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config;
+
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link YamlSeaTunnelDomConfigProcessor} focusing on
datasource config parsing. */
+public class YamlSeaTunnelDomConfigProcessorTest {
+
+ @Test
+ public void testParseDataSourceConfigWithNestedProvider() throws Exception
{
+ String configPath =
+ Paths.get(
+ YamlSeaTunnelDomConfigProcessorTest.class
+
.getResource("/conf/datasource-nested-config.yaml")
+ .toURI())
+ .toString();
+
+ SeaTunnelConfig config =
+ new YamlSeaTunnelConfigBuilder(new
FileInputStream(configPath)).build();
+ DataSourceConfig dataSourceConfig =
config.getEngineConfig().getDataSourceConfig();
+
+ // Verify basic settings
+ assertTrue(dataSourceConfig.isEnabled());
+ assertEquals("test_kind", dataSourceConfig.getKind());
+
+ // Verify nested provider properties are parsed correctly
+ assertEquals(3, dataSourceConfig.getProperties().size());
+ assertEquals("http://127.0.0.1:8090",
dataSourceConfig.getProperties().get("test_config1"));
+ assertEquals("test_metalake",
dataSourceConfig.getProperties().get("test_config2"));
+ assertEquals("test",
dataSourceConfig.getProperties().get("test_config3"));
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtilTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtilTest.java
new file mode 100644
index 0000000000..4cfbf54038
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtilTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.datasource.DataSourceProvider;
+import
org.apache.seatunnel.api.datasource.exception.DataSourceProviderException;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.SneakyThrows;
+
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DataSourceConfigUtilTest {
+
+ private static final String TEST_PROVIDER_KIND = "test-provider";
+
+ @BeforeEach
+ public void setUp() {
+ // Clear the cache before each test
+ DataSourceConfigResolver.clearCache();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ // Clear the cache after each test
+ DataSourceConfigResolver.clearCache();
+ }
+
+ @Test
+ public void testResolveDataSourceConfigsWithDatasourceId() {
+ // Register mock provider
+ MockTestDataSourceProvider provider = new MockTestDataSourceProvider();
+ registerProvider(provider);
+
+ // Load config from file with datasource_id
+ Config jobConfig = loadTestConfig();
+
+ // Create DataSourceConfig
+ DataSourceConfig dataSourceConfig = new DataSourceConfig();
+ dataSourceConfig.setEnabled(true);
+ dataSourceConfig.setKind(TEST_PROVIDER_KIND);
+
+ // Resolve with datasource_id
+ Config resolved =
+ DataSourceConfigResolver.resolveDataSourceConfigs(jobConfig,
dataSourceConfig);
+
+ assertNotNull(resolved);
+
+ // Verify we have 3 sources and 3 sinks
+ List<? extends Config> sources = resolved.getConfigList("source");
+ List<? extends Config> sinks = resolved.getConfigList("sink");
+ assertEquals(3, sources.size());
+ assertEquals(3, sinks.size());
+
+ // Verify first source has datasource config merged (flat structure)
+ Config source1 = sources.get(0);
+ // In flat structure, values are directly at root level
+ assertEquals("jdbc:postgresql://metadata:5432/metadata_db",
source1.getString("url"));
+ assertEquals("metadata_user", source1.getString("username"));
+ assertEquals("metadata_password", source1.getString("password"));
+ // The original query should still be there
+ assertEquals("select id, name from table1",
source1.getString("query"));
+
+ // Verify second source has datasource config merged (flat structure)
+ Config source2 = sources.get(1);
+ assertEquals("jdbc:postgresql://metadata:5432/metadata_db",
source2.getString("url"));
+ assertEquals("metadata_user", source2.getString("username"));
+ assertEquals("select id, value from table2",
source2.getString("query"));
+
+ // Verify third source (Jdbc without datasource_id) - keep original
config
+ Config source3 = sources.get(2);
+ assertEquals("jdbc:mysql://localhost:3306", source3.getString("url"));
+ assertEquals("com.mysql.cj.jdbc.Driver", source3.getString("driver"));
+ assertEquals("root", source3.getString("user"));
+ assertEquals("123456", source3.getString("password"));
+ assertEquals("select id, name from table4",
source3.getString("query"));
+
+ // Verify first sink has datasource config merged (flat structure)
+ Config sink1 = sinks.get(0);
+ assertEquals("jdbc:postgresql://metadata:5432/metadata_db",
sink1.getString("url"));
+ assertEquals("metadata_user", sink1.getString("username"));
+ assertEquals("insert into sink_table1 (id, name) values (?, ?)",
sink1.getString("query"));
+
+ // Verify second sink has datasource config merged (flat structure)
+ Config sink2 = sinks.get(1);
+ assertEquals("jdbc:postgresql://metadata:5432/metadata_db",
sink2.getString("url"));
+ assertEquals("insert into sink_table2 (id, value) values (?, ?)",
sink2.getString("query"));
+
+ // Verify third sink (Jdbc without datasource_id) - keep original
config
+ Config sink3 = sinks.get(2);
+ assertEquals("jdbc:mysql://localhost:3306", sink3.getString("url"));
+ assertEquals("com.mysql.cj.jdbc.Driver", sink3.getString("driver"));
+ assertEquals("root", sink3.getString("user"));
+ assertEquals("123456", sink3.getString("password"));
+ assertEquals("insert into sink_table4 (id, name) values (?, ?)",
sink3.getString("query"));
+ }
+
+ @Test
+ public void testResolveDataSourceConfigsWithNoProvider() {
+ // Clear any cached provider
+ DataSourceConfigResolver.clearCache();
+
+ // Load config from file with datasource_id
+ Config jobConfig = loadTestConfig();
+
+ // Create DataSourceConfig with unknown provider kind
+ DataSourceConfig dataSourceConfig = new DataSourceConfig();
+ dataSourceConfig.setEnabled(true);
+ dataSourceConfig.setKind("unknown-provider-kind");
+
+ // Try to resolve with a provider kind that doesn't exist
+ DataSourceProviderException exception =
+ assertThrows(
+ DataSourceProviderException.class,
+ () ->
+
DataSourceConfigResolver.resolveDataSourceConfigs(
+ jobConfig, dataSourceConfig));
+
+ // Verify exception is thrown
+ assertNotNull(exception);
+ assertNotNull(exception.getMessage());
+ }
+
+ /** Helper method to load the test config file. */
+ @SneakyThrows
+ private Config loadTestConfig() {
+ return ConfigFactory.parseFile(
+ Paths.get(
+ Objects.requireNonNull(
+
DataSourceConfigUtilTest.class.getResource(
+
"/conf/datasource-test.conf"))
+ .toURI())
+ .toFile());
+ }
+
+ /**
+ * Helper method to manually register a provider for testing. This is a
workaround since we
+ * can't easily use SPI in unit tests.
+ */
+ private void registerProvider(DataSourceProvider provider) {
+ try {
+ java.lang.reflect.Field providerField =
+
DataSourceConfigResolver.class.getDeclaredField("cachedProvider");
+ providerField.setAccessible(true);
+ providerField.set(null, provider);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to register provider for
testing", e);
+ }
+ }
+
+ /** Mock DataSourceProvider for testing. */
+ public static class MockTestDataSourceProvider implements
DataSourceProvider {
+
+ @Override
+ public String kind() {
+ return TEST_PROVIDER_KIND;
+ }
+
+ @Override
+ public void init(Config config) {
+ // No-op for testing
+ }
+
+ @Override
+ public Map<String, Object> datasourceMap(String connectorIdentifier,
String datasourceId) {
+ // Only support Jdbc connector for testing
+ if (!"Jdbc".equalsIgnoreCase(connectorIdentifier)) {
+ return new HashMap<>();
+ }
+ // Simulate fetching connection config from metadata service
+ Map<String, Object> config = new HashMap<>();
+ config.put("url", "jdbc:postgresql://metadata:5432/metadata_db");
+ config.put("driver", "org.postgresql.Driver");
+ config.put("username", "metadata_user");
+ config.put("password", "metadata_password");
+ return config;
+ }
+
+ @Override
+ public void close() {
+ // No-op for testing
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-nested-config.yaml
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-nested-config.yaml
new file mode 100644
index 0000000000..6c12e912ec
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-nested-config.yaml
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Test configuration with nested datasource provider config
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1
+ backup-count: 2
+ queue-type: blockingqueue
+ print-execution-info-interval: 10
+ classloader-cache-mode: false
+ slot-service:
+ dynamic-slot: true
+ checkpoint:
+ interval: 300000
+ timeout: 100000
+ storage:
+ type: localfile
+ max-retained: 3
+ plugin-config:
+ namespace: /tmp/seatunnel/checkpoint_snapshot/
+ http:
+ enable-http: true
+ port: 8080
+ telemetry:
+ metric:
+ enabled: false
+ logs:
+ scheduled-deletion-enable: false
+ datasource:
+ enabled: true
+ kind: test_kind
+ test_kind:
+ test_config1: http://127.0.0.1:8090
+ test_config2: test_metalake
+ test_config3: test
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-test.conf
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-test.conf
new file mode 100644
index 0000000000..c1228ca88e
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/conf/datasource-test.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ datasource_id = "ds-source-1"
+ query = "select id, name from table1"
+ plugin_output = "001"
+ }
+
+ Jdbc {
+ datasource_id = "ds-source-2"
+ query = "select id, value from table2"
+ plugin_output = "002"
+ }
+
+ Jdbc {
+ url = "jdbc:mysql://localhost:3306"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ query = "select id, name from table4"
+ plugin_output = "004"
+ }
+}
+
+sink {
+ Jdbc {
+ datasource_id = "ds-sink-1"
+ query = "insert into sink_table1 (id, name) values (?, ?)"
+ plugin_input = "001"
+ }
+
+ Jdbc {
+ datasource_id = "ds-sink-2"
+ query = "insert into sink_table2 (id, value) values (?, ?)"
+ plugin_input = "002"
+ }
+
+ Jdbc {
+ url = "jdbc:mysql://localhost:3306"
+ driver = "com.mysql.cj.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ query = "insert into sink_table4 (id, name) values (?, ?)"
+ plugin_input = "004"
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 3c8f43a673..ff474c4a60 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -21,6 +21,8 @@ import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueType;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -54,9 +56,11 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
+import org.apache.seatunnel.engine.common.utils.DataSourceConfigResolver;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -130,6 +134,8 @@ public class MultipleTableJobConfigParser {
private final boolean isStartWithSavePoint;
private final List<JobPipelineCheckpointData> pipelineCheckpoints;
+ private final DataSourceConfig dataSourceConfig;
+
@VisibleForTesting
public MultipleTableJobConfigParser(
String jobDefineFilePath, IdGenerator idGenerator, JobConfig
jobConfig) {
@@ -145,7 +151,8 @@ public class MultipleTableJobConfigParser {
jobConfig,
Collections.emptyList(),
false,
- Collections.emptyList());
+ Collections.emptyList(),
+ new DataSourceConfig());
}
@VisibleForTesting
@@ -162,7 +169,8 @@ public class MultipleTableJobConfigParser {
jobConfig,
commonPluginJars,
isStartWithSavePoint,
- Collections.emptyList());
+ Collections.emptyList(),
+ new DataSourceConfig());
}
public MultipleTableJobConfigParser(
@@ -172,14 +180,16 @@ public class MultipleTableJobConfigParser {
JobConfig jobConfig,
List<URL> commonPluginJars,
boolean isStartWithSavePoint,
- List<JobPipelineCheckpointData> pipelineCheckpoints) {
+ List<JobPipelineCheckpointData> pipelineCheckpoints,
+ DataSourceConfig dataSourceConfig) {
this(
ConfigBuilder.of(Paths.get(jobDefineFilePath), variables),
idGenerator,
jobConfig,
commonPluginJars,
isStartWithSavePoint,
- pipelineCheckpoints);
+ pipelineCheckpoints,
+ dataSourceConfig);
}
public MultipleTableJobConfigParser(
@@ -188,14 +198,16 @@ public class MultipleTableJobConfigParser {
JobConfig jobConfig,
List<URL> commonPluginJars,
boolean isStartWithSavePoint,
- List<JobPipelineCheckpointData> pipelineCheckpoints) {
+ List<JobPipelineCheckpointData> pipelineCheckpoints,
+ DataSourceConfig dataSourceConfig) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
this.isStartWithSavePoint = isStartWithSavePoint;
- this.seaTunnelJobConfig =
MetalakeConfigUtils.getMetalakeConfig(seaTunnelJobConfig);
+ this.seaTunnelJobConfig = handleDataSource(seaTunnelJobConfig,
dataSourceConfig);
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.pipelineCheckpoints = pipelineCheckpoints;
+ this.dataSourceConfig = dataSourceConfig;
ConfigValidator.of(this.envOptions).validate(new
EnvOptionRule().optionRule());
}
@@ -841,4 +853,99 @@ public class MultipleTableJobConfigParser {
.collect(Collectors.toList());
return new ChangeStreamTableSourceCheckpoint(coordinatorState,
subtaskState);
}
+
+ private Config handleDataSource(Config seaTunnelJobConfig,
DataSourceConfig dataSourceConfig) {
+ Config tempconfig = seaTunnelJobConfig;
+ // Only resolve datasource configs when:
+ // 1. DataSource is enabled
+ // 2. The job config contains datasource_id in any connector
+ if (dataSourceConfig != null
+ && dataSourceConfig.isEnabled()
+ && hasDatasourceId(seaTunnelJobConfig)) {
+ tempconfig =
+ DataSourceConfigResolver.resolveDataSourceConfigs(
+ seaTunnelJobConfig, dataSourceConfig);
+ }
+ // Compatible with old code
+ tempconfig = MetalakeConfigUtils.getMetalakeConfig(tempconfig);
+ return tempconfig;
+ }
+
+ /**
+ * Checks if the job config contains datasource_id in any connector
configuration.
+ *
+ * @param config the SeaTunnel job configuration
+ * @return true if any connector (source or sink) contains datasource_id,
false otherwise
+ */
+ private boolean hasDatasourceId(Config config) {
+ List<? extends Config> sourceConfigs =
+ TypesafeConfigUtils.getConfigList(
+ config, PluginType.SOURCE.getType(),
Collections.emptyList());
+ for (Config sourceConfig : sourceConfigs) {
+ if (hasDatasourceIdInConnector(sourceConfig)) {
+ return true;
+ }
+ }
+
+ List<? extends Config> sinkConfigs =
+ TypesafeConfigUtils.getConfigList(
+ config, PluginType.SINK.getType(),
Collections.emptyList());
+ for (Config sinkConfig : sinkConfigs) {
+ if (hasDatasourceIdInConnector(sinkConfig)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
 * Checks if a single connector config contains datasource_id.
 *
 * <p>The key may appear either at the root of the connector block or nested one level
 * down under the plugin-name object (e.g. {@code Jdbc { datasource_id = ... }}),
 * so both locations are probed.
 *
 * @param connectorConfig the connector configuration
 * @return true if datasource_id is present, false otherwise
 */
private boolean hasDatasourceIdInConnector(Config connectorConfig) {
    try {
        // Check at root level
        if (connectorConfig.hasPath(ConnectorCommonOptions.DATASOURCE_ID.key())) {
            return true;
        }

        // Check inside the nested connector config
        String connectorIdentifier = getConnectorIdentifier(connectorConfig);
        if (!"unknown".equals(connectorIdentifier)) {
            Config nestedConfig = connectorConfig.getConfig(connectorIdentifier);
            if (nestedConfig.hasPath(ConnectorCommonOptions.DATASOURCE_ID.key())) {
                return true;
            }
        }
    } catch (Exception e) {
        // Best-effort probe: a malformed connector block must not abort parsing here,
        // so swallow and fall through to "no datasource_id found".
        log.debug("Failed to check datasource_id in connector config", e);
    }
    return false;
}
+
/**
 * Gets the connector identifier (plugin name) from a connector config.
 *
 * <p>Prefers the explicit {@code plugin_name} option; otherwise falls back to the first
 * nested object key (the HOCON block style, e.g. {@code Jdbc { ... }}).
 *
 * @param config the connector configuration
 * @return the connector identifier or "unknown" if not found
 */
private String getConnectorIdentifier(Config config) {
    try {
        if (config.hasPath(ConnectorCommonOptions.PLUGIN_NAME.key())) {
            return config.getString(ConnectorCommonOptions.PLUGIN_NAME.key());
        }
    } catch (Exception e) {
        // Ignore, try the nested structure approach
    }
    // Fallback: look for nested object structure
    for (Map.Entry<String, ConfigValue> entry : config.root().entrySet()) {
        if (entry.getValue().valueType() == ConfigValueType.OBJECT) {
            return entry.getKey();
        }
    }
    return "unknown";
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 2baebf8c40..4355e5b76d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineRetryableException;
+import org.apache.seatunnel.engine.common.utils.DataSourceConfigResolver;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.classloader.DefaultClassLoaderService;
import
org.apache.seatunnel.engine.server.checkpoint.monitor.CheckpointMonitorService;
@@ -234,6 +235,8 @@ public class SeaTunnelServer
if (eventService != null) {
eventService.shutdownNow();
}
+
+ DataSourceConfigResolver.closeProviders();
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index 54977563ef..a91228e344 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.DataSourceConfig;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
@@ -109,13 +110,16 @@ public class RestJobExecutionEnvironment extends
AbstractJobEnvironment {
+ ", cannot start with save point.");
}
}
+ DataSourceConfig dataSourceConfig =
+
seaTunnelServer.getSeaTunnelConfig().getEngineConfig().getDataSourceConfig();
return new MultipleTableJobConfigParser(
seaTunnelJobConfig,
idGenerator,
jobConfig,
commonPluginJars,
isStartWithSavePoint,
- pipelineCheckpoints);
+ pipelineCheckpoints,
+ dataSourceConfig);
}
private List<JobPipelineCheckpointData>
loadPipelineCheckpointsFromMasterNode() {