DanielCarter-stack commented on PR #10402:
URL: https://github.com/apache/seatunnel/pull/10402#issuecomment-3853838337
### Issue 1: HttpClient Resource Leak
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:54-150`
```java
private static volatile CloseableHttpClient httpClient;

private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                httpClient = HttpClients.createDefault();
            }
        }
    }
    return httpClient;
}
```
**Related Context**:
- Caller: `GravitinoClient.executeGetRequest()` (line 95)
- HTTP request retry loop: lines 98-115
- CloseableHttpResponse using try-with-resources: line 102
**Issue Description**:
`CloseableHttpClient` implements `Closeable` (and therefore `AutoCloseable`) and owns a connection pool, plus a background evictor thread when idle-connection eviction is enabled. The current code uses a lazily initialized static singleton but never calls `close()`, resulting in:
1. Pooled connections are never explicitly closed
2. Any background threads owned by the client are never stopped
3. In long-running processes where the classloader is reloaded, each reload creates a new static client without closing the old one, so HttpClient instances accumulate and the leak grows
**Potential Risks**:
- **Risk 1**: File descriptor leak. On Linux, each open TCP connection occupies a file descriptor, so long-running processes (especially with accumulated clients) may hit "Too many open files" errors.
- **Risk 2**: Thread leak. If the client runs background threads (for example, an idle-connection evictor), never closing it leaves those threads running.
- **Risk 3**: Memory leak. The static field keeps the client and its connection pool reachable, so they can never be garbage collected.
**Impact Scope**:
- **Direct Impact**: All SeaTunnel jobs using `schema_url`
- **Indirect Impact**: Long-running SeaTunnel Engine services
- **Affected Area**: Core framework (seatunnel-api)
**Severity**: BLOCKER
**Improvement Suggestions**:
```java
// Solution 1: Keep the singleton and close it from a JVM shutdown hook (recommended)
private static volatile CloseableHttpClient httpClient;
private static volatile boolean closed = false;

private CloseableHttpClient getHttpClient() {
    if (closed) {
        throw new IllegalStateException("HttpClient has been closed");
    }
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null && !closed) {
                httpClient = HttpClients.createDefault();
                // Register a JVM shutdown hook that marks the client closed and releases it
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    closed = true;
                    try {
                        if (httpClient != null) {
                            httpClient.close();
                        }
                    } catch (IOException e) {
                        // Ignore exceptions during JVM shutdown
                    }
                }));
            }
        }
    }
    return httpClient;
}

// Solution 2: Do not use a singleton; create a new client for every request
// (simple, but loses connection reuse)
private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;
    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(request)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
        }
    }
    throw lastException;
}

// Solution 3: Give GravitinoClient an explicit lifecycle (most complete, but callers must close it)
public class GravitinoClient implements MetalakeClient, AutoCloseable {
    private CloseableHttpClient httpClient; // Per-instance, no longer static

    private synchronized CloseableHttpClient getHttpClient() {
        if (httpClient == null) {
            httpClient = HttpClients.createDefault();
        }
        return httpClient;
    }

    @Override
    public synchronized void close() throws IOException {
        if (httpClient != null) {
            httpClient.close();
            httpClient = null;
        }
    }
}
```
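**Rationale**:
- Option 1 balances performance (singleton reuse) and resource management (shutdown hook); note that a shutdown hook runs only when the JVM exits, so it does not address the classloader-reload scenario in point 3
- Option 2 is simple but adds overhead, because a new client (and new TCP connections) is created for every request
- Option 3 is the most complete but requires modifying caller code to ensure `close()` is called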
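
The lifecycle logic shared by Solutions 1 and 3 does not depend on HttpClient itself, so it can be sketched with any `AutoCloseable`. The holder below is a minimal sketch, not SeaTunnel code: the class name `SharedResource` and its `closeOnJvmShutdown` flag are hypothetical. It creates the resource lazily with double-checked locking, closes it at most once, rejects use after `close()`, and can optionally register a JVM shutdown hook. A registered hook also keeps the holder (and its classloader) reachable until the JVM exits, so for reloadable code an explicit `close()` is the more reliable path.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of SeaTunnel): lazily creates one shared
 * AutoCloseable and guarantees it is closed at most once.
 */
final class SharedResource<T extends AutoCloseable> implements AutoCloseable {
    private final Supplier<T> factory;
    private final boolean closeOnJvmShutdown;
    private volatile T instance;
    private volatile boolean closed;

    SharedResource(Supplier<T> factory, boolean closeOnJvmShutdown) {
        this.factory = factory;
        this.closeOnJvmShutdown = closeOnJvmShutdown;
    }

    /** Returns the shared instance, creating it on first use. */
    T get() {
        T current = instance;
        if (current == null) {
            synchronized (this) {
                current = instance;
                if (current == null && !closed) {
                    current = factory.get();
                    instance = current;
                    if (closeOnJvmShutdown) {
                        // Best-effort cleanup when the JVM exits normally
                        Runtime.getRuntime().addShutdownHook(new Thread(this::closeQuietly));
                    }
                }
            }
        }
        if (closed) {
            throw new IllegalStateException("Resource has been closed");
        }
        return current;
    }

    /** Closes the instance once; later calls are no-ops. */
    @Override
    public synchronized void close() throws Exception {
        if (closed) {
            return;
        }
        closed = true;
        T current = instance;
        instance = null;
        if (current != null) {
            current.close();
        }
    }

    private void closeQuietly() {
        try {
            close();
        } catch (Exception ignored) {
            // Nothing useful can be done during JVM shutdown
        }
    }
}
```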
---
### Issue 2: factoryIdentifier Misused as Catalog Name
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java:48-52`
```java
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, factoryIdentifier());
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}
```
**Related Context**:
- Interface definition: `TableSourceFactory.java:24` (`String
factoryIdentifier()`)
- Caller: `LocalFileSourceFactory.createSource()` (connector-file-local)
- catalogName usage: `TableSchemaDiscoverer.java:122, 136, 147`
**Issue Description**:
`factoryIdentifier()` returns a connector identifier (such as "LocalFile" or "HdfsFile"), not a catalog name. In `TableSchemaDiscoverer`, `catalogName` is used:
1. As the catalog name of the constructed `CatalogTable`
2. As part of `TableIdentifier.of(catalogName, tablePath)`
This results in:
- When users configure `schema { table = "mydb.mytable" }`, the resulting catalog name is "LocalFile" (the connector name)
- Different catalogs (such as production and test environments) cannot be distinguished
**Potential Risks**:
- **Risk 1**: Catalog name semantic confusion, making it difficult for users
to understand why the catalog name became the connector name
- **Risk 2**: If users specify a catalog name in the configuration (such as
`table = "prod_db.table1"`), it will be overwritten
**Impact Scope**:
- **Direct Impact**: All scenarios using `schema_url` or `schema.fields`
- **Indirect Impact**: Downstream functionality that relies on correct
catalog names (such as multi-catalog writes)
- **Affected Area**: All file connectors
**Severity**: CRITICAL
**Improvement Suggestions**:
```java
// Solution 1: Read the catalog name from configuration (recommended)
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    // Fall back to factoryIdentifier() when no catalog name is configured
    String catalogName = context.getOptions()
            .getOptional(ConnectorCommonOptions.CATALOG_NAME) // Needs to be added; does not exist yet
            .orElse(factoryIdentifier());
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, catalogName);
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

// Solution 2: Infer the catalog name from the configured table path
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    // If schema.table is a three-part name, treat its first part as the catalog
    String catalogName = factoryIdentifier();
    Optional<String> configuredTable = context.getOptions()
            .getOptional(ConnectorCommonOptions.SCHEMA)
            .map(schema -> schema.get("table")) // Map.get returns a value, not an Optional
            .map(String::valueOf);
    if (configuredTable.isPresent()) {
        String[] parts = configuredTable.get().split("\\.");
        if (parts.length == 3) {
            catalogName = parts[0]; // Use the configured catalog
        }
    }
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, catalogName);
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}

// Solution 3: Do not use factoryIdentifier(); use a fixed default
default List<CatalogTable> discoverTableSchemas(TableSourceFactoryContext context) {
    final TableSchemaDiscoverer metaLakeSchemaDiscoverer =
            new TableSchemaDiscoverer(context, "default"); // "default" as the catalog name
    return metaLakeSchemaDiscoverer.discoverTableSchemas();
}
```
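The inference step of Solution 2 can be isolated and tested on its own. The sketch below is dependency-free; `CatalogNameResolver.resolve` is a hypothetical helper and, like Solution 2, it assumes that the first segment of a three-part `table` value names the catalog. One caveat worth checking: SeaTunnel's `TablePath` has database, schema and table components, so a three-part name may already be read as database.schema.table elsewhere, and the catalog convention would need to be agreed on before adopting Solution 2.

```java
import java.util.Arrays;

/** Hypothetical helper mirroring the catalog-name inference of Solution 2. */
final class CatalogNameResolver {
    private CatalogNameResolver() {}

    /**
     * Returns the first segment of a three-part value such as "prod.db.table1";
     * otherwise (null, blank, other shapes) returns the fallback, for example
     * factoryIdentifier() or "default".
     */
    static String resolve(String configuredTable, String fallback) {
        if (configuredTable == null || configuredTable.trim().isEmpty()) {
            return fallback;
        }
        // Limit -1 keeps empty segments, so malformed values like "a.b." or "a..b" fall back
        String[] parts = configuredTable.trim().split("\\.", -1);
        boolean threeNonEmptyParts =
                parts.length == 3 && Arrays.stream(parts).noneMatch(String::isEmpty);
        return threeNonEmptyParts ? parts[0] : fallback;
    }
}
```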
**Rationale**:
- Option 1 is most flexible, allowing users to customize catalog names
- Option 2 has best compatibility, inferring catalog from existing
configuration
- Option 3 is simplest and aligns with most users' expectations (using
"default" catalog)
---
### Issue 3: HTTP Retry Mechanism Lacks Backoff Strategy and Exception Classification
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:95-118`
```java
private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;
    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
        try (CloseableHttpResponse response = getHttpClient().execute(request)) {
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
        }
    }
    throw lastException;
}
```
**Related Context**:
- Constant definitions: `MAX_RETRY_ATTEMPTS = 2` (line 50)
- Caller: `getTableSchema()` (line 69)
**Issue Description**:
The current retry mechanism has the following issues:
1. **No delay**: Retries are executed immediately, which may cause a
thundering herd effect
2. **No exception classification**: All IOExceptions are retried, but
certain exceptions should not be retried (such as DNS resolution failures)
3. **Missing exponential backoff**: Fixed number of retries, no delay
adjustment based on failure count
4. **Exception context loss**: The final thrown exception does not include
URL and retry count information
**Potential Risks**:
- **Risk 1**: When the Gravitino service is temporarily unavailable, rapid
retries may exacerbate service pressure
- **Risk 2**: Exceptions such as DNS resolution failures and connection
refusals should not be retried, but the current code will retry them
- **Risk 3**: Difficult debugging, exception messages lack key information
(such as URL, retry count)
**Impact Scope**:
- **Direct Impact**: All scenarios where metadata is obtained through
`schema_url`
- **Indirect Impact**: Stability of Gravitino service
- **Affected Area**: Core framework
**Severity**: MAJOR
**Improvement Suggestions**:
```java
private JsonNode executeGetRequest(String url) throws IOException {
    IOException lastException = null;
    int attemptsMade = 0;
    long baseDelayMs = 1000; // Base delay of 1 second
    for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
        attemptsMade = attempt;
        HttpGet request = new HttpGet(url);
        request.addHeader(HEADER_ACCEPT, MEDIA_TYPE_GRAVITINO_V1);
        try (CloseableHttpResponse response = getHttpClient().execute(request)) {
            // execute() does not throw for non-2xx responses; surface them so that
            // shouldRetry() can classify the status code (HttpClient 4.x API)
            int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                EntityUtils.consumeQuietly(response.getEntity());
                throw new org.apache.http.client.HttpResponseException(
                        statusCode, "Unexpected HTTP status " + statusCode + " from " + url);
            }
            HttpEntity entity = response.getEntity();
            if (entity == null) {
                throw new RuntimeException(ERROR_NO_RESPONSE_ENTITY);
            }
            try {
                return JsonUtils.readTree(entity.getContent());
            } finally {
                EntityUtils.consume(entity);
            }
        } catch (IOException e) {
            lastException = e;
            // Stop on non-retryable errors or after the last attempt
            if (!shouldRetry(e) || attempt == MAX_RETRY_ATTEMPTS) {
                break;
            }
            // Exponential backoff: 1s, 2s, 4s, ...
            try {
                long delayMs = baseDelayMs * (1L << (attempt - 1));
                Thread.sleep(delayMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("Retry interrupted", ie);
            }
        }
    }
    // Report the attempts actually made (fewer than the maximum for non-retryable errors)
    String errorMsg = String.format(
            "Failed to fetch schema from Gravitino after %d attempt(s). URL: %s. Last error: %s",
            attemptsMade, url, lastException.getMessage());
    throw new IOException(errorMsg, lastException);
}

/**
 * Determine whether the exception should be retried
 */
private boolean shouldRetry(IOException e) {
    // Non-retryable exception types
    if (e instanceof java.net.UnknownHostException) {
        return false; // DNS resolution failed
    }
    if (e instanceof java.net.ConnectException) {
        return false; // Connection refused
    }
    if (e instanceof org.apache.http.client.HttpResponseException) {
        int statusCode = ((org.apache.http.client.HttpResponseException) e).getStatusCode();
        if (statusCode == 400 || statusCode == 401 || statusCode == 404) {
            return false; // Client errors should not be retried
        }
    }
    // Retryable exceptions: timeouts, server errors (5xx)
    return true;
}
```
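**Rationale**:
- Exponential backoff reduces the risk of a thundering herd; adding random jitter spreads retries from many clients further apart
- Exception classification avoids pointless retries
- Detailed exception messages facilitate debugging
- With `MAX_RETRY_ATTEMPTS = 2`, the backoff amounts to a single 1-second wait before the second attempt
- Aligns with best practices for network request retries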
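
The retry policy can be exercised without an HTTP client or real sleeping. The class below is a minimal sketch, not SeaTunnel code: `RetryingCaller`, `IOCall` and `Sleeper` are hypothetical names, and the injectable `Sleeper` exists only so that tests do not wait (in production, `Thread::sleep` fits it). It mirrors the `UnknownHostException`/`ConnectException` rules of `shouldRetry`, and, unlike the suggestion above, it adds "full jitter" (a random delay between zero and the exponential ceiling), a common technique for de-synchronizing retries from many clients.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical retry helper (not a SeaTunnel or HttpClient API). */
final class RetryingCaller {
    /** An I/O operation that may be retried. */
    @FunctionalInterface
    interface IOCall<T> {
        T call() throws IOException;
    }

    /** Pause strategy: Thread::sleep in production, a recorder in tests. */
    @FunctionalInterface
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final int maxAttempts;
    private final long baseDelayMs;
    private final long maxDelayMs;
    private final Sleeper sleeper;

    RetryingCaller(int maxAttempts, long baseDelayMs, long maxDelayMs, Sleeper sleeper) {
        if (maxAttempts < 1 || baseDelayMs < 0 || maxDelayMs < baseDelayMs) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.sleeper = sleeper;
    }

    /** Ceiling after failed attempt n (1-based): base * 2^(n-1), capped at maxDelayMs. */
    long backoffCeilingMs(int failedAttempt) {
        int shift = Math.min(failedAttempt - 1, 30); // Cap the shift to avoid overflow
        return Math.min(maxDelayMs, baseDelayMs << shift);
    }

    /** Same non-retryable types as shouldRetry(): DNS failure and refused connection. */
    static boolean isRetryable(IOException e) {
        return !(e instanceof UnknownHostException) && !(e instanceof ConnectException);
    }

    <T> T call(String description, IOCall<T> operation) throws IOException {
        IOException last = null;
        int attempt = 1;
        for (; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (IOException e) {
                last = e;
                if (!isRetryable(e) || attempt == maxAttempts) {
                    break;
                }
                // Full jitter: wait a random time in [0, ceiling] before the next attempt
                long delayMs = ThreadLocalRandom.current().nextLong(backoffCeilingMs(attempt) + 1);
                try {
                    sleeper.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Retry interrupted: " + description, ie);
                }
            }
        }
        throw new IOException(String.format("%s failed after %d attempt(s): %s",
                description, attempt, last.getMessage()), last);
    }
}
```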
---
### Issue 4: Environment Variable Resolution Lacks Error Handling
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:165-169`
```java
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    return MetaLakeType.valueOf(
            System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()));
}
```
**Related Context**:
- Enum definition: `MetaLakeType.java` (only one value: `GRAVITINO`)
- Configuration option definition: `EnvCommonOptions.METALAKE_TYPE`
**Issue Description**:
When the environment variable `METALAKE_TYPE` is set to an invalid value (such as `invalid`), `MetaLakeType.valueOf()` throws `IllegalArgumentException`, causing job submission to fail. The error message is unfriendly and does not tell users which values are accepted. Because `valueOf()` is case-sensitive, even the lowercase `gravitino` is rejected; only the exact constant name `GRAVITINO` works.
**Potential Risks**:
- **Risk 1**: Users receive obscure error messages (`No enum constant MetaLakeType.invalid`) when the configuration is incorrect
- **Risk 2**: No hint is given, so users do not know that the valid value is `GRAVITINO`
**Impact Scope**:
- **Direct Impact**: Users using environment variable to configure
`METALAKE_TYPE`
- **Indirect Impact**: Difficult for operations personnel to troubleshoot
- **Affected Area**: All scenarios using metadata service
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Solution 1: Use try-catch to provide a friendly error message
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr =
            System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    try {
        return MetaLakeType.valueOf(metalakeTypeStr);
    } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException(
                String.format(
                        "Invalid metalake type '%s'. Supported values are: %s. "
                                + "Please check your environment variable '%s' or configuration.",
                        metalakeTypeStr,
                        Arrays.stream(MetaLakeType.values())
                                .map(Enum::name)
                                .collect(Collectors.joining(", ")),
                        EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()),
                e);
    }
}

// Solution 2: Ignore invalid values, fall back to the default and log a warning (more lenient)
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr =
            System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    try {
        return MetaLakeType.valueOf(metalakeTypeStr);
    } catch (IllegalArgumentException e) {
        log.warn(
                "Invalid metalake type '{}' from environment variable '{}'. "
                        + "Using default value '{}'. Supported values are: {}",
                metalakeTypeStr,
                EnvCommonOptions.METALAKE_TYPE.key().toUpperCase(),
                MetaLakeType.GRAVITINO.getType(),
                Arrays.stream(MetaLakeType.values())
                        .map(Enum::name)
                        .collect(Collectors.joining(", ")));
        return MetaLakeType.GRAVITINO;
    }
}

// Solution 3: Use case-insensitive matching (most robust)
if (StringUtils.isNotEmpty(
        System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase()))) {
    String metalakeTypeStr =
            System.getenv(EnvCommonOptions.METALAKE_TYPE.key().toUpperCase());
    for (MetaLakeType type : MetaLakeType.values()) {
        if (type.name().equalsIgnoreCase(metalakeTypeStr)) {
            return type;
        }
    }
    // No matching type found
    throw new IllegalArgumentException(
            String.format(
                    "Invalid metalake type '%s'. Supported values are: %s",
                    metalakeTypeStr,
                    Arrays.stream(MetaLakeType.values())
                            .map(Enum::name)
                            .collect(Collectors.joining(", "))));
}
```
**Rationale**:
- Option 1 provides most detailed error information, helping users quickly
locate issues
- Option 2 is most lenient but may hide configuration errors
- Option 3 is most robust, supporting case-insensitive input
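
Options 1 and 3 combine naturally: accept any casing, and fail with a message that lists the supported values and names where the bad value came from. The sketch below is illustrative only: `MetaLakeType` is a local stand-in for the real enum (which, per this review, has only `GRAVITINO`), `MetaLakeTypeParser.parse` is a hypothetical helper, and trimming surrounding whitespace is an extra assumption.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Local stand-in for SeaTunnel's MetaLakeType, which per this review has only GRAVITINO. */
enum MetaLakeType {
    GRAVITINO
}

/** Hypothetical helper combining Solutions 1 and 3. */
final class MetaLakeTypeParser {
    private MetaLakeTypeParser() {}

    /**
     * Parses a metalake type case-insensitively, ignoring surrounding whitespace.
     * sourceName describes where the value came from, e.g. the environment variable.
     */
    static MetaLakeType parse(String rawValue, String sourceName) {
        String value = rawValue == null ? "" : rawValue.trim();
        for (MetaLakeType type : MetaLakeType.values()) {
            if (type.name().equalsIgnoreCase(value)) {
                return type;
            }
        }
        String supported = Arrays.stream(MetaLakeType.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
        throw new IllegalArgumentException(String.format(
                "Invalid metalake type '%s' in %s. Supported values (case-insensitive): %s",
                rawValue, sourceName, supported));
    }
}
```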
---
### Issue 5: Exception Handling Lacks Context Information
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/TableSchemaDiscoverer.java:125-140`
```java
private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
    try {
        JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
        final TablePath tableSchemaPath;
        if (StringUtils.isNotEmpty(configTablePath)) {
            tableSchemaPath = TablePath.of(configTablePath);
        } else {
            tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
        }
        final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
        return metaLakeTableSchemaConvertor.buildCatalogTable(
                catalogName, tableSchemaPath, tableSchema);
    } catch (IOException e) {
        throw new SeaTunnelRuntimeException(GET_META_LAKE_TABLE_SCHEMA_FAILED, e);
    }
}
```
**Related Context**:
- Error code definition:
`SchemaEvolutionErrorCode.GET_META_LAKE_TABLE_SCHEMA_FAILED`
- Exception message: `"Get meta lake table schema failed"`
**Issue Description**:
When fetching schema from Gravitino fails, the thrown
`SeaTunnelRuntimeException` only contains the original `IOException`, lacking
key context information:
1. `schemaUrl` - User-configured URL
2. `tablePath` - Parsed table path
3. `catalogName` - Catalog name
This makes debugging difficult, and users cannot quickly locate which table
and which URL have the problem.
**Potential Risks**:
- **Risk 1**: In production environments, if multiple tables use
`schema_url`, it is impossible to quickly locate which table failed from logs
- **Risk 2**: Operations personnel need to manually check configuration
files to determine the failed URL
**Impact Scope**:
- **Direct Impact**: All scenarios using `schema_url`
- **Indirect Impact**: Troubleshooting efficiency
- **Affected Area**: All file connectors
**Severity**: MINOR
**Improvement Suggestions**:
```java
private CatalogTable discoverTableSchemaFromMetaLake(String schemaUrl, String configTablePath) {
    try {
        JsonNode schemaNode = metalakeClient.getTableSchema(schemaUrl);
        final TablePath tableSchemaPath;
        if (StringUtils.isNotEmpty(configTablePath)) {
            tableSchemaPath = TablePath.of(configTablePath);
        } else {
            tableSchemaPath = metalakeClient.getTableSchemaPath(schemaUrl);
        }
        final TableSchema tableSchema = metaLakeTableSchemaConvertor.convertor(schemaNode);
        return metaLakeTableSchemaConvertor.buildCatalogTable(
                catalogName, tableSchemaPath, tableSchema);
    } catch (IOException e) {
        // Construct a detailed error message
        String errorMsg = String.format(
                "Failed to get table schema from MetaLake. "
                        + "Schema URL: %s, "
                        + "Configured table path: %s, "
                        + "Catalog name: %s, "
                        + "Error: %s",
                schemaUrl,
                configTablePath != null ? configTablePath : "not configured",
                catalogName,
                e.getMessage());
        throw new SeaTunnelRuntimeException(
                GET_META_LAKE_TABLE_SCHEMA_FAILED, new IOException(errorMsg, e));
    }
}
```
**Rationale**:
- Include all key context information in exception messages
- Preserve original exception as cause for viewing complete stack trace
- Aligns with exception handling best practices (fail-fast with context)
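
When several call sites need the same context fields, a small builder keeps the messages consistent. The sketch below is illustrative only: `ErrorContext` is a hypothetical helper, not a SeaTunnel API, and it renders missing values as "not configured", as the suggestion above does. The resulting `IOException` could then be passed to `SeaTunnelRuntimeException` as the cause.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: collects labelled context and wraps an IOException with it. */
final class ErrorContext {
    // LinkedHashMap keeps fields in the order they were added
    private final Map<String, Object> fields = new LinkedHashMap<>();

    /** Adds a field; null values are rendered as "not configured". */
    ErrorContext with(String label, Object value) {
        fields.put(label, value == null ? "not configured" : value);
        return this;
    }

    /** Builds "summary label: value, ..., Error: cause message". */
    String describe(String summary, Throwable cause) {
        StringJoiner parts = new StringJoiner(", ");
        fields.forEach((label, value) -> parts.add(label + ": " + value));
        parts.add("Error: " + (cause == null ? "unknown" : cause.getMessage()));
        return summary + " " + parts;
    }

    /** Returns a new IOException carrying the context, with the original as its cause. */
    IOException wrap(String summary, IOException cause) {
        return new IOException(describe(summary, cause), cause);
    }
}
```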
---
### Issue 6: BaseFileSourceConfig Constructor Signature Change Causes a Breaking Change
**Location**:
`seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java:57-60`
```java
// Before
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig)
// Now
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig)
```
**Related Context**:
- Parent class: `BaseFileSourceConfig` (abstract class)
- Subclasses: `BaseFileSourceConfig` implementations of 6 file connectors
- Caller: `BaseMultipleTableFileSourceConfig`
**Issue Description**:
This is a breaking change that will cause all external code inheriting
`BaseFileSourceConfig` to fail compilation. Although all internal subclasses in
the current PR have been updated, there is no:
1. Deprecation warning (`@Deprecated`)
2. Migration guide
3. Compatible transition solution
**Potential Risks**:
- **Risk 1**: If users have customized file connectors inheriting
`BaseFileSourceConfig`, code will fail to compile
- **Risk 2**: After upgrading to a new version, users need to modify code to
compile successfully
**Impact Scope**:
- **Direct Impact**: All external code inheriting `BaseFileSourceConfig`
- **Indirect Impact**: Upgrade compatibility
- **Affected Area**: All file connectors
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Solution 1: Keep the old constructor and mark it as deprecated (recommended)
@Deprecated
public BaseFileSourceConfig(ReadonlyConfig readonlyConfig) {
    this(readonlyConfig, null); // Delegate to the new constructor
    log.warn(
            "BaseFileSourceConfig(ReadonlyConfig) is deprecated and will be removed in future versions. "
                    + "Please use BaseFileSourceConfig(ReadonlyConfig, CatalogTable) instead.");
}

public BaseFileSourceConfig(ReadonlyConfig readonlyConfig, CatalogTable catalogTableFromConfig) {
    this.baseFileSourceConfig = readonlyConfig;
    this.fileFormat = readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
    this.readStrategy = ReadStrategyFactory.of(readonlyConfig, getHadoopConfig());
    this.filePaths = parseFilePaths(readonlyConfig);
    this.catalogTableFromConfig = catalogTableFromConfig;
    this.catalogTable = parseCatalogTable(readonlyConfig);
}

// Solution 2: Use the Builder pattern (more thorough)
public abstract class BaseFileSourceConfig implements Serializable {
    // ... field definitions

    protected BaseFileSourceConfig(Builder<?> builder) {
        this.baseFileSourceConfig = builder.readonlyConfig;
        this.fileFormat = builder.fileFormat;
        this.readStrategy = builder.readStrategy;
        this.filePaths = builder.filePaths;
        this.catalogTableFromConfig = builder.catalogTableFromConfig;
        this.catalogTable = parseCatalogTable(builder.readonlyConfig);
    }

    protected abstract static class Builder<T extends Builder<T>> {
        private ReadonlyConfig readonlyConfig;
        private FileFormat fileFormat;
        private ReadStrategy readStrategy;
        private List<String> filePaths;
        private CatalogTable catalogTableFromConfig;

        public T readonlyConfig(ReadonlyConfig readonlyConfig) {
            this.readonlyConfig = readonlyConfig;
            return self();
        }

        public T catalogTableFromConfig(CatalogTable catalogTableFromConfig) {
            this.catalogTableFromConfig = catalogTableFromConfig;
            return self();
        }

        // ... setters for fileFormat, readStrategy and filePaths follow the same pattern

        protected abstract T self();

        public abstract BaseFileSourceConfig build();
    }
}
```
**Rationale**:
- Option 1 provides backward compatibility, giving users time to migrate
- Option 2 is more thorough, using Builder pattern to avoid the problem of
too many constructor parameters
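
The compatibility argument behind Option 1 can be checked with a toy hierarchy. In the sketch below, `FileSourceConfigSketch`, `LegacyConnectorConfig` and `CurrentConnectorConfig` are hypothetical stand-ins, and a `Map` and a `String` replace `ReadonlyConfig` and `CatalogTable` so that it compiles without SeaTunnel. A subclass written against the old one-argument constructor still compiles (typically with a deprecation warning), while migrated code passes the catalog table explicitly.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical, simplified stand-in for BaseFileSourceConfig. */
abstract class FileSourceConfigSketch {
    private final Map<String, Object> options; // Stands in for ReadonlyConfig
    private final String catalogTableFromConfig; // Stands in for CatalogTable; may be null

    /** Old signature, kept so that existing subclasses keep compiling. */
    @Deprecated
    protected FileSourceConfigSketch(Map<String, Object> options) {
        this(options, null); // Delegate: no catalog table from config
    }

    protected FileSourceConfigSketch(Map<String, Object> options, String catalogTableFromConfig) {
        this.options = Collections.unmodifiableMap(options);
        this.catalogTableFromConfig = catalogTableFromConfig;
    }

    Object option(String key) {
        return options.get(key);
    }

    boolean hasCatalogTableFromConfig() {
        return catalogTableFromConfig != null;
    }
}

/** An external subclass written before the change; it still compiles. */
final class LegacyConnectorConfig extends FileSourceConfigSketch {
    LegacyConnectorConfig(Map<String, Object> options) {
        super(options);
    }
}

/** A subclass migrated to the new constructor. */
final class CurrentConnectorConfig extends FileSourceConfigSketch {
    CurrentConnectorConfig(Map<String, Object> options, String catalogTable) {
        super(options, catalogTable);
    }
}
```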
---
### Issue 7: GravitinoClient.getTableSchemaPath Regex Parsing Lacks Error Handling
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:74-86`
```java
@Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
    Matcher matcher = TABLE_URL_PATTERN.matcher(schemaHttpUrl);
    if (!matcher.find()) {
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL,
                "Invalid table URL format, expected: /catalogs/{catalog}/schemas/{schema}/tables/{table}");
    }
    String catalogName = matcher.group(1);
    String schemaName = matcher.group(2);
    String tableName = matcher.group(3);
    return TablePath.of(catalogName, schemaName, tableName);
}
```
**Related Context**:
- Regex definition: `TABLE_URL_PATTERN =
Pattern.compile("/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)")`
- Error code: `ERROR_INVALID_TABLE_URL`
**Issue Description**:
When the URL format is incorrect, the thrown exception message lacks the
actual URL, making it impossible for users to know which URL has the wrong
format. Additionally, `matcher.find()` only finds the first match. If the URL
contains multiple substrings that meet the condition, incorrect results may be
parsed.
**Potential Risks**:
- **Risk 1**: Exception message does not contain the actual URL, making
debugging difficult
- **Risk 2**: `matcher.find()` may match the wrong location (although
probability is very low)
**Impact Scope**:
- **Direct Impact**: All scenarios using `schema_url` without configuring
`table` property
- **Indirect Impact**: Troubleshooting efficiency
- **Affected Area**: All file connectors
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Variant of TABLE_URL_PATTERN for matches(): matches() must consume the whole URL,
// so the pattern needs a ".*" prefix and must end at the table segment
private static final Pattern FULL_TABLE_URL_PATTERN =
        Pattern.compile(".*/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)/?");

@Override
public TablePath getTableSchemaPath(String schemaHttpUrl) {
    if (schemaHttpUrl == null || schemaHttpUrl.isEmpty()) {
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL, "Table URL cannot be null or empty");
    }
    // Use matches() instead of find(); with the original TABLE_URL_PATTERN,
    // matches() would reject every full http://... URL
    Matcher matcher = FULL_TABLE_URL_PATTERN.matcher(schemaHttpUrl);
    if (!matcher.matches()) {
        throw new SeaTunnelRuntimeException(
                ERROR_INVALID_TABLE_URL,
                String.format(
                        "Invalid table URL format: '%s'. Expected format: "
                                + "http://host/api/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/tables/{table}",
                        schemaHttpUrl));
    }
    String catalogName = matcher.group(1);
    String schemaName = matcher.group(2);
    String tableName = matcher.group(3);
    return TablePath.of(catalogName, schemaName, tableName);
}
```
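**Rationale**:
- Use `matches()` with a pattern that covers the whole URL, so the table path must form the end of the URL instead of matching anywhere inside it (the original `TABLE_URL_PATTERN` starts at `/catalogs/` and would never match a full URL under `matches()`)
- Include the actual URL in exception messages
- Add null checks to provide more friendly error messages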
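
Because `Matcher.matches()` must consume the entire input, switching from `find()` to `matches()` also requires a pattern that covers the URL prefix. The standalone sketch below checks both behaviours; `TableUrlParser` and its `STRICT` pattern are hypothetical names, and it throws `IllegalArgumentException` instead of `SeaTunnelRuntimeException` to stay dependency-free. Keeping `find()` with an end-anchored pattern (ending in `/?$`) would be an equivalent alternative.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser used to compare find() and matches() on Gravitino table URLs. */
final class TableUrlParser {
    /** Pattern quoted in this review; it only works with find(). */
    static final Pattern ORIGINAL =
            Pattern.compile("/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)");

    /** Stricter pattern for matches(): any prefix, then the table path as the final segments. */
    static final Pattern STRICT =
            Pattern.compile(".*/catalogs/([^/]+)/schemas/([^/]+)/tables/([^/]+)/?");

    private TableUrlParser() {}

    /** Returns {catalog, schema, table}, or throws with the offending URL in the message. */
    static String[] parse(String url) {
        if (url == null || url.isEmpty()) {
            throw new IllegalArgumentException("Table URL cannot be null or empty");
        }
        Matcher matcher = STRICT.matcher(url);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid table URL format: '" + url + "'");
        }
        return new String[] {matcher.group(1), matcher.group(2), matcher.group(3)};
    }
}
```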
---
### Issue 8: HttpClient Connections May Become Invalid in Long-Running Processes
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:141-150`
```java
private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                httpClient = HttpClients.createDefault();
            }
        }
    }
    return httpClient;
}
```
**Issue Description**:
Although `CloseableHttpClient` is thread-safe, the following scenarios may cause issues:
1. After the Gravitino service restarts, pooled connections in HttpClient may have become invalid
2. After DNS resolution changes, pooled connections may still point to the old IP address
3. In long-running processes, the connection pool may hold connections that the server has already closed (for example, idle connections)
The client created by `HttpClients.createDefault()` uses the default connection manager, which has some built-in stale-connection handling, but it relies on default timeouts and does not evict idle connections unless configured to do so.
**Potential Risks**:
- **Risk 1**: After Gravitino service restarts, requests in a short period
may fail (until the connection manager detects connection invalidation)
- **Risk 2**: Long-running processes may accumulate many invalid connections
**Impact Scope**:
- **Direct Impact**: Long-running SeaTunnel jobs
- **Indirect Impact**: Gravitino service operations
- **Affected Area**: All scenarios using `schema_url`
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Assumes Apache HttpClient 4.x (org.apache.http.*), consistent with the
// HttpResponseException used in Issue 3
private CloseableHttpClient getHttpClient() {
    if (httpClient == null) {
        synchronized (GravitinoClient.class) {
            if (httpClient == null) {
                // Bounded connection pool
                PoolingHttpClientConnectionManager connectionManager =
                        new PoolingHttpClientConnectionManager();
                connectionManager.setMaxTotal(20); // Maximum number of connections
                connectionManager.setDefaultMaxPerRoute(10); // Maximum connections per route
                // Explicit timeouts, in milliseconds
                RequestConfig requestConfig = RequestConfig.custom()
                        .setConnectTimeout(5000) // Connection timeout of 5 seconds
                        .setConnectionRequestTimeout(5000) // Timeout for leasing a connection from the pool
                        .build();
                httpClient = HttpClients.custom()
                        .setConnectionManager(connectionManager)
                        .setDefaultRequestConfig(requestConfig)
                        // Background thread closes connections idle for more than 30 seconds
                        .evictIdleConnections(30, TimeUnit.SECONDS)
                        .build();
                // Register JVM shutdown hook
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        if (httpClient != null) {
                            httpClient.close();
                        }
                    } catch (IOException e) {
                        // Ignore exceptions during JVM shutdown
                    }
                }));
            }
        }
    }
    return httpClient;
}
```
**Rationale**:
- Configure connection pool parameters to avoid resource exhaustion
- Periodically clean up idle connections to avoid using invalid connections
- Set reasonable timeout values
- Add shutdown hook to resolve resource leak issues
---
--
This is an automated message from the Apache Git Service.