DanielCarter-stack commented on PR #10402:
URL: https://github.com/apache/seatunnel/pull/10402#issuecomment-3853838337

   ### Issue 1: HttpClient Resource Leak
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:54-150`
   
   ```java
   private static volatile CloseableHttpClient httpClient;
   
   private CloseableHttpClient getHttpClient() {
       if (httpClient == null) {
           synchronized (GravitinoClient.class) {
               if (httpClient == null) {
                   httpClient = HttpClients.createDefault();
               }
           }
       }
       return httpClient;
   }
   ```
   
   **Related Context**:
   - Caller: `GravitinoClient.executeGetRequest()` (line 95)
   - HTTP request retry loop: lines 98-115
   - CloseableHttpResponse using try-with-resources: line 102
   
   **Issue Description**:
   `CloseableHttpClient` implements `Closeable` (and therefore `AutoCloseable`) 
and owns a pooled connection manager. The current code caches a single static 
instance but never calls `close()`, resulting in:
   1. Pooled connections are never released by the client
   2. Any background threads owned by the client (for example an idle-connection 
evictor, if one is configured) are never terminated
   3. In long-running processes, if the class is reloaded by a new classloader, 
each reload creates another HttpClient instance, compounding the leak
   
   **Potential Risks**:
   - **Risk 1**: File descriptor leak. On Linux systems, each TCP connection 
occupies a file descriptor. Long-running processes may lead to "Too many open 
files" errors.
   - **Risk 2**: Thread leak. The default client does not start threads of its 
own, but a client configured with idle-connection eviction runs a background 
thread that only stops when `close()` is called.
   - **Risk 3**: Memory leak. The static reference keeps the connection pool and 
its buffers reachable, so they are never garbage collected.
   
   **Impact Scope**:
   - **Direct Impact**: All SeaTunnel jobs using `schema_url`
   - **Indirect Impact**: Long-running SeaTunnel Engine services
   - **Affected Area**: Core framework (seatunnel-api)
   
   **Severity**: BLOCKER
   
   **Improvement Suggestions**:
   
   ```java
   // Solution 1: Use shutdown hook (recommended)
   private static volatile CloseableHttpClient httpClient;
   private static volatile boolean closed = false;
   
   private CloseableHttpClient getHttpClient() {
       if (closed) {
           throw new IllegalStateException("HttpClient has been closed");
       }
       if (httpClient == null) {
           synchronized (GravitinoClient.class) {
               if (httpClient == null && !closed) {
                   httpClient = HttpClients.createDefault();
                   // Register a JVM shutdown hook that closes the client
                   Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                       closed = true;  // Reject later getHttpClient() calls
                       try {
                           if (httpClient != null) {
                               httpClient.close();
                           }
                       } catch (IOException e) {
                           // Ignore exceptions during shutdown
                       }
                   }));
               }
           }
       }
       return httpClient;
   }
   
   // Solution 2: Do not use a singleton; create a new instance each time
   // (simple but poor performance)
   private JsonNode executeGetRequest(String url) throws IOException {
       IOException lastException = null;
       for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
           HttpGet request = new HttpGet(url);
           request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
           
           try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(request)) {
               HttpEntity entity = response.getEntity();
               if (entity == null) {
                   throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
               }
               try {
                   return JsonUtils.readTree(entity.getContent());
               } finally {
                   EntityUtils.consume(entity);
               }
           } catch (IOException e) {
               lastException = e;
           }
       }
       throw lastException;
   }
   
   // Solution 3: Implement HttpClient lifecycle management
   // (most complete but complex)
   public class GravitinoClient implements MetalakeClient, AutoCloseable {
       private CloseableHttpClient httpClient;
       
       private synchronized CloseableHttpClient getHttpClient() {
           if (httpClient == null) {
               httpClient = HttpClients.createDefault();
           }
           return httpClient;
       }
       
       @Override
       public void close() throws IOException {
           if (httpClient != null) {
               httpClient.close();
               httpClient = null;
           }
       }
   }
   ```
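The lifecycle idea behind Solution 3 can be sketched without any HttpClient dependency. The block below is a minimal, hypothetical helper (`SharedResource` is not part of SeaTunnel or HttpClient) that lazily creates one `Closeable`, hands out the same instance to every caller, and closes it at most once. It trades double-checked locking for a plain `synchronized` method, which is simpler and usually cheap enough for a client that is fetched once per request.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/** Hypothetical helper: lazily creates one shared Closeable and closes it at most once. */
final class SharedResource<T extends Closeable> implements Closeable {
    private final Supplier<T> factory;
    private T instance;      // guarded by "this"
    private boolean closed;  // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on first use. */
    synchronized T get() {
        if (closed) {
            throw new IllegalStateException("Resource has been closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    /** Idempotent: closes the instance if it was ever created; later calls do nothing. */
    @Override
    public synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        if (instance != null) {
            instance.close();
        }
    }
}
```

In `GravitinoClient`, the factory would presumably be `HttpClients::createDefault`, and whichever component owns the client would call `close()` when the job finishes. That wiring is an assumption and not something the PR currently provides.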
   
   **Rationale**:
   - Option 1 balances performance (singleton reuse) and resource management 
(shutdown hook)
   - Option 2 is simple but has performance overhead (creating new client each 
time)
   - Option 3 is most complete but requires modifying caller code to ensure 
`close()` is called
   
   ---
   
   ### Issue 2: factoryIdentifier Misused as Catalog Name
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java:48-52`
   
   ```java
   default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext 
context) {
       final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
               new TableSchemaDiscoverer(context, factoryIdentifier());
       return metaLakeSchemaDiscoverer.discoverTableSchemas();
   }
   ```
   
   **Related Context**:
   - Interface definition: `TableSourceFactory.java:24` (`String 
factoryIdentifier()`)
   - Caller: `LocalFileSourceFactory.createSource()` (connector-file-local)
   - catalogName usage: `TableSchemaDiscoverer.java:122, 136, 147`
   
   **Issue Description**:
   `factoryIdentifier()` returns a connector identifier (such as "LocalFile" or 
"HdfsFile"), which names a connector type, not a catalog. In 
`TableSchemaDiscoverer`, `catalogName` is used to:
   1. Set the catalog name of the resulting `CatalogTable`
   2. Build the identifier via `TableIdentifier.of(catalogName, tablePath)`
   
   This results in:
   - When users configure `schema { table = "mydb.mytable" }`, the catalog name 
of the resulting table is always "LocalFile"
   - Tables from different catalogs (such as production and test environments) 
cannot be told apart by catalog name
   
   **Potential Risks**:
   - **Risk 1**: Catalog name semantic confusion, making it difficult for users 
to understand why the catalog name became the connector name
   - **Risk 2**: If users specify a catalog name in the configuration (such as 
`table = "prod_db.table1"`), it will be overwritten
   
   **Impact Scope**:
   - **Direct Impact**: All scenarios using `schema_url` or `schema.fields`
   - **Indirect Impact**: Downstream functionality that relies on correct 
catalog names (such as multi-catalog writes)
   - **Affected Area**: All file connectors
   
   **Severity**: CRITICAL
   
   **Improvement Suggestions**:
   
   ```java
   // Solution 1: Parse catalog name from configuration (recommended)
   default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext 
context) {
       // Get the catalog name from configuration; fall back to
       // factoryIdentifier() if it is not set.
       // ConnectorCommonOptions.CATALOG_NAME would need to be added.
       String catalogName = context.getOptions()
           .getOptional(ConnectorCommonOptions.CATALOG_NAME)
           .orElse(factoryIdentifier());
       
       final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
               new TableSchemaDiscoverer(context, catalogName);
       return metaLakeSchemaDiscoverer.discoverTableSchemas();
   }
   
   // Solution 2: Parse catalog name from table path
   default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext 
context) {
       // Check the table property in configuration; extract the catalog if it
       // is a three-part name (assumes the first segment is the catalog)
       String catalogName = factoryIdentifier();
       
       Optional<String> configuredTable = context.getOptions()
           .getOptional(ConnectorCommonOptions.SCHEMA)
           .map(schema -> schema.get("table"))  // map, not flatMap: Map.get returns a plain value
           .map(String::valueOf);
       
       if (configuredTable.isPresent()) {
           String tablePath = configuredTable.get();
           String[] parts = tablePath.split("\\.");
           if (parts.length == 3) {
               catalogName = parts[0];  // Use the configured catalog
           }
       }
       
       final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
               new TableSchemaDiscoverer(context, catalogName);
       return metaLakeSchemaDiscoverer.discoverTableSchemas();
   }
   
   // Solution 3: Do not use factoryIdentifier, use fixed default value
   default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext 
context) {
       final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
               new TableSchemaDiscoverer(context, "default");  // Default catalog
       return metaLakeSchemaDiscoverer.discoverTableSchemas();
   }
   ```
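The table-path parsing in Solution 2 can be isolated into a small pure function, which makes the fallback behaviour easy to test. This is a hedged sketch: `CatalogNameResolver` is a hypothetical name, and it assumes a three-part path means `catalog.database.table`. That convention should be checked against how `TablePath.of` interprets three-part names before relying on it. The naive split also does not handle quoted identifiers that contain dots.

```java
/** Hypothetical helper: picks a catalog name from a dotted table path. */
final class CatalogNameResolver {
    private CatalogNameResolver() {}

    /**
     * Returns the first segment of a three-part "catalog.database.table" path.
     * Returns the fallback when the path is null, blank, does not have exactly
     * three parts, or has an empty first segment.
     */
    static String resolve(String tablePath, String fallback) {
        if (tablePath == null || tablePath.trim().isEmpty()) {
            return fallback;
        }
        // limit -1 keeps empty segments, so ".db.t" still counts as three parts
        String[] parts = tablePath.trim().split("\\.", -1);
        if (parts.length == 3 && !parts[0].isEmpty()) {
            return parts[0];
        }
        return fallback;
    }
}
```

With this shape, `resolve("prod.db.t", factoryIdentifier())` yields the configured catalog, while a two-part name such as `mydb.mytable` keeps the connector identifier as the fallback.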
   
   **Rationale**:
   - Option 1 is most flexible, allowing users to customize catalog names
   - Option 2 has best compatibility, inferring catalog from existing 
configuration
   - Option 3 is simplest and aligns with most users' expectations (using 
"default" catalog)
   
   ---
   
   ### Issue 3: HTTP Retry Mechanism Lacks Backoff Strategy and Exception 
Classification
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:95-118`
   
   ```java
   private JsonNode executeGetRequest(String url) throws IOException {
       IOException lastException = null;
   
       for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
           HttpGet request = new HttpGet(url);
           request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
   
           try (CloseableHttpResponse response = 
getHttpClient().execute(request)) {
               HttpEntity entity = response.getEntity();
               if (entity == null) {
                   throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
               }
               try {
                   return JsonUtils.readTree(entity.getContent());
               } finally {
                   EntityUtils.consume(entity);
               }
           } catch (IOException e) {
               lastException = e;
           }
       }
   
       throw lastException;
   }
   ```
   
   **Related Context**:
   - Constant definitions: `MAX_RETRY_ATTEMPTS = 2` (line 50)
   - Caller: `getTableSchema()` (line 69)
   
   **Issue Description**:
   The current retry mechanism has the following issues:
   1. **No delay**: Retries run immediately, so many jobs failing at once can 
hit a recovering service at the same moment (thundering herd)
   2. **No exception classification**: Every `IOException` is retried, including 
ones that an immediate retry is unlikely to fix (such as DNS resolution failures)
   3. **No exponential backoff**: The delay between attempts does not grow with 
the failure count
   4. **Exception context loss**: The final exception does not include the URL 
or the number of attempts
   
   **Potential Risks**:
   - **Risk 1**: When the Gravitino service is temporarily unavailable, rapid 
retries may exacerbate service pressure
   - **Risk 2**: Exceptions such as DNS resolution failures and connection 
refusals should not be retried, but the current code will retry them
   - **Risk 3**: Difficult debugging, exception messages lack key information 
(such as URL, retry count)
   
   **Impact Scope**:
   - **Direct Impact**: All scenarios where metadata is obtained through 
`schema_url`
   - **Indirect Impact**: Stability of Gravitino service
   - **Affected Area**: Core framework
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   ```java
   private JsonNode executeGetRequest(String url) throws IOException {
       IOException lastException = null;
       long baseDelayMs = 1000;  // Base delay of 1 second
   
       for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
           HttpGet request = new HttpGet(url);
           request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
   
           try (CloseableHttpResponse response = 
getHttpClient().execute(request)) {
               HttpEntity entity = response.getEntity();
               if (entity == null) {
                   throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
               }
               try {
                   return JsonUtils.readTree(entity.getContent());
               } finally {
                   EntityUtils.consume(entity);
               }
           } catch (IOException e) {
               lastException = e;
               
               // Determine if it should retry
               if (!shouldRetry(e) || attempt == MAX_RETRY_ATTEMPTS) {
                   break;
               }
               
               // Exponential backoff
               try {
                   long delayMs = baseDelayMs * (1L << (attempt - 1));
                   Thread.sleep(delayMs);
               } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
                   throw new IOException("Retry interrupted", ie);
               }
           }
       }
   
       // Construct detailed exception information. The loop can stop early on a
       // non-retryable error, so report the configured maximum, not a count.
       String errorMsg = String.format(
           "Failed to fetch schema from Gravitino (max attempts: %d). URL: %s. Last error: %s",
           MAX_RETRY_ATTEMPTS, url, lastException.getMessage()
       );
       throw new IOException(errorMsg, lastException);
   }
   
   /**
    * Determine if the exception should be retried
    */
   private boolean shouldRetry(IOException e) {
       // Non-retryable exception types
       if (e instanceof java.net.UnknownHostException) {
           return false;  // DNS resolution failed
       }
       if (e instanceof java.net.ConnectException) {
           return false;  // Connection refused
       }
       if (e instanceof org.apache.http.client.HttpResponseException) {
           // Only reached if a response handler throws this exception;
           // execute(request) returns 4xx/5xx responses without throwing
           int statusCode =
               ((org.apache.http.client.HttpResponseException) e).getStatusCode();
           if (statusCode == 404 || statusCode == 400 || statusCode == 401) {
               return false;  // Client errors should not be retried
           }
       }
       
       // Retryable exceptions: timeout, server errors (5xx)
       return true;
   }
   ```
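One gap in the suggestion above: with HttpClient 4.x, `execute(request)` hands back 4xx and 5xx responses without throwing, so the `HttpResponseException` branch of `shouldRetry` only fires if a response handler raises it. A retry loop would therefore also need to inspect `response.getStatusLine().getStatusCode()`. The sketch below shows one way to classify status codes and compute a capped exponential delay. `RetryPolicy` and its method names are hypothetical, and treating 408 and 429 as retryable is an assumption, not something the PR specifies.

```java
/** Hypothetical helper: status classification and capped exponential backoff. */
final class RetryPolicy {
    private RetryPolicy() {}

    /** Retries server errors (5xx) plus 408 Request Timeout and 429 Too Many Requests. */
    static boolean isRetryableStatus(int statusCode) {
        return statusCode >= 500 || statusCode == 408 || statusCode == 429;
    }

    /**
     * Delay to wait after the given 1-based failed attempt:
     * baseDelayMs * 2^(attempt - 1), capped at maxDelayMs.
     */
    static long backoffDelayMs(long baseDelayMs, int attempt, long maxDelayMs) {
        if (attempt < 1 || baseDelayMs < 0 || maxDelayMs < 0) {
            throw new IllegalArgumentException("attempt must be >= 1 and delays must be >= 0");
        }
        int shift = Math.min(attempt - 1, 30);  // bound the shift so the factor stays small
        long factor = 1L << shift;
        // Compare before multiplying so the product can never overflow
        if (baseDelayMs > maxDelayMs / factor) {
            return maxDelayMs;
        }
        return baseDelayMs * factor;
    }
}
```

With a 1000 ms base and a 30000 ms cap, the delays after attempts 1, 2, and 3 are 1000, 2000, and 4000 ms. Adding random jitter to each delay would spread out clients that fail at the same moment.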
   
   **Rationale**:
   - Exponential backoff prevents thundering herd effect
   - Exception classification avoids meaningless retries
   - Detailed exception information facilitates debugging
   - Aligns with best practices for network request retries
   
   ---
   
   ### Issue 4: Environment Variable Resolution Lacks Error Handling
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:165-169`
   
   ```java
   if (StringUtils.isNotEmpty(
           System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
       return MetaLakeType.valueOf(
               
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()));
   }
   ```
   
   **Related Context**:
   - Enum definition: `MetaLakeType.java` (only one value: `GRAVITINO`)
   - Configuration option definition: `EnvCommonOptions.METALAKE_TYPE`
   
   **Issue Description**:
   When the environment variable `METALAKE_TYPE` is set to an invalid value 
(such as `invalid`), `MetaLakeType.valueOf()` throws 
`IllegalArgumentException` and job submission fails. The error message does not 
list the accepted values, so users do not know what to set.
   
   **Potential Risks**:
   - **Risk 1**: A misconfiguration produces an obscure error message (`No enum 
constant MetaLakeType.invalid`)
   - **Risk 2**: Nothing tells users that the only accepted value is 
`GRAVITINO`; because `valueOf()` is case-sensitive, `gravitino` is rejected too
   
   **Impact Scope**:
   - **Direct Impact**: Users using environment variable to configure 
`METALAKE_TYPE`
   - **Indirect Impact**: Difficult for operations personnel to troubleshoot
   - **Affected Area**: All scenarios using metadata service
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   // Solution 1: Use try-catch to provide friendly error messages
   if (StringUtils.isNotEmpty(
           System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
       String metalakeTypeStr = 
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
       try {
           return MetaLakeType.valueOf(metalakeTypeStr);
       } catch (IllegalArgumentException e) {
           throw new IllegalArgumentException(
               String.format(
                   "Invalid metalake type '%s'. Supported values are: %s. " +
                   "Please check your environment variable '%s' or configuration.",
                   metalakeTypeStr,
                   Arrays.stream(MetaLakeType.values())
                       .map(Enum::name)
                       .collect(Collectors.joining(", ")),
                   EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()
               ),
               e
           );
       }
   }
   
   // Solution 2: Ignore invalid values, use the default, and log a warning
   // (more lenient)
   if (StringUtils.isNotEmpty(
           System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
       String metalakeTypeStr = 
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
       try {
           return MetaLakeType.valueOf(metalakeTypeStr);
       } catch (IllegalArgumentException e) {
           log.warn(
               "Invalid metalake type '{}' from environment variable '{}'. " +
               "Using default value '{}'. Supported values are: {}",
               metalakeTypeStr,
               EnvCommonOptions.METALAKE_TYPE.key().toUpperCase(),
               MetaLakeType.GRAVITINO.getType(),
               Arrays.stream(MetaLakeType.values())
                   .map(Enum::name)
                   .collect(Collectors.joining(", "))
           );
           return MetaLakeType.GRAVITINO;
       }
   }
   
   // Solution 3: Use case-insensitive matching (most robust)
   if (StringUtils.isNotEmpty(
           System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
       String metalakeTypeStr = 
System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
       for (MetaLakeType type : MetaLakeType.values()) {
           if (type.name().equalsIgnoreCase(metalakeTypeStr)) {
               return type;
           }
       }
       // No matching type found
       throw new IllegalArgumentException(
           String.format(
               "Invalid metalake type '%s'. Supported values are: %s",
               metalakeTypeStr,
               Arrays.stream(MetaLakeType.values())
                   .map(Enum::name)
                   .collect(Collectors.joining(", "))
           )
       );
   }
   ```
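Solutions 1 and 3 can be combined into one reusable helper that matches case-insensitively and still names the accepted values when nothing matches. The following is a minimal sketch under stated assumptions: `EnumParser` is a hypothetical class, and the nested `MetaLakeType` is a local stand-in for the project's enum so the block compiles on its own.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper: case-insensitive enum lookup with a descriptive error. */
final class EnumParser {
    /** Local stand-in for the project's MetaLakeType enum (assumption for this sketch). */
    enum MetaLakeType { GRAVITINO }

    private EnumParser() {}

    /**
     * Returns the constant whose name equals raw, ignoring case and surrounding
     * whitespace. Throws IllegalArgumentException listing the supported values
     * and the source of the setting when nothing matches.
     */
    static <E extends Enum<E>> E parse(Class<E> type, String raw, String source) {
        String normalized = raw == null ? "" : raw.trim();
        for (E constant : type.getEnumConstants()) {
            if (constant.name().equalsIgnoreCase(normalized)) {
                return constant;
            }
        }
        String supported = Arrays.stream(type.getEnumConstants())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
        throw new IllegalArgumentException(String.format(
                "Invalid value '%s' from %s. Supported values are: %s",
                raw, source, supported));
    }
}
```

A call such as `EnumParser.parse(MetaLakeType.class, System.getenv("METALAKE_TYPE"), "environment variable METALAKE_TYPE")` would then accept `gravitino` and `GRAVITINO` alike, and the environment variable would only need to be read once.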
   
   **Rationale**:
   - Option 1 provides most detailed error information, helping users quickly 
locate issues
   - Option 2 is most lenient but may hide configuration errors
   - Option 3 is most robust, supporting case-insensitive input
   
   ---
   
   ### Issue 5: Exception Handling Lacks Context Information
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:125-140`
   
   ```java
   private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, 
String configTablePath) {
       try {
           JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
           final TablePath tableSchemaPath;
           if (StringUtils.isNotEmpty(configTablePath)) {
               tableSchemaPath = TablePath.of(configTablePath);
           } else {
               tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
           }
           final TableSchema tableSchema = 
metaLakeTableSchemaConvertor.convertor(schemaNode);
           return metaLakeTableSchemaConvertor.buildCatalogTable(
                   catalogName, tableSchemaPath, tableSchema);
       } catch (IOException e) {
           throw new 
SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, e);
       }
   }
   ```
   
   **Related Context**:
   - Error code definition: 
`SchemaEvolutionErrorCode.GET_META_LAKE_TABLE_SCHEMA_FAILED`
   - Exception message: `"Get meta lake table schema failed"`
   
   **Issue Description**:
   When fetching schema from Gravitino fails, the thrown 
`SeaTunnelRuntimeException` only contains the original `IOException`, lacking 
key context information:
   1. `schemaUrl` - User-configured URL
   2. `tablePath` - Parsed table path
   3. `catalogName` - Catalog name
   
   This makes debugging difficult, and users cannot quickly locate which table 
and which URL have the problem.
   
   **Potential Risks**:
   - **Risk 1**: In production environments, if multiple tables use 
`schema_url`, it is impossible to quickly locate which table failed from logs
   - **Risk 2**: Operations personnel need to manually check configuration 
files to determine the failed URL
   
   **Impact Scope**:
   - **Direct Impact**: All scenarios using `schema_url`
   - **Indirect Impact**: Troubleshooting efficiency
   - **Affected Area**: All file connectors
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, 
String configTablePath) {
       try {
           JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
           final TablePath tableSchemaPath;
           if (StringUtils.isNotEmpty(configTablePath)) {
               tableSchemaPath = TablePath.of(configTablePath);
           } else {
               tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
           }
           final TableSchema tableSchema = 
metaLakeTableSchemaConvertor.convertor(schemaNode);
           return metaLakeTableSchemaConvertor.buildCatalogTable(
                   catalogName, tableSchemaPath, tableSchema);
       } catch (IOException e) {
           // Construct detailed error message
           String errorMsg = String.format(
               "Failed to get table schema from MetaLake. " +
               "Schema URL: %s, " +
               "Configured table path: %s, " +
               "Catalog name: %s, " +
               "Error: %s",
               schemaUrl,
               StringUtils.isNotEmpty(configTablePath) ? configTablePath : "not configured",
               catalogName,
               e.getMessage()
           );
           throw new 
SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, 
               new IOException(errorMsg, e));
       }
   }
   ```
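The suggestion above wraps the original `IOException` in a second `IOException` purely to carry the message. An alternative, sketched here with a hypothetical `SchemaFetchException` class (not part of SeaTunnel), is a dedicated runtime exception that formats the context once and keeps the original exception as its direct cause. Whether this fits alongside `SeaTunnelRuntimeException` and its error codes is an open question for the PR author.

```java
import java.io.IOException;

/** Hypothetical exception that carries the schema lookup context in its message. */
final class SchemaFetchException extends RuntimeException {
    private final String schemaUrl;

    SchemaFetchException(
            String schemaUrl, String tablePath, String catalogName, IOException cause) {
        super(
                String.format(
                        "Failed to get table schema. Schema URL: %s, table path: %s, "
                                + "catalog: %s, error: %s",
                        schemaUrl,
                        tablePath == null || tablePath.isEmpty() ? "not configured" : tablePath,
                        catalogName,
                        cause.getMessage()),
                cause);
        this.schemaUrl = schemaUrl;
    }

    /** The URL that failed, available to callers without parsing the message. */
    String getSchemaUrl() {
        return schemaUrl;
    }
}
```

Because the original exception stays the direct cause, the stack trace shows one wrapping level, and log lines identify the failing URL and table without a look at the job configuration.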
   
   **Rationale**:
   - Include all key context information in exception messages
   - Preserve original exception as cause for viewing complete stack trace
   - Aligns with exception handling best practices (fail-fast with context)
   
   ---
   
   ### Issue 6: BaseFileSourceConfig Constructor Signature Change Causes 
Breaking Change
   
   **Location**: 
`seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java:57-60`
   
   ```java
   // Before
   public BaseFileSourceConfig(ReadonlyConfig readonlyConfig)
   
   // Now
   public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTableFromConfig)
   ```
   
   **Related Context**:
   - Parent class: `BaseFileSourceConfig` (abstract class)
   - Subclasses: `BaseFileSourceConfig` implementations of 6 file connectors
   - Caller: `BaseMultipleTableFileSourceConfig`
   
   **Issue Description**:
   This is a breaking change that will cause all external code inheriting 
`BaseFileSourceConfig` to fail compilation. Although all internal subclasses in 
the current PR have been updated, there is no:
   1. Deprecation warning (`@Deprecated`)
   2. Migration guide
   3. Compatible transition solution
   
   **Potential Risks**:
   - **Risk 1**: If users have customized file connectors inheriting 
`BaseFileSourceConfig`, code will fail to compile
   - **Risk 2**: After upgrading to a new version, users need to modify code to 
compile successfully
   
   **Impact Scope**:
   - **Direct Impact**: All external code inheriting `BaseFileSourceConfig`
   - **Indirect Impact**: Upgrade compatibility
   - **Affected Area**: All file connectors
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   ```java
   // Solution 1: Keep the old constructor, mark as deprecated (recommended)
   @Deprecated
   public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
       this(readonlyConfig, null);  // Call the new constructor
       log.warn(
           "BaseFileSourceConfig(ReadonlyConfig) is deprecated and will be removed in a future version. " +
           "Please use BaseFileSourceConfig(ReadonlyConfig, CatalogTable) instead."
       );
   }
   
   public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTableFromConfig) {
       this.baseFileSourceConfig = readonlyConfig;
       this.fileFormat = 
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
       this.readStrategy = ReadStrategyFactory.of(readonlyConfig, 
getHadoopConfig());
       this.filePaths = parseFilePaths(readonlyConfig);
       this.catalogTableFromConfig = catalogTableFromConfig;
       this.catalogTable = parseCatalogTable(readonlyConfig);
   }
   
   // Solution 2: Use Builder pattern (more thorough improvement)
   public abstract class BaseFileSourceConfig implements Serializable {
       // ... field definitions
       
       protected BaseFileSourceConfig(Builder<?> builder) {
           this.baseFileSourceConfig = builder.readonlyConfig;
           this.fileFormat = builder.fileFormat;
           this.readStrategy = builder.readStrategy;
           this.filePaths = builder.filePaths;
           this.catalogTableFromConfig = builder.catalogTableFromConfig;
           this.catalogTable = parseCatalogTable(builder.readonlyConfig);
       }
       
       protected abstract static class Builder<T extends Builder<T>> {
           private ReadonlyConfig readonlyConfig;
           private FileFormat fileFormat;
           private ReadStrategy readStrategy;
           private List<String> filePaths;
           private CatalogTable catalogTableFromConfig;
           
           public T readonlyConfig(ReadonlyConfig readonlyConfig) {
               this.readonlyConfig = readonlyConfig;
               return self();
           }
           
           public T catalogTableFromConfig(CatalogTable catalogTableFromConfig) 
{
               this.catalogTableFromConfig = catalogTableFromConfig;
               return self();
           }
           
           protected abstract T self();
           
           public abstract BaseFileSourceConfig build();
       }
   }
   ```
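The compatibility effect of Solution 1 can be demonstrated with stand-in classes. In this sketch every name is hypothetical (`BaseConfig` plays the role of `BaseFileSourceConfig`, and plain strings replace `ReadonlyConfig` and `CatalogTable`). The point is only that a subclass written against the old one-argument constructor keeps compiling once the deprecated overload delegates to the new one.

```java
/** Hypothetical stand-ins showing a deprecated constructor that delegates to the new one. */
final class ConstructorCompatDemo {
    private ConstructorCompatDemo() {}

    /** Plays the role of BaseFileSourceConfig. */
    abstract static class BaseConfig {
        final String options;
        final String catalogTable;  // null when the legacy constructor was used

        /** Legacy signature, kept so existing subclasses still compile. */
        @Deprecated
        protected BaseConfig(String options) {
            this(options, null);
        }

        protected BaseConfig(String options, String catalogTable) {
            this.options = options;
            this.catalogTable = catalogTable;
        }
    }

    /** An external subclass written against the old API; it needs no changes. */
    static final class LegacyConfig extends BaseConfig {
        @SuppressWarnings("deprecation")
        LegacyConfig(String options) {
            super(options);
        }
    }

    /** A subclass migrated to the new two-argument constructor. */
    static final class UpdatedConfig extends BaseConfig {
        UpdatedConfig(String options, String catalogTable) {
            super(options, catalogTable);
        }
    }
}
```

Code reached through the legacy path has to tolerate a `null` catalog table, so any logic that reads `catalogTableFromConfig` would need a null check or a documented default.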
   
   **Rationale**:
   - Option 1 provides backward compatibility, giving users time to migrate
   - Option 2 is more thorough, using Builder pattern to avoid the problem of 
too many constructor parameters
   
   ---
   
   ### Issue 7: GravitinoClient.getTableSchemaPath Regex Parsing Lacks Error 
Handling
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:74-86`
   
   ```java
   @Override
   public TablePath getTableSchemaPath(String schemaHttpUrl) {
       Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
       if (!matcher.find()) {
           throw new SeaTunnelRuntimeException(
                   ERROR_INVALID_TABLE_URL,
                   "Invalid table URL format, expected: /catalogs/{catalog}/schemas/{schema}/tables/{table}");
       }
       String catalogName = matcher.group(1);
       String schemaName = matcher.group(2);
       String tableName = matcher.group(3);
       return TablePath.of(catalogName, schemaName, tableName);
   }
   ```
   
   **Related Context**:
   - Regex definition: `TABLE_URL_PATTERN = 
Pattern.compile("/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)")`
   - Error code: `ERROR_INVALID_TABLE_URL`
   
   **Issue Description**:
   When the URL format is incorrect, the thrown exception message lacks the 
actual URL, making it impossible for users to know which URL has the wrong 
format. Additionally, `matcher.find()` only finds the first match. If the URL 
contains multiple substrings that meet the condition, incorrect results may be 
parsed.
   
   **Potential Risks**:
   - **Risk 1**: Exception message does not contain the actual URL, making 
debugging difficult
   - **Risk 2**: `matcher.find()` may match the wrong location (although 
probability is very low)
   
   **Impact Scope**:
   - **Direct Impact**: All scenarios using `schema_url` without configuring 
`table` property
   - **Indirect Impact**: Troubleshooting efficiency
   - **Affected Area**: All file connectors
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   @Override
   public TablePath getTableSchemaPath(String schemaHttpUrl) {
       if (schemaHttpUrl == null || schemaHttpUrl.isEmpty()) {
           throw new SeaTunnelRuntimeException(
                   ERROR_INVALID_TABLE_URL,
                   "Table URL cannot be null or empty");
       }
       
       // matches() must cover the whole input, so TABLE_URL_PATTERN has to be
       // extended to describe the full URL (for example with a leading ".*");
       // the current path-only pattern would reject every absolute URL
       Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
       if (!matcher.matches()) {
           throw new SeaTunnelRuntimeException(
                   ERROR_INVALID_TABLE_URL,
                   String.format(
                       "Invalid table URL format: '%s'. " +
                       "Expected format: http://host/api/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/tables/{table}",
                       schemaHttpUrl
                   ));
       }
       String catalogName = matcher.group(1);
       String schemaName = matcher.group(2);
       String tableName = matcher.group(3);
       return TablePath.of(catalogName, schemaName, tableName);
   }
   ```
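Switching from `find()` to `matches()` changes which inputs are accepted, because `matches()` requires the pattern to cover the entire string. The sketch below contrasts the path-only pattern quoted in this issue with an assumed full-URL variant. `TableUrlParser` and `FULL_URL_PATTERN` are hypothetical names, and the exact prefix the real Gravitino URLs use should be confirmed before adopting such a pattern.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser contrasting find() and matches() on table URLs. */
final class TableUrlParser {
    /** Path-only pattern as quoted in the review. */
    static final Pattern SUFFIX_PATTERN =
            Pattern.compile("/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)");

    /** Assumed full-URL variant: scheme, any prefix, the path, optional trailing slash. */
    static final Pattern FULL_URL_PATTERN =
            Pattern.compile(
                    "^https?://.+/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/?#]+)/?$");

    private TableUrlParser() {}

    /** Returns {catalog, schema, table}; rejects input that is not a complete table URL. */
    static String[] parse(String url) {
        if (url == null || url.isEmpty()) {
            throw new IllegalArgumentException("Table URL cannot be null or empty");
        }
        Matcher matcher = FULL_URL_PATTERN.matcher(url);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid table URL format: '" + url + "'");
        }
        return new String[] {matcher.group(1), matcher.group(2), matcher.group(3)};
    }
}
```

For an absolute URL, `SUFFIX_PATTERN.matcher(url).find()` succeeds while `SUFFIX_PATTERN.matcher(url).matches()` returns false, which is why the pattern has to change together with the method call. The full-URL variant also rejects trailing path segments after the table name.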
   
   **Rationale**:
   - Use `matches()` to ensure the entire URL matches, not partial matching 
(this requires extending `TABLE_URL_PATTERN` to cover the scheme, host, and path 
prefix)
   - Include the actual URL in exception messages
   - Add null checks to provide more friendly error messages
   
   ---
   
   ### Issue 8: HttpClient Connections May Become Invalid in Concurrent 
Scenarios
   
   **Location**: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:141-150`
   
   ```java
   private CloseableHttpClient getHttpClient() {
       if (httpClient == null) {
           synchronized (GravitinoClient.class) {
               if (httpClient == null) {
                   httpClient = HttpClients.createDefault();
               }
           }
       }
       return httpClient;
   }
   ```
   
   **Issue Description**:
   Although `CloseableHttpClient` is thread-safe, the following scenarios may 
cause issues:
   1. After Gravitino service restarts, old connections in HttpClient may 
become invalid
   2. After DNS resolution changes, HttpClient may still use the old IP address
   3. In long-running processes, HttpClient's connection pool may encounter 
various issues (such as idle connections being closed by the server)
   
   Apache HttpClient's `createDefault()` method builds a client with a pooling 
connection manager and default settings: no explicit connect or 
connection-request timeouts and no background eviction of idle connections, so 
these have to be configured explicitly.
   
   **Potential Risks**:
   - **Risk 1**: After Gravitino service restarts, requests in a short period 
may fail (until the connection manager detects connection invalidation)
   - **Risk 2**: Long-running processes may accumulate many invalid connections
   
   **Impact Scope**:
   - **Direct Impact**: Long-running SeaTunnel jobs
   - **Indirect Impact**: Gravitino service operations
   - **Affected Area**: All scenarios using `schema_url`
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   private CloseableHttpClient getHttpClient() {
       if (httpClient == null) {
           synchronized (GravitinoClient.class) {
               if (httpClient == null) {
                   // HttpClient 4.x API (org.apache.http.*), matching the
                   // HttpResponseException import used under Issue 3
                   PoolingHttpClientConnectionManager connectionManager =
                       new PoolingHttpClientConnectionManager();
                   connectionManager.setMaxTotal(20);            // Maximum number of connections
                   connectionManager.setDefaultMaxPerRoute(10);  // Maximum connections per route
                   
                   // Timeouts are given in milliseconds
                   RequestConfig requestConfig = RequestConfig.custom()
                       .setConnectTimeout(5000)            // Connection timeout of 5 seconds
                       .setConnectionRequestTimeout(5000)  // Timeout for leasing a connection from the pool
                       .build();
                   
                   httpClient = HttpClients.custom()
                       .setConnectionManager(connectionManager)
                       .setDefaultRequestConfig(requestConfig)
                       // Evict connections that have been idle for more than 30 seconds
                       .evictIdleConnections(30, TimeUnit.SECONDS)
                       .build();
                       
                   // Register JVM shutdown hook
                   Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                       try {
                           if (httpClient != null) {
                               httpClient.close();
                           }
                       } catch (IOException e) {
                           // Ignore shutdown exceptions
                       }
                   }));
               }
           }
       }
       return httpClient;
   }
   ```
   
   **Rationale**:
   - Configure connection pool parameters to avoid resource exhaustion
   - Periodically clean up idle connections to avoid using invalid connections
   - Set reasonable timeout values
   - Add shutdown hook to resolve resource leak issues
   
   ---


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.