chl-wxp commented on PR #10586:
URL: https://github.com/apache/seatunnel/pull/10586#issuecomment-4059906585
> ### Issue 1: Provider Repeated Initialization Causes Resource Leak
> **Location**: `seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:76-107`
>
> **Related Context**:
>
> * Provider factory: `DataSourceProviderFactory.java:50-51` (caches the Provider singleton)
> * Gravitino Provider implementation: `GravitinoDataSourceProvider.java:96` (creates a new client on each init)
> * Caller: `MultipleTableJobConfigParser.java:855-864` (invoked during job parsing)
>
> **Problem Description**: The `resolveDataSourceConfigs()` method repeatedly calls `init()` and `close()` on the globally cached Provider singleton, leading to:
>
> 1. HTTP clients being created multiple times
> 2. The Provider being closed after the first job execution
> 3. Subsequent job executions using the closed Provider, which may throw exceptions
>
> **Potential Risks**:
>
> * **Risk 1**: The HTTP client is already closed by the second job execution, so metadata retrieval fails
> * **Risk 2**: Resource leak: old clients created by each `init()` are never properly released
> * **Risk 3**: In long-running SeaTunnel Server scenarios, repeated job submissions cause instability
>
> **Impact Scope**:
>
> * Direct impact: All jobs using DataSource SPI
> * Indirect impact: Stability of SeaTunnel Client/Server processes
> * Impact surface: Core framework
>
> **Severity**: **BLOCKER**
>
> **Improvement Suggestions**:
>
> ```java
> // DataSourceConfigUtil.java
> public static Config resolveDataSourceConfigs(
>         Config seaTunnelJobConfig, DataSourceConfig dataSourceConfig) {
>     if (!dataSourceConfig.isEnabled()) {
>         log.debug("DataSource Center is disabled, returning original config");
>         return seaTunnelJobConfig;
>     }
>
>     String providerKind = dataSourceConfig.getKind();
>     log.info("Starting datasource config resolution with provider: {}", providerKind);
>
>     // Get the cached Provider singleton (no duplicate init)
>     DataSourceProvider provider = getOrCreateProvider(dataSourceConfig);
>
>     // Initialize only once per process
>     if (!isProviderInitialized(provider)) {
>         provider.init(ConfigFactory.parseMap(dataSourceConfig.getProperties()));
>         markProviderInitialized(provider);
>     }
>
>     // ... existing parsing logic ...
>
>     // ❌ Remove provider.close(); the framework closes the Provider at process shutdown
>
>     return ConfigFactory.parseMap(resultMap);
> }
>
> // Add initialization state management in DataSourceProviderFactory
> private static final ConcurrentMap<String, Boolean> INITIALIZED_PROVIDERS =
>         new ConcurrentHashMap<>();
> ```
>
> **Rationale**:
>
> * The Provider should be a process-level singleton managed by the framework lifecycle
> * Initialize at process startup, clean up at process shutdown
> * Each job submission should only use the already-initialized Provider, without mutating its state
>
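> The init-once bookkeeping described above can be sketched in isolation. This is a minimal illustration, not the actual SeaTunnel SPI: the `DataSourceProvider` stub and the names `getOrCreateProvider`/`markProviderInitialized` are assumptions taken from the suggestion above.
>
> ```java
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentMap;
> import java.util.function.Supplier;
>
> // Illustrative stand-in for the real SPI interface.
> interface DataSourceProvider {
>     String kind();
>     void init(Map<String, String> properties);
> }
>
> final class ProviderRegistry {
>     // One cached Provider instance per kind.
>     private static final ConcurrentMap<String, DataSourceProvider> PROVIDERS =
>             new ConcurrentHashMap<>();
>     // Tracks which kinds have already been initialized.
>     private static final ConcurrentMap<String, Boolean> INITIALIZED_PROVIDERS =
>             new ConcurrentHashMap<>();
>
>     static DataSourceProvider getOrCreateProvider(
>             String kind, Supplier<DataSourceProvider> loader) {
>         return PROVIDERS.computeIfAbsent(kind, k -> loader.get());
>     }
>
>     // Returns true only for the first caller for a given kind,
>     // so init() runs exactly once per process.
>     static boolean markProviderInitialized(DataSourceProvider provider) {
>         return INITIALIZED_PROVIDERS.putIfAbsent(provider.kind(), Boolean.TRUE) == null;
>     }
> }
> ```
>
> With this shape, `resolveDataSourceConfigs()` only calls `init()` when `markProviderInitialized` returns true, and `close()` moves to a process-shutdown hook.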
> ### Issue 2: Mapper Cache Key Does Not Include Provider Type
> **Location**: `seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:268-282`
>
> **Related Context**:
>
> * Cache definition: `DataSourceConfigUtil.java:56-57`
> * Provider interface: `DataSourceProvider.java:98` (returns the Mapper collection)
>
> **Problem Description**: `MAPPER_CACHE` uses only `connectorIdentifier` as the cache key. When multiple Providers implement a Mapper for the same Connector, their cache entries get mixed.
>
> **Potential Risks**:
>
> * **Risk 1**: After switching the Provider kind, the old Provider's Mapper is still used
> * **Risk 2**: If two Providers' Mapper implementations differ in logic, configuration parsing errors result
>
> **Impact Scope**:
>
> * Direct impact: Scenarios where multiple Providers coexist
> * Indirect impact: Correctness when switching Providers
> * Impact surface: Multiple Connectors
>
> **Severity**: **CRITICAL**
>
> **Improvement Suggestions**:
>
> ```java
> private static DataSourceMapper findMapper(
>         DataSourceProvider provider, String connectorIdentifier) {
>
>     String cacheKey = provider.kind() + "_" + connectorIdentifier; // ✅ Composite key
>
>     // Note: computeIfAbsent does not cache null results, so a missing
>     // Mapper is re-scanned on every call; combine this with the
>     // fail-fast null check from Issue 6.
>     return MAPPER_CACHE.computeIfAbsent(
>             cacheKey,
>             k -> {
>                 for (DataSourceMapper mapper : provider.dataSourceMappers()) {
>                     if (mapper.connectorIdentifier().equalsIgnoreCase(connectorIdentifier)) {
>                         return mapper;
>                     }
>                 }
>                 return null;
>             });
> }
> ```
>
> **Rationale**:
>
> * A Mapper implementation is bound to its Provider; same-name Mappers from different Providers may have different logic
> * The cache key must include the Provider type to ensure uniqueness
>
> ### Issue 3: Configuration Merge Strategy Does Not Match Documentation
> **Location**: `seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:319-342`
>
> **Related Context**:
>
> * Documentation: `docs/en/introduction/concepts/datasource-spi.md:51-52`
> * Connector configuration: All Connectors use datasource_id
>
> **Problem Description**: The code implements "external configuration overrides job configuration", but the documentation states "job configuration takes priority". This prevents users from overriding incorrect external-system configuration in the job configuration.
>
> **Potential Risks**:
>
> * **Risk 1**: When the external metadata system's configuration is incorrect, users cannot temporarily override and fix it
> * **Risk 2**: Violates user intuition and increases configuration-debugging difficulty
>
> **Impact Scope**:
>
> * Direct impact: All Connectors using datasource_id
> * Indirect impact: Configuration predictability
> * Impact surface: All Connectors
>
> **Severity**: **MAJOR**
>
> **Improvement Suggestions**:
>
> Solution 1: Modify code to make job configuration take priority
>
> ```java
> ```java
> // ✅ Start from the fetched external config, then overlay the original
> // job config so job-level keys win; external values only fill in fields
> // the job configuration omitted.
> Map<String, Object> mergedMap = new HashMap<>(datasourceConfig);
> for (Map.Entry<String, Object> entry : originalMap.entrySet()) {
>     mergedMap.put(entry.getKey(), entry.getValue());
> }
> ```
>
> Solution 2: Modify the documentation to state that external configuration takes priority
>
> ```
> When `datasource_id` is specified, the connector will:
> 1. Use the `datasource_id` to fetch connection details from the external metadata service
> 2. Merge the fetched configuration with any additional parameters in the job config
> 3. **Fetched configuration takes precedence over job-level parameters**
> ```
>
> **Rationale**:
>
> * Solution 1 (job configuration takes priority) is recommended because users need an "escape hatch"
> * When the external system's configuration is incorrect, users can temporarily override it in the job configuration
> * Follows the principle of least surprise
>
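> Solution 1's merge semantics can be sanity-checked in isolation. A minimal sketch; the method name and map contents are illustrative only:
>
> ```java
> import java.util.HashMap;
> import java.util.Map;
>
> final class ConfigMerge {
>     // Job-first merge: external values only fill in keys the job omitted,
>     // and job-level keys always win.
>     static Map<String, Object> merge(
>             Map<String, Object> fetchedConfig, Map<String, Object> jobConfig) {
>         Map<String, Object> merged = new HashMap<>(fetchedConfig);
>         merged.putAll(jobConfig); // job configuration takes priority
>         return merged;
>     }
> }
> ```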
> ### Issue 4: HTTP Client Not Configured With Timeout
> **Location**: `seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:62-64`
>
> **Related Context**:
>
> * HTTP request: `GravitinoClient.java:117-163` (executeGetRequest)
> * Provider resource management: `GravitinoDataSourceProvider.java:56` (client field)
>
> **Problem Description**: Using the default `HttpClients.createDefault()` without configuring connection and read timeouts may cause jobs to hang indefinitely.
>
> **Potential Risks**:
>
> * **Risk 1**: When the Gravitino service is unresponsive, job execution hangs indefinitely
> * **Risk 2**: No reasonable timeout strategy can be set, hurting fault recovery
>
> **Impact Scope**:
>
> * Direct impact: All jobs using Gravitino
> * Indirect impact: Timeout control of job execution
> * Impact surface: Core framework
>
> **Severity**: **MAJOR**
>
> **Improvement Suggestions**:
>
> ```java
> public GravitinoClient() {
>     RequestConfig config = RequestConfig.custom()
>             .setConnectTimeout(5000)           // 5s connection timeout
>             .setConnectionRequestTimeout(5000) // 5s to lease a connection from the pool
>             .setSocketTimeout(30000)           // 30s read timeout
>             .build();
>
>     this.httpClient = HttpClients.custom()
>             .setDefaultRequestConfig(config)
>             .setMaxConnTotal(50)     // Maximum total connections
>             .setMaxConnPerRoute(20)  // Maximum connections per route
>             .build();
> }
> ```
>
> **Rationale**:
>
> * Must set timeout to avoid jobs hanging indefinitely
> * Connection pool configuration can improve concurrent performance
> * Timeout values should be configurable, with reasonable defaults
>
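> The "timeouts should be configurable" point could be handled by reading overrides from the provider properties, falling back to the defaults above. A sketch; the class and property-key names here are hypothetical, not existing SeaTunnel configuration:
>
> ```java
> import java.util.Map;
>
> // Hypothetical settings holder: reads timeout overrides from provider
> // properties, falling back to the defaults suggested above.
> final class GravitinoHttpSettings {
>     final int connectTimeoutMs;
>     final int socketTimeoutMs;
>
>     GravitinoHttpSettings(Map<String, String> props) {
>         this.connectTimeoutMs = intOrDefault(props, "gravitino.http.connect-timeout-ms", 5_000);
>         this.socketTimeoutMs = intOrDefault(props, "gravitino.http.socket-timeout-ms", 30_000);
>     }
>
>     private static int intOrDefault(Map<String, String> props, String key, int defaultValue) {
>         String raw = props.get(key);
>         return raw == null ? defaultValue : Integer.parseInt(raw.trim());
>     }
> }
> ```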
> ### Issue 5: Thread Safety of Mapper in Concurrent Scenarios
> **Location**: `seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoJdbcDataSourceMapper.java:78-81`
>
> **Related Context**:
>
> * Mapper interface documentation: `DataSourceMapper.java:39-42` (requires thread safety)
> * Mapper cache: `DataSourceConfigUtil.java:56-57`
>
> **Problem Description**: `GravitinoJdbcDataSourceMapper` holds a shared `GravitinoClient` instance. Although `CloseableHttpClient`'s `execute` method is thread-safe, this design does not meet the interface documentation's thread-safety requirement.
>
> **Potential Risks**:
>
> * **Risk 1**: If `GravitinoClient` later gains mutable state, concurrency issues may arise
> * **Risk 2**: Violates the interface contract and may introduce bugs during future maintenance
>
> **Impact Scope**:
>
> * Direct impact: Concurrent job submission scenarios
> * Indirect impact: System concurrent stability
> * Impact surface: Core framework
>
> **Severity**: **MAJOR**
>
> **Improvement Suggestions**:
>
> Solution 1: Make Mapper stateless
>
> ```java
> public class GravitinoJdbcDataSourceMapper implements DataSourceMapper {
>     private final String catalogBaseUrl;
>     // ❌ Remove the shared client field
>
>     public GravitinoJdbcDataSourceMapper(String catalogBaseUrl) {
>         this.catalogBaseUrl = catalogBaseUrl;
>     }
>
>     @Override
>     public Map<String, Object> map(String datasourceId) {
>         // ✅ Create a short-lived client per call (requires GravitinoClient
>         // to implement AutoCloseable)
>         try (GravitinoClient client = new GravitinoClient()) {
>             JsonNode propertiesNode = client.getMetaInfo(datasourceId, catalogBaseUrl);
>             return convertToJdbcConfig(propertiesNode);
>         } catch (IOException e) {
>             throw new RuntimeException(
>                     "Failed to fetch metadata for datasource: " + datasourceId, e);
>         }
>     }
> }
> ```
>
> Solution 2: Ensure GravitinoClient is thread-safe
>
> ```java
> // GravitinoClient.java
> @Override
> public JsonNode getMetaInfo(String sourceId, String metalakeUrl) throws IOException {
>     // ✅ Use synchronized to ensure thread safety (note: this serializes
>     // all metadata requests through a single lock)
>     synchronized (httpClient) {
>         return executeGetRequest(metalakeUrl + sourceId);
>     }
> }
> ```
>
> **Rationale**:
>
> * Solution 1 aligns better with the interface contract, at a small performance cost
> * Solution 2 requires clearly documenting GravitinoClient's thread safety
> * In practice, Solution 1 is recommended because metadata HTTP calls are infrequent
>
> ### Issue 6: Improper Handling of Null Mapper Return Values
> **Location**: `seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:165-183`
>
> **Related Context**:
>
> * Mapper interface: `DataSourceMapper.java:65` (return may be null)
> * Error handling: `DataSourceConfigUtil.java:188-196`
>
> **Problem Description**: When no Mapper is found, or the Mapper returns an empty configuration, the code silently falls back to the original configuration, so users may unknowingly run with incorrect connection settings.
>
> **Potential Risks**:
>
> * **Risk 1**: The user configured `datasource_id`, but the connection information from the job configuration is actually used
> * **Risk 2**: May connect to the wrong database, causing data-security issues
>
> **Impact Scope**:
>
> * Direct impact: Troubleshooting configuration errors
> * Indirect impact: Data security
> * Impact surface: All Connectors
>
> **Severity**: **MAJOR**
>
> **Improvement Suggestions**:
>
> ```java
> DataSourceMapper mapper = findMapper(provider, connectorIdentifier);
>
> if (mapper == null) {
> throw new DataSourceProviderException(
> String.format("No DataSourceMapper found for connector '%s' in
provider '%s'. " +
> "Please check if the connector is supported by the
provider.",
> connectorIdentifier, providerKind));
> }
>
> Map<String, Object> datasourceConfig = mapper.map(datasourceId);
>
> if (datasourceConfig == null || datasourceConfig.isEmpty()) {
> throw new DataSourceProviderException(
> String.format("DataSourceMapper returned empty config for
datasource_id: '%s'. " +
> "Please check if the datasource exists in the
external system.",
> datasourceId));
> }
> ```
>
> **Rationale**:
>
> * Fail-fast principle: Configuration errors should fail fast
> * Avoid data security issues caused by silent failures
> * Clear error messages for easier troubleshooting
>
> ### Issue 7: Logs May Leak Sensitive Information
> **Location**: `seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:324-328`
>
> **Related Context**:
>
> * Log level: Uses `log.debug`
> * Sensitive fields: password, secret, token, etc.
>
> **Problem Description**: DEBUG logs record all configuration values, including passwords. With DEBUG logging enabled, sensitive information may leak.
>
> **Potential Risks**:
>
> * **Risk 1**: With DEBUG logs enabled in production, passwords are written to log files
> * **Risk 2**: Log collection and analysis systems may persist the sensitive information
>
> **Impact Scope**:
>
> * Direct impact: Log security
> * Indirect impact: Compliance
> * Impact surface: Core framework
>
> **Severity**: **MINOR**
>
> **Improvement Suggestions**:
>
> ```java
> // Option 1: drop the value from the log line entirely
> log.debug("Merging datasource config: key={}, datasource_id={}", key, datasourceId);
>
> // Option 2: mask sensitive fields before logging
> String safeValue = isSensitiveField(key) ? "***" : String.valueOf(value);
> log.debug("Merging datasource config: key={}, value={}, datasource_id={}",
>         key, safeValue, datasourceId);
>
> private static boolean isSensitiveField(String key) {
>     String lower = key.toLowerCase();
>     return lower.contains("password") || lower.contains("secret") || lower.contains("token");
> }
> ```
> ```
>
> **Rationale**:
>
> * Security best practice: Do not record sensitive information in logs
> * Even at DEBUG level, security principles should be followed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]