chl-wxp commented on PR #10586:
URL: https://github.com/apache/seatunnel/pull/10586#issuecomment-4059906585

   > ### Issue 1: Provider Repeated Initialization Causes Resource Leak
   > **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:76-107`
   > 
   > **Related Context**:
   > 
   > * Provider factory: `DataSourceProviderFactory.java:50-51` (caches 
Provider singleton)
   > * Gravitino Provider implementation: `GravitinoDataSourceProvider.java:96` 
(creates new client on each init)
   > * Caller: `MultipleTableJobConfigParser.java:855-864` (called during job 
parsing)
   > 
   > **Problem Description**: The `resolveDataSourceConfigs()` method 
repeatedly calls `init()` and `close()` on the globally cached Provider 
singleton, leading to:
   > 
   > 1. HTTP clients are created multiple times
   > 2. Provider is closed after the first job execution
   > 3. The second job execution uses the closed Provider, which may cause 
exceptions
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: HTTP client is already closed during the second job 
execution, metadata retrieval fails
   > * **Risk 2**: Resource leak, old clients created by each init are not 
properly released
   > * **Risk 3**: In SeaTunnel Server long-running scenarios, multiple job 
submissions will cause instability
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: All jobs using DataSource SPI
   > * Indirect impact: Stability of SeaTunnel Client/Server processes
   > * Impact surface: Core framework
   > 
   > **Severity**: **BLOCKER**
   > 
   > **Improvement Suggestions**:
   > 
   > ```java
   > // DataSourceConfigUtil.java
   > public static Config resolveDataSourceConfigs(
   >         Config seaTunnelJobConfig, DataSourceConfig dataSourceConfig) {
   >     if (!dataSourceConfig.isEnabled()) {
   >         log.debug("DataSource Center is disabled, returning original 
config");
   >         return seaTunnelJobConfig;
   >     }
   > 
   >     String providerKind = dataSourceConfig.getKind();
   >     log.info("Starting datasource config resolution with provider: {}", 
providerKind);
   > 
   >     // Get Provider singleton (no duplicate init)
   >     DataSourceProvider provider = getOrCreateProvider(dataSourceConfig);
   >     
   >     // Only on first initialization
   >     if (!isProviderInitialized(provider)) {
   >         
provider.init(ConfigFactory.parseMap(dataSourceConfig.getProperties()));
   >         markProviderInitialized(provider);
   >     }
   >     
   >     // ... existing parsing logic ...
   >     
   >     // ❌ Remove provider.close()
   >     // provider.close();
   >     
   >     return ConfigFactory.parseMap(resultMap);
   > }
   > 
   > // Add initialization state management in DataSourceProviderFactory
   > private static final ConcurrentMap<String, Boolean> INITIALIZED_PROVIDERS 
= new ConcurrentHashMap<>();
   > ```
   > 
   > **Rationale**:
   > 
   > * Provider should be a process-level singleton, managed by the framework 
lifecycle
   > * Initialize at process startup, clean up at process shutdown
   > * Each job submission only uses the already initialized Provider, should 
not affect its state
   > 
   > ### Issue 2: Mapper Cache Key Does Not Include Provider Type
   > **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:268-282`
   > 
   > **Related Context**:
   > 
   > * Cache definition: `DataSourceConfigUtil.java:56-57`
   > * Provider interface: `DataSourceProvider.java:98` (returns Mapper 
collection)
   > 
   > **Problem Description**: `MAPPER_CACHE` only uses `connectorIdentifier` as 
the cache key. When multiple Providers implement the same Connector's Mapper, 
the cache will be mixed.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: When switching Provider kind, the old Provider's Mapper is 
still used
   > * **Risk 2**: If two Providers' Mapper implementations have different 
logic, it will cause configuration parsing errors
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: Scenarios where multiple Providers coexist
   > * Indirect impact: Correctness when switching Providers
   > * Impact surface: Multiple Connectors
   > 
   > **Severity**: **CRITICAL**
   > 
   > **Improvement Suggestions**:
   > 
   > ```java
   > private static DataSourceMapper findMapper(
   >         DataSourceProvider provider, String connectorIdentifier) {
   > 
   >     String cacheKey = provider.kind() + "_" + connectorIdentifier;  // ✅ 
Composite key
   > 
   >     return MAPPER_CACHE.computeIfAbsent(
   >             cacheKey,
   >             k -> {
   >                 for (DataSourceMapper mapper : 
provider.dataSourceMappers()) {
   >                     if 
(mapper.connectorIdentifier().equalsIgnoreCase(connectorIdentifier)) {
   >                         return mapper;
   >                     }
   >                 }
   >                 return null;
   >             });
   > }
   > ```
   > 
   > **Rationale**:
   > 
   > * Mapper implementation is bound to Provider, same-name Mappers from 
different Providers may have different logic
   > * Cache key must include Provider type to ensure uniqueness
   > 
   > ### Issue 3: Configuration Merge Strategy Does Not Match Documentation
   > **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:319-342`
   > 
   > **Related Context**:
   > 
   > * Documentation: `docs/en/introduction/concepts/datasource-spi.md:51-52`
   > * Connector configuration: All Connectors use datasource_id
   > 
   > **Problem Description**: The code implementation is "external 
configuration overrides job configuration", but the documentation states "job 
configuration takes priority". This prevents users from overriding incorrect 
external system configurations in job configuration.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: When external metadata system configuration is incorrect, 
users cannot temporarily override and fix it
   > * **Risk 2**: Violates user intuition, increases configuration debugging 
difficulty
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: All Connectors using datasource_id
   > * Indirect impact: Configuration predictability
   > * Impact surface: All Connectors
   > 
   > **Severity**: **MAJOR**
   > 
   > **Improvement Suggestions**:
   > 
   > Solution 1: Modify code to make job configuration take priority
   > 
   > ```java
   > for (Map.Entry<String, Object> entry : originalMap.entrySet()) {
   >     String key = entry.getKey();
   >     Object value = entry.getValue();
   >     // ✅ Original config takes priority, external config only supplements 
missing fields
   >     if (!mergedMap.containsKey(key)) {
   >         mergedMap.put(key, datasourceConfig.get(key));
   >     }
   > }
   > ```
   > 
   > Solution 2: Modify documentation to state external configuration takes 
priority
   > 
   > ```
   > When `datasource_id` is specified, the connector will:
   > 1. Use the `datasource_id` to fetch connection details from the external 
metadata service
   > 2. Merge the fetched configuration with any additional parameters in the 
job config
   > 3. **Fetched configuration takes precedence over job-level parameters**
   > ```
   > 
   > **Rationale**:
   > 
   > * Solution 1 (job configuration takes priority) is recommended because 
users need an "escape hatch"
   > * When external system configuration is incorrect, users can temporarily 
override in job configuration
   > * Follows the principle of least surprise
   > 
   > ### Issue 4: HTTP Client Not Configured With Timeout
   > **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:62-64`
   > 
   > **Related Context**:
   > 
   > * HTTP request: `GravitinoClient.java:117-163` (executeGetRequest)
   > * Provider resource management: `GravitinoDataSourceProvider.java:56` 
(client field)
   > 
   > **Problem Description**: Using the default `HttpClients.createDefault()` 
without configuring connection timeout and read timeout may cause jobs to hang 
indefinitely.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: When Gravitino service is unresponsive, job execution hangs 
indefinitely
   > * **Risk 2**: Cannot set reasonable timeout strategy, affecting fault 
recovery
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: All jobs using Gravitino
   > * Indirect impact: Timeout control of job execution
   > * Impact surface: Core framework
   > 
   > **Severity**: **MAJOR**
   > 
   > **Improvement Suggestions**:
   > 
   > ```java
   > public GravitinoClient() {
   >     RequestConfig config = RequestConfig.custom()
   >         .setConnectTimeout(5000)      // 5 second connection timeout
   >         .setConnectionRequestTimeout(5000)  // 5 second request timeout
   >         .setSocketTimeout(30000)      // 30 second read timeout
   >         .build();
   >     
   >     this.httpClient = HttpClients.custom()
   >         .setDefaultRequestConfig(config)
   >         .setMaxConnTotal(50)          // Maximum number of connections
   >         .setMaxConnPerRoute(20)       // Maximum number of connections per 
route
   >         .build();
   > }
   > ```
   > 
   > **Rationale**:
   > 
   > * Must set timeout to avoid jobs hanging indefinitely
   > * Connection pool configuration can improve concurrent performance
   > * Timeout value should be configurable, but default value needs to be 
reasonable
   > 
   > ### Issue 5: Thread Safety of Mapper in Concurrent Scenarios
   > **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoJdbcDataSourceMapper.java:78-81`
   > 
   > **Related Context**:
   > 
   > * Mapper interface documentation: `DataSourceMapper.java:39-42` (requires 
thread safety)
   > * Mapper cache: `DataSourceConfigUtil.java:56-57`
   > 
   > **Problem Description**: `GravitinoJdbcDataSourceMapper` holds a shared 
`GravitinoClient` instance. Although `CloseableHttpClient`'s `execute` method 
is thread-safe, this design does not meet the interface documentation's thread 
safety requirements.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: If `GravitinoClient` adds state later, concurrency issues 
may arise
   > * **Risk 2**: Violates interface contract, may introduce bugs during 
future maintenance
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: Concurrent job submission scenarios
   > * Indirect impact: System concurrent stability
   > * Impact surface: Core framework
   > 
   > **Severity**: **MAJOR**
   > 
   > **Improvement Suggestions**:
   > 
   > Solution 1: Make Mapper stateless
   > 
   > ```java
   > public class GravitinoJdbcDataSourceMapper implements DataSourceMapper {
   >     private final String catalogBaseUrl;
   >     // ❌ Remove client field
   >     
   >     public GravitinoJdbcDataSourceMapper(String catalogBaseUrl) {
   >         this.catalogBaseUrl = catalogBaseUrl;
   >     }
   >     
   >     @Override
   >     public Map<String, Object> map(String datasourceId) {
   >         // ✅ Create temporary client on each call
   >         try (GravitinoClient client = new GravitinoClient()) {
   >             JsonNode propertiesNode = client.getMetaInfo(datasourceId, 
catalogBaseUrl);
   >             return convertToJdbcConfig(propertiesNode);
   >         } catch (IOException e) {
   >             throw new RuntimeException(...);
   >         }
   >     }
   > }
   > ```
   > 
   > Solution 2: Ensure GravitinoClient is thread-safe
   > 
   > ```java
   > // GravitinoClient.java
   > @Override
   > public JsonNode getMetaInfo(String sourceId, String metalakeUrl) throws 
IOException {
   >     // ✅ Use synchronized to ensure thread safety
   >     synchronized (httpClient) {
   >         return executeGetRequest(metalakeUrl + sourceId);
   >     }
   > }
   > ```
   > 
   > **Rationale**:
   > 
   > * Solution 1 better aligns with interface contract, but has poorer 
performance
   > * Solution 2 requires clear documentation of GravitinoClient's thread 
safety
   > * In actual use, Solution 1 is recommended because HTTP calls are not 
frequent operations
   > 
   > ### Issue 6: Improper Handling of Null Mapper Return Values
   > **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:165-183`
   > 
   > **Related Context**:
   > 
   > * Mapper interface: `DataSourceMapper.java:65` (return may be null)
   > * Error handling: `DataSourceConfigUtil.java:188-196`
   > 
   > **Problem Description**: When Mapper is not found or returns empty 
configuration, it silently returns the original configuration, which may lead 
to users using incorrect connection configuration without noticing.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: User configured `datasource_id`, but actually used 
connection information from job configuration
   > * **Risk 2**: May connect to the wrong database, causing data security 
issues
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: Troubleshooting configuration errors
   > * Indirect impact: Data security
   > * Impact surface: All Connectors
   > 
   > **Severity**: **MAJOR**
   > 
   > **Improvement Suggestions**:
   > 
   > ```java
   > DataSourceMapper mapper = findMapper(provider, connectorIdentifier);
   > 
   > if (mapper == null) {
   >     throw new DataSourceProviderException(
   >         String.format("No DataSourceMapper found for connector '%s' in 
provider '%s'. " +
   >                       "Please check if the connector is supported by the 
provider.",
   >                       connectorIdentifier, providerKind));
   > }
   > 
   > Map<String, Object> datasourceConfig = mapper.map(datasourceId);
   > 
   > if (datasourceConfig == null || datasourceConfig.isEmpty()) {
   >     throw new DataSourceProviderException(
   >         String.format("DataSourceMapper returned empty config for 
datasource_id: '%s'. " +
   >                       "Please check if the datasource exists in the 
external system.",
   >                       datasourceId));
   > }
   > ```
   > 
   > **Rationale**:
   > 
   > * Fail-fast principle: Configuration errors should fail fast
   > * Avoid data security issues caused by silent failures
   > * Clear error messages for easier troubleshooting
   > 
   > ### Issue 7: Logs May Leak Sensitive Information
   > **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:324-328`
   > 
   > **Related Context**:
   > 
   > * Log level: Uses `log.debug`
   > * Sensitive fields: password, secret, token, etc.
   > 
   > **Problem Description**: DEBUG logs record all configuration values, 
including passwords. If DEBUG logging is enabled, it may cause sensitive 
information leakage.
   > 
   > **Potential Risks**:
   > 
   > * **Risk 1**: After enabling DEBUG logs in production environment, 
passwords are recorded to log files
   > * **Risk 2**: Log collection and analysis systems may store sensitive 
information
   > 
   > **Impact Scope**:
   > 
   > * Direct impact: Log security
   > * Indirect impact: Compliance
   > * Impact surface: Core framework
   > 
   > **Severity**: **MINOR**
   > 
   > **Improvement Suggestions**:
   > 
   > ```java
   > log.debug("Merging datasource config: key={}, datasource_id={}",
   >         key, datasourceId);  // ❌ Remove value
   > 
   > // Or add sensitive field filtering
   > String safeValue = isSensitiveField(key) ? "***" : String.valueOf(value);
   > log.debug("Merging datasource config: key={}, value={}, datasource_id={}",
   >         key, safeValue, datasourceId);
   > 
   > private boolean isSensitiveField(String key) {
   >     return key.toLowerCase().contains("password") ||
   >            key.toLowerCase().contains("secret") ||
   >            key.toLowerCase().contains("token");
   > }
   > ```
   > 
   > **Rationale**:
   > 
   > * Security best practice: Do not record sensitive information in logs
   > * Even at DEBUG level, security principles should be followed
   
   updated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to