DanielCarter-stack commented on PR #10468:
URL: https://github.com/apache/seatunnel/pull/10468#issuecomment-3868884107
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10468", "part": 1,
"total": 1} -->
### Issue 1: Feature not integrated into data reading flow
**Location**:
- Main file: `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyInput.java`
- Related files:
  - `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java`
  - `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.java`
**Issue Description**:
The `PgCopyInput` class and its related COPY functionality are fully
implemented, but **not integrated into the actual data reading flow**. Code
search reveals:
- `JdbcInputFormat` still uses the traditional `PreparedStatement` +
`ResultSet` approach to read data
- `PgCopyInput` has no callers
- Newly added configuration items `use_copy_statement`, `binary`,
`pg_copy_buffer_size` are not being used
**Potential Risks**:
- **Feature unavailable**: Users configuring `use_copy_statement=true` will
see no effect
- **Insufficient testing**: Lack of integration testing makes it impossible
to verify the feature works correctly
- **Misleading documentation**: Users following the documentation will not
achieve expected results
**Impact Scope**:
- **Direct impact**: PostgreSQL JDBC Source users
- **Indirect impact**: Large-volume data synchronization tasks relying on
performance improvements
- **Affected area**: Single Connector (JDBC PostgreSQL Source)
**Severity**: **BLOCKER**
**Improvement Suggestions**:
Branch logic needs to be added in `JdbcInputFormat` or `JdbcSourceReader` to
choose between traditional approach or COPY approach based on configuration:
```java
// JdbcInputFormat.java
public void open(JdbcSourceSplit inputSplit) throws IOException {
    try {
        splitTableSchema = tables.get(inputSplit.getTablePath()).getTableSchema();
        splitTableId = inputSplit.getTablePath().toString();
        // Added: choose the reading method based on configuration
        if (config.isUseCopyStatement() && isPostgreSQLDialect()) {
            // COPY method
            PgCopyInput copyInput =
                    new PgCopyInput(config, jdbcDialect, chunkSplitter, splitTableSchema, splitTableId);
            copyInput.open(inputSplit);
            // Use PgCopyInput to read data...
        } else {
            // Traditional method
            statement = chunkSplitter.generateSplitStatement(inputSplit, splitTableSchema);
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
        }
    } catch (SQLException se) {
        throw new JdbcConnectorException(...);
    }
}

private boolean isPostgreSQLDialect() {
    return jdbcDialect instanceof PostgresDialect
            || jdbcDialect.dialectName().toLowerCase().contains("postgres");
}
```
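The record-reading path would need a matching branch as well. A minimal sketch, assuming `PgCopyInput` exposes `hasNext()`/`next()` returning `SeaTunnelRow` (analogous to `PgCopyReader`) and that `copyInput` is kept as a field set in `open()`; the placeholder call on the else branch stands for whatever `ResultSet`-to-row conversion the method performs today:
```java
// JdbcInputFormat.java -- illustrative sketch only
public SeaTunnelRow nextRecord() throws IOException {
    try {
        SeaTunnelRow row;
        if (copyInput != null) {
            row = copyInput.next();             // row already decoded by the COPY reader
            hasNext = copyInput.hasNext();
        } else {
            row = convertCurrentRow(resultSet); // placeholder for the existing conversion logic
            hasNext = resultSet.next();
        }
        row.setTableId(splitTableId);
        return row;
    } catch (SQLException se) {
        throw new IOException("Couldn't read data from split " + splitTableId, se);
    }
}
```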
**Rationale**: Only by integrating `PgCopyInput` into the data reading flow
can the feature be truly usable. This is a blocking issue that must be fixed
before merge.
---
### Issue 2: Thread safety issues caused by static variables
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyBinaryReader.java:89-90`
```java
private static int BUFFER_SIZE;
private static int MAX_BUFFER_SIZE;
```
**Related Context**:
- Constructor: `PgCopyBinaryReader.java:114-129`
- Initialization logic: `PgCopyBinaryReader.java:118-127`
**Issue Description**:
`BUFFER_SIZE` and `MAX_BUFFER_SIZE` are declared as `static`, but they are
initialized in the constructor based on the passed `pgCopyBufferSize`
parameter. This leads to:
1. **Multi-threading race condition**: When multiple threads create
`PgCopyBinaryReader` instances simultaneously, static variables will be
repeatedly overwritten
2. **Configuration inconsistency**: Different instances may be configured
with different buffer sizes, but the actual value used is the last set value
3. **Memory waste**: All instances share the same `MAX_BUFFER_SIZE`, which
may cause some instances to use excessively large buffers
**Potential Risks**:
- **Data race**: Multiple threads modifying static variables simultaneously
may cause `buffer.capacity()` and `MAX_BUFFER_SIZE` to be inconsistent
- **Memory overflow**: If one instance is configured with a 1GB buffer size, every instance ends up sharing a 1TB maximum-buffer ceiling and may grow its buffer toward it
- **Performance degradation**: Buffer size configuration becomes
ineffective, all instances use the same buffer size
**Impact Scope**:
- **Direct impact**: All concurrent instances of `PgCopyBinaryReader`
- **Indirect impact**: All PostgreSQL Source tasks using COPY Binary mode
- **Affected area**: PostgreSQL JDBC Source (Binary mode)
**Severity**: **CRITICAL**
**Improvement Suggestions**:
Change static variables to instance variables:
```java
// PgCopyBinaryReader.java
public final class PgCopyBinaryReader implements PgCopyReader {
    // Remove the static modifier
    private final int bufferSize;
    private final int maxBufferSize;
    private ByteBuffer buffer;

    public PgCopyBinaryReader(InputStream stream, TableSchema schema, Integer pgCopyBufferSize) {
        this.stream = stream;
        this.rowType = schema.toPhysicalRowDataType();
        this.fieldTypes = rowType.getFieldTypes();
        // Calculate and assign to instance fields
        this.bufferSize =
                pgCopyBufferSize == null
                        ? DEFAULT_BUFFER_SIZE
                        : 1 << (32 - Integer.numberOfLeadingZeros(pgCopyBufferSize - 1));
        this.maxBufferSize = bufferSize * 1024;
        this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.BIG_ENDIAN);
    }

    private void ensureCapacityFor(int required) {
        if (required <= buffer.capacity()) {
            return;
        }
        if (required > maxBufferSize) { // use the instance field
            throw new JdbcConnectorException(...);
        }
        // ...
    }
}
```
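As a side note on the expression above: `1 << (32 - Integer.numberOfLeadingZeros(n - 1))` rounds the requested size up to the next power of two. A standalone snippet (not part of the connector) that verifies this behaviour:
```java
// Standalone check of the power-of-two rounding used for the buffer size.
public class BufferSizeRoundingDemo {
    static int roundUpToPowerOfTwo(int n) {
        // Valid for 1 <= n <= 2^30; larger values would overflow the shift.
        return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
    }

    public static void main(String[] args) {
        System.out.println(roundUpToPowerOfTwo(65_536));    // 65536  (already a power of two)
        System.out.println(roundUpToPowerOfTwo(100_000));   // 131072 (rounded up)
        System.out.println(roundUpToPowerOfTwo(1_048_576)); // 1048576
    }
}
```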
**Rationale**: Each instance should have independent buffer configuration.
Static variables will cause serious concurrency issues. This is a blocking
issue that must be fixed.
---
### Issue 3: Documentation and code default values are inconsistent
**Location**:
- Documentation: `docs/en/connectors/source/Jdbc.md` and `docs/zh/connectors/source/Jdbc.md`
- Code: `seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java:169-173`
```java
// Default value in code
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(65536) // 64KB
.withDescription("Postgres copy buffer size");
```
**Issue Description**:
The documentation claims the default value of `pg_copy_buffer_size` is
`1048576` (1MB), but the actual default value in code is `65536` (64KB), a
16-fold difference.
**Potential Risks**:
- **Configuration misleading**: Users may rely on the default value in
documentation and not explicitly configure this parameter, resulting in actual
performance not meeting expectations
- **Troubleshooting difficulties**: When users encounter performance issues,
they will understand based on documentation that buffer size is 1MB, leading to
incorrect troubleshooting direction
**Impact Scope**:
- **Direct impact**: Users using COPY functionality
- **Affected area**: PostgreSQL JDBC Source
**Severity**: **MAJOR**
**Improvement Suggestions**:
Unify the default values in documentation and code. Based on PostgreSQL COPY
best practices, recommend using 1MB as the default value:
```java
// JdbcSourceOptions.java
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
        Options.key("pg_copy_buffer_size")
                .intType()
                .defaultValue(1048576) // change to 1MB, consistent with the documentation
                .withDescription(
                        "Postgres copy buffer size (bytes). Only takes effect when use_copy_statement=true.");
```
Or, if 64KB is the more appropriate default value, update the documentation:
```markdown
| pg_copy_buffer_size | Int | No | 65536 | Buffer size for COPY reading (bytes). |
```
**Rationale**: Inconsistency between documentation and code will cause user
confusion and configuration errors. Must be unified before merging.
---
### Issue 4: SQL injection risk
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyInput.java:76-79`
```java
String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
```
**Related Context**:
- Caller: `PgCopyInput.open()` method
- SQL generation: `ChunkSplitter.generateSplitQuerySQL()`
- Log output: `PgCopyInput.java:82`
**Issue Description**:
`PgCopyInput` directly uses string concatenation to build COPY SQL without
any validation or escaping of `selectSql`. Although `generateSplitQuerySQL()`
is controlled internally by SeaTunnel, if:
1. `split.getSplitQuery()` contains user-provided raw SQL
2. `tablePath` or table names contain special characters (such as
semicolons, comment symbols)
3. Future extensions support dynamic query construction
Then SQL injection may occur.
**Potential Risks**:
- **SQL injection**: Malicious users can inject arbitrary SQL commands by
constructing special table names or query conditions
- **Data leakage**: Injected SQL may read data from other tables
- **Privilege escalation**: Using COPY command to write files may lead to
server file leakage
**Impact Scope**:
- **Direct impact**: All PostgreSQL Source tasks using COPY functionality
- **Affected area**: PostgreSQL JDBC Source
**Severity**: **CRITICAL**
**Improvement Suggestions**:
1. **Input validation**: Perform basic validation on `selectSql` to ensure
it does not contain dangerous SQL keywords or structures:
```java
private void validateSqlSafety(String sql) {
    String upperSql = sql.toUpperCase();
    // Check for dangerous keyword combinations
    if (upperSql.contains(";")
            || upperSql.contains("--")
            || upperSql.contains("/*")
            || upperSql.matches(".*\\b(DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE)\\b.*")) {
        throw new JdbcConnectorException(
                CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                "Potentially unsafe SQL detected: " + sql);
    }
}

public void open(JdbcSourceSplit split) {
    String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
    validateSqlSafety(selectSql); // add validation
    String copySql = String.format(
            "COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
    // ...
}
```
2. **Escape and sanitize identifiers**: The COPY command cannot use `PreparedStatement` parameters, so escape identifiers with `quoteIdentifier()` where possible and, at a minimum, strip statement separators:
```java
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s",
selectSql.replaceAll(";", ""), // Remove semicolon
useBinary ? "BINARY" : "CSV");
```
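A small JUnit 5 test could pin down the intended behaviour of the check sketched in suggestion 1. This is hypothetical: it assumes the validation is extracted into a static, package-visible helper (named `PgCopyUtils.validateSqlSafety` here purely for illustration):
```java
// Hypothetical test for the SQL-safety check; the static helper location is an assumption.
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.junit.jupiter.api.Test;

public class PgCopySqlSafetyTest {

    @Test
    public void rejectsStatementSeparatorsAndComments() {
        assertThrows(
                JdbcConnectorException.class,
                () -> PgCopyUtils.validateSqlSafety("SELECT * FROM t; DROP TABLE t"));
        assertThrows(
                JdbcConnectorException.class,
                () -> PgCopyUtils.validateSqlSafety("SELECT * FROM t -- trailing comment"));
    }

    @Test
    public void acceptsPlainSplitQueries() {
        assertDoesNotThrow(
                () -> PgCopyUtils.validateSqlSafety(
                        "SELECT id, name FROM test_copy WHERE id BETWEEN 1 AND 100"));
    }
}
```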
**Rationale**: SQL injection is a serious security issue. Although the risk
is low in the current scenario (SQL is generated by SeaTunnel), security
programming best practices should be followed.
---
### Issue 5: ChunkSplitter interface changes affect all subclasses
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java:370-373`
```java
public String generateSplitQuerySQL(JdbcSourceSplit split, TableSchema tableSchema) {
    throw new UnsupportedOperationException("Not supported by this splitter");
}
```
**Related Context**:
- Subclass: `FixedChunkSplitter.java:196-246` (implemented)
- Subclass: `DynamicChunkSplitter` (not implemented)
**Issue Description**:
A new method `generateSplitQuerySQL()` was added to the `ChunkSplitter` base class with a default
implementation that throws `UnsupportedOperationException`, but only `FixedChunkSplitter` overrides
it. When the method is called on a `DynamicChunkSplitter`, it will throw
`UnsupportedOperationException` at runtime.
**Potential Risks**:
- **Runtime exception**: Users configuring dynamic sharding
(`useDynamicSplitter=true`) and enabling COPY will crash at runtime
- **Feature limitation**: COPY functionality is only available in fixed
sharding mode, limiting user choices
- **Inconsistency**: `use_copy_statement` configuration is visible to all
Splitter types, but only FixedChunkSplitter supports it
**Impact Scope**:
- **Direct impact**: PostgreSQL Source tasks using dynamic sharding
- **Indirect impact**: Large-volume scenarios relying on dynamic sharding
- **Affected area**: PostgreSQL JDBC Source (dynamic sharding mode)
**Severity**: **MAJOR**
**Improvement Suggestions**:
There are two solutions:
**Solution A**: Implement the method for `DynamicChunkSplitter`:
```java
// DynamicChunkSplitter.java
@Override
public String generateSplitQuerySQL(JdbcSourceSplit split, TableSchema tableSchema) {
    // Dynamic splitting is typically used for tables without a primary key or with skewed split keys.
    // Use split.query directly as the SELECT SQL.
    if (StringUtils.isNotBlank(split.getSplitQuery())) {
        return split.getSplitQuery();
    }
    // Fall back to a full table scan
    return String.format("SELECT * FROM %s", jdbcDialect.tableIdentifier(split.getTablePath()));
}
```
**Solution B**: Restrict COPY functionality to only support fixed sharding
in configuration:
```java
// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
    // ...
    boolean useCopy = config.get(JdbcSourceOptions.USE_COPY_STATEMENT);
    boolean useDynamic = config.getOptional(JdbcSourceOptions.PARTITION_COLUMN).isEmpty();
    if (useCopy && useDynamic) {
        throw new IllegalArgumentException(
                "use_copy_statement is only supported with fixed partition mode. "
                        + "Please configure partition_column or set use_copy_statement=false.");
    }
    // ...
}
```
**Rationale**: Interface changes need to consider all implementation
classes, otherwise runtime exceptions will occur. It is recommended to
implement at least Solution B before merging, adding configuration validation.
---
### Issue 6: Lack of integration tests and E2E tests
**Location**:
- Unit tests: `seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgNumericDecoderTest.java`
- E2E tests: `seatunnel-e2e/seatunnel-connector-v2-e2e/` (no related tests found)
**Issue Description**:
The PR contains only one unit test file, `PgNumericDecoderTest.java`, which covers the decoding
logic for the NUMERIC type. It lacks:
1. Unit tests for `PgCopyBinaryReader`
2. Unit tests for `PgCopyCsvReader`
3. Integration tests for `PgCopyInput`
4. End-to-end E2E tests (reading data from PostgreSQL to other Sinks)
**Potential Risks**:
- **Feature unverifiable**: Cannot verify whether the feature works
correctly through CI/CD processes
- **Regression risk**: Future code changes may break COPY functionality
without test coverage
- **User unable to reproduce**: PR description states "verified using local
SeaTunnelEngineLocalExample", but no reproducible test script is provided
**Impact Scope**:
- **Direct impact**: Code quality and maintainability
- **Indirect impact**: Stability of future versions
- **Affected area**: Entire JDBC Connector
**Severity**: **MAJOR**
**Improvement Suggestions**:
1. **Add PgCopyReader unit tests**:
```java
// PgCopyBinaryReaderTest.java
@Test
public void testBinaryHeaderParsing() {
    // 11-byte signature + 4-byte flags + 4-byte extension length;
    // ISO-8859-1 keeps the \377 byte intact (the default charset would mangle it)
    byte[] header =
            "PGCOPY\n\377\r\n\0\0\0\0\0\0\0\0\0".getBytes(StandardCharsets.ISO_8859_1);
    InputStream stream = new ByteArrayInputStream(header);
    PgCopyBinaryReader reader = new PgCopyBinaryReader(stream, schema, 65536);
    // Verify that headerParsed is true
}

@Test
public void testRowParsing() {
    // Construct test data: a row with 2 fields
    byte[] rowData = {...};
    InputStream stream = new ByteArrayInputStream(rowData);
    PgCopyBinaryReader reader = new PgCopyBinaryReader(stream, schema, 65536);
    assertTrue(reader.hasNext());
    SeaTunnelRow row = reader.next();
    // Verify field values
}
```
2. **Add integration tests**:
```java
// JdbcPostgresCopyIT.java
@ExtendWith(PostgresContainerExtension.class)
public class JdbcPostgresCopyIT {
@Test
public void testCopyRead() throws Exception {
// 1. Prepare test data
// 2. Configure Source: use_copy_statement=true, binary=true
// 3. Execute sync task
// 4. Verify correctness of read data
}
}
```
3. **Provide reproducible test scripts**:
Add to PR description:
```bash
# Test steps
# 1. Start PostgreSQL
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test -e POSTGRES_DB=test postgres:14

# 2. Prepare test data
PGPASSWORD=test psql -h localhost -U postgres -d test -c \
  "CREATE TABLE test_copy AS SELECT id, 'test_' || id AS name FROM generate_series(1, 10000) AS t(id)"

# 3. Configure the SeaTunnel task
cat > test_copy.conf <<'EOF'
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "test"
    query = "SELECT * FROM test_copy"
    use_copy_statement = true
    binary = true
  }
}

sink {
  Console {}
}
EOF

# 4. Run the task
./bin/seatunnel.sh -c test_copy.conf
```
**Rationale**: Insufficient test coverage is a code quality issue.
Especially for core functionality (such as high-performance data transfer),
sufficient test coverage is essential.
---
### Issue 7: Improper log level may leak sensitive information
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyInput.java:82`
```java
LOG.info("Open PG COPY split={}, sql={}", split.splitId(), copySql);
```
**Related Context**:
- Logging framework: SLF4J
- Log location: `PgCopyInput.open()` method
**Issue Description**:
Logging complete COPY SQL at INFO level may contain:
1. Sensitive business data (if query contains plaintext values)
2. Table structure information (may leak data model)
3. Filter conditions (may leak business logic)
In production environments, INFO level logs are persisted to log systems,
posing information leakage risks.
**Potential Risks**:
- **Data leakage**: Log systems may be accessed by unauthorized personnel,
leading to sensitive information leakage
- **Log bloat**: Logging SQL every time a split is opened will cause log
volume to surge
- **Performance impact**: Frequent INFO logging will affect performance
**Impact Scope**:
- **Direct impact**: Production environment security
- **Indirect impact**: Log storage costs
- **Affected area**: All tasks using COPY functionality
**Severity**: **MINOR**
**Improvement Suggestions**:
Change log level from INFO to DEBUG:
```java
// PgCopyInput.java
public void open(JdbcSourceSplit split) {
    try {
        String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
        String copySql = String.format(
                "COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
        Connection conn = getConnection();
        LOG.debug("Open PG COPY split={}, sql={}", split.splitId(), copySql); // changed to DEBUG
        copyManagerProxy = new CopyManagerProxy(conn);
        copyStream = copyManagerProxy.copyOutAsStream(copySql);
        // ...
    } catch (SQLException e) {
        throw new JdbcConnectorException(...);
    }
}
```
Additionally, a switch can be added to configuration to control whether to
log detailed SQL:
```java
// JdbcSourceOptions.java
public static final Option<Boolean> LOG_SQL =
        Options.key("log_sql")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to log SQL statements (DEBUG level). Only for debugging.");

// PgCopyInput.java
if (config.isLogSql()) {
    LOG.debug("Open PG COPY split={}, sql={}", split.splitId(), copySql);
} else {
    LOG.info("Open PG COPY split={}", split.splitId()); // do not log the SQL
}
```
**Rationale**: Protecting sensitive information is a security best practice.
Recommend changing the log level to DEBUG before merging, or adding a
configuration switch.
---
### Issue 8: Unreasonable buffer size configuration may cause memory overflow
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyBinaryReader.java:127`
```java
MAX_BUFFER_SIZE = BUFFER_SIZE * 1024;
```
**Related Context**:
- Initialization: `PgCopyBinaryReader.java:118-127`
- Expansion logic: `PgCopyBinaryReader.java:207-233`
**Issue Description**:
The maximum buffer size is set to 1024 times the initial size:
- Default initial size: 64KB (65536 bytes)
- Maximum buffer: 64MB (65536 * 1024)
- If user configures 1MB initial size, maximum buffer will reach 1GB
Although the code has `ensureCapacityFor()` checks, in extreme cases (single
field is very large), memory overflow may still occur.
**Potential Risks**:
- **Memory overflow**: If PostgreSQL table has large TEXT/BYTEA fields,
buffer expansion to the upper limit may be triggered
- **Configuration risk**: Users may configure excessively large initial
buffer size, causing all instances to attempt to allocate huge buffers
- **Concurrency amplification**: Multiple concurrent tasks expanding buffers
simultaneously may cause host memory exhaustion
**Impact Scope**:
- **Direct impact**: Tasks using COPY Binary mode
- **Indirect impact**: Cluster stability
- **Affected area**: PostgreSQL JDBC Source (Binary mode)
**Severity**: **MAJOR**
**Improvement Suggestions**:
1. **Set reasonable upper limit**:
```java
// PgCopyBinaryReader.java
private static final int MAX_ALLOWED_BUFFER_SIZE = 256 * 1024 * 1024; // 256MB hard limit

public PgCopyBinaryReader(InputStream stream, TableSchema schema, Integer pgCopyBufferSize) {
    // ...
    MAX_BUFFER_SIZE = Math.min(bufferSize * 1024, MAX_ALLOWED_BUFFER_SIZE);
    this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.BIG_ENDIAN);
}
```
2. **Add configuration validation**:
```java
// JdbcSourceOptions.java
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
        Options.key("pg_copy_buffer_size")
                .intType()
                .defaultValue(65536)
                .withDescription(
                        "Postgres copy buffer size (bytes). Must be between 65536 and 10485760 (10MB).");

// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
    int bufferSize = config.get(JdbcSourceOptions.PG_COPY_BUFFER_SIZE);
    if (bufferSize < 65536 || bufferSize > 10 * 1024 * 1024) {
        throw new IllegalArgumentException(
                "pg_copy_buffer_size must be between 64KB and 10MB, got: " + bufferSize);
    }
    // ...
}
```
3. **Add documentation notes**:
In configuration documentation, clearly explain buffer size limitations and
recommended values:
```markdown
| pg_copy_buffer_size | Int | No | 65536 | Buffer size for COPY reading (bytes). Recommended range: 65536-1048576 (64KB-1MB). Larger values may improve performance but increase memory usage. Maximum allowed: 10485760 (10MB). |
```
**Rationale**: Memory management is a critical issue in production
environments. Reasonable upper limits must be set to prevent memory overflow.
---
### Issue 9: Improper exception handling may lead to resource leaks
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/copy/PgCopyUtils.java:41-50`
```java
public static void closeQuietly(Object obj) {
if (obj instanceof Closeable) {
Closeable c = (Closeable) obj;
try {
c.close();
} catch (Exception ignored) {
// Exception ignored
}
}
}
```
**Related Context**:
- Caller: `PgCopyInput.close()`
- Resource types: `PgCopyReader`, `InputStream`, `CopyManagerProxy`
**Issue Description**:
The `closeQuietly()` method silently ignores all close exceptions, which may
lead to:
1. **Undetected resource leaks**: If `close()` method throws an exception,
it indicates resources may not be properly released, but the exception is
ignored and the problem is masked
2. **Uncertain transaction state**: For database connections, close failure
may cause transactions not to be properly rolled back or committed
3. **Difficult debugging**: Resource leak issues in production environments
are difficult to troubleshoot
**Potential Risks**:
- **Resource leaks**: Database connections, file handles and other resources
not properly released
- **Connection pool exhaustion**: Large numbers of leaked connections cause
connection pool exhaustion
- **Data inconsistency**: Transactions not properly ended cause data
inconsistency
**Impact Scope**:
- **Direct impact**: Tasks using COPY functionality
- **Indirect impact**: Database connection pool availability
- **Affected area**: PostgreSQL JDBC Source
**Severity**: **MAJOR**
**Improvement Suggestions**:
1. **Log close exceptions**:
```java
public static void closeQuietly(Object obj, String resourceName) {
if (obj instanceof Closeable) {
Closeable c = (Closeable) obj;
try {
c.close();
} catch (Exception e) {
// Log exception but do not throw
LOG.warn("Failed to close resource: {}", resourceName, e);
// Optional: Add exception to monitoring metrics
}
}
}
// PgCopyInput.java
@Override
public void close() {
PgCopyUtils.closeQuietly(reader, "PgCopyReader");
PgCopyUtils.closeQuietly(copyStream, "PgCopyInputStream");
PgCopyUtils.closeQuietly(copyManagerProxy, "CopyManagerProxy");
}
```
2. **Use try-with-resources where possible; otherwise clean up on failure**: `PgCopyInput` holds its resources across calls (it is opened once and read from repeatedly), so try-with-resources does not fit its lifecycle directly (a scoped usage sketch follows after this list); at a minimum, release partially acquired resources when `open()` fails:
```java
public void open(JdbcSourceSplit split) {
try {
copyManagerProxy = new CopyManagerProxy(getConnection());
copyStream = copyManagerProxy.copyOutAsStream(copySql);
reader = createReader(copyStream);
hasNext = reader.hasNext();
} catch (Exception e) {
// Ensure resource cleanup on exception
close();
throw e;
}
}
```
3. **Add resource leak detection** (optional, for testing):
```java
private boolean closed = false;
@Override
public void close() {
if (closed) {
LOG.warn("PgCopyInput already closed");
return;
}
try {
// Close resource
} finally {
closed = true;
}
}
@Override
protected void finalize() throws Throwable {
try {
if (!closed) {
LOG.error("PgCopyInput leaked, not closed properly");
}
} finally {
super.finalize();
}
}
```
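As referenced in suggestion 2, when the whole COPY stream is consumed within a single scope (for example in a test helper), try-with-resources keeps closing deterministic. A minimal sketch, assuming `PgCopyReader` is `Closeable` (which `closeQuietly()` already relies on) and reusing the `copyOutAsStream(...)`/`createReader(...)` calls from the snippets above:
```java
// Illustrative only: scoped consumption of a COPY stream with deterministic cleanup.
try (InputStream copyStream = copyManagerProxy.copyOutAsStream(copySql);
        PgCopyReader reader = createReader(copyStream)) {
    while (reader.hasNext()) {
        SeaTunnelRow row = reader.next();
        // handle the row ...
    }
}
```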
**Rationale**: Proper resource management is the foundation of stability.
Close exceptions should not be silently ignored. Recommend at least logging
warning messages.
---
### Issue 10: Non-standard configuration namespace
**Location**:
`seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceOptions.java:163-173`
```java
public static final Option<Boolean> BINARY =
Options.key("binary")
.booleanType()
.defaultValue(false)
.withDescription("Use binary copy mode");
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(65536)
.withDescription("Postgres copy buffer size");
```
**Issue Description**:
Configuration item naming is non-standard:
1. **Lack of namespace**: `binary` and `pg_copy_buffer_size` should sit under a common `copy` namespace
2. **Naming inconsistency**: `pg_copy_buffer_size` carries a database-specific `pg_copy_` prefix while its companion option `binary` carries no prefix at all, so the two new options do not follow a common pattern
3. **Unclear prefix**: `binary` is too generic and may conflict with other binary-related functionality in the future
**Potential Risks**:
- **Configuration conflicts**: Future addition of other binary-related
functionality may cause naming conflicts
- **Poor readability**: Users cannot tell from configuration names that
these are COPY-related configurations
- **Poor extensibility**: If supporting other database COPY functionality
(such as MySQL's LOAD DATA), configuration items will be confusing
**Impact Scope**:
- **Direct impact**: Configuration readability and maintainability
- **Affected area**: JDBC Source configuration
**Severity**: **MINOR**
**Improvement Suggestions**:
Use namespaced configuration names:
```java
// JdbcSourceOptions.java
public static final Option<Boolean> COPY_ENABLED =
        Options.key("copy.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to use the COPY method for reading (PostgreSQL only).");

public static final Option<Boolean> COPY_BINARY =
        Options.key("copy.binary")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to use binary format for COPY reading. Only takes effect when copy.enabled=true.");

public static final Option<Integer> COPY_BUFFER_SIZE =
        Options.key("copy.buffer_size")
                .intType()
                .defaultValue(65536)
                .withDescription(
                        "Buffer size for COPY reading (bytes). Only takes effect when copy.enabled=true.");
```
Corresponding configuration example:
```hocon
source {
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
# ...
copy {
enabled = true
binary = true
buffer_size = 1048576
}
}
}
```
**Rationale**: Using namespaces can improve configuration readability and
extensibility, avoiding future naming conflicts. Although this is a minor
issue, recommend optimizing before merging.
---
### Issue 11: Exactly-Once semantic incompatibility
**Location**:
Entire COPY functionality implementation
**Issue Description**:
The COPY protocol is streaming and does not support offset recording and
checkpoint resumption. This is incompatible with SeaTunnel's Exactly-Once
semantics:
1. **No offset concept**: COPY protocol does not support resuming from
intermediate positions, only from the beginning
2. **State cannot be serialized**: `PgCopyInput` and `PgCopyReader` do not
implement `Serializable`, cannot save state in checkpoints
3. **Difficult failure recovery**: If a task fails during reading, after
restart it can only read from the beginning, potentially producing duplicate
data
**Potential Risks**:
- **Data duplication**: After task failure and restart, it will re-read from
the split's beginning, causing data duplication
- **Exactly-Once configuration misleading**: Users enable
`is_exactly_once=true` and `use_copy_statement=true`, but Exactly-Once cannot
actually be guaranteed
- **State inconsistency**: After checkpoint recovery, data position cannot
be restored
**Impact Scope**:
- **Direct impact**: Tasks requiring Exactly-Once semantics
- **Indirect impact**: Data consistency
- **Affected area**: PostgreSQL JDBC Source (COPY mode)
**Severity**: **MAJOR**
**Improvement Suggestions**:
1. **Add configuration validation**:
```java
// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
    boolean useCopy = config.get(JdbcSourceOptions.USE_COPY_STATEMENT);
    boolean isExactlyOnce = config.get(JdbcCommonOptions.IS_EXACTLY_ONCE);
    if (useCopy && isExactlyOnce) {
        throw new IllegalArgumentException(
                "use_copy_statement is incompatible with is_exactly_once=true. "
                        + "COPY protocol does not support offset-based recovery. "
                        + "Please either set use_copy_statement=false or is_exactly_once=false.");
    }
    // ...
}
```
2. **Clearly state in documentation**:
```markdown
### Limitations
- **Exactly-Once**: COPY mode does NOT support exactly-once semantics. When
`use_copy_statement=true`,
the connector operates in at-least-once mode. If you require exactly-once
guarantees, please keep
`use_copy_statement=false` (use standard SELECT queries).
- **Fault Tolerance**: COPY mode cannot recover from intermediate positions.
If a task fails, it will
re-read the entire split from the beginning, which may result in duplicate
records.
```
3. **Provide best practices**:
Add usage scenario recommendations in documentation:
```markdown
### When to use COPY mode?
**Recommended scenarios**:
- Full data loads (not incremental sync)
- One-time batch migrations
- Scenarios where duplicate records can be tolerated or deduplicated
downstream
**NOT recommended for**:
- Incremental CDC scenarios
- Exactly-once required workloads
- Frequent fault recovery environments
```
**Rationale**: Feature limitations should be clearly stated in the design
phase to avoid user misuse causing data consistency issues. This is a problem
that must be resolved before merging.
---
### Issue 12: Lack of quantitative data for performance improvements
**Location**:
PR description and documentation
**Issue Description**:
The PR description and documentation claim that the COPY command "is
significantly faster than standard SELECT queries", but lack:
1. **Benchmark data**: No performance comparison test results provided
2. **Quantitative metrics**: No specific multiples or percentages of
performance improvement stated
3. **Applicable scenarios**: No explanation of what data volumes and
scenarios show significant performance improvements
4. **Cost analysis**: No explanation of additional costs of COPY mode (such
as memory usage, CPU usage)
**Potential Risks**:
- **Excessive expectations**: Users may expect unrealistic performance
improvements
- **Misuse risk**: Users may enable COPY in small data volume scenarios,
actually reducing performance
- **Wrong optimization direction**: Lack of data support makes it impossible
to optimize specifically
**Impact Scope**:
- **Direct impact**: User usage decisions
- **Affected area**: PostgreSQL JDBC Source users
**Severity**: **MINOR**
**Improvement Suggestions**:
1. **Provide benchmark results** (optional, but strongly recommended):
```markdown
### Performance Benchmarks
We tested COPY mode against standard SELECT queries on a PostgreSQL 14
instance with different data volumes:
| Rows | Data Size | SELECT (time) | COPY CSV (time) | COPY Binary (time) | Speedup (CSV) | Speedup (Binary) |
|------|-----------|---------------|-----------------|--------------------|---------------|------------------|
| 10K  | 1 MB      | 0.5s          | 0.3s            | 0.2s               | 1.7x          | 2.5x             |
| 100K | 10 MB     | 5.2s          | 2.1s            | 1.4s               | 2.5x          | 3.7x             |
| 1M   | 100 MB    | 58s           | 18s             | 12s                | 3.2x          | 4.8x             |
| 10M  | 1 GB      | 650s          | 185s            | 120s               | 3.5x          | 5.4x             |
**Test Environment**:
- PostgreSQL 14.5 on Ubuntu 20.04
- 4 vCPU, 16GB RAM
- Network: 1Gbps
- SeaTunnel 2.3.8
**Conclusion**: COPY mode provides significant performance improvements,
especially for large data volumes (>100K rows). Binary format is consistently
faster than CSV format.
```
2. **Add usage recommendations**:
```markdown
### Performance Tuning
**Buffer Size**:
- Default (64KB): Good for most workloads
- Large datasets (>1 GB of row data): increase to 1MB or larger
- Limited memory: Decrease to 32KB
**Binary vs CSV**:
- Binary: Faster, but requires PostgreSQL-specific parsing
- CSV: Slightly slower, but more portable
- Recommendation: Use Binary for production, CSV for debugging
**Partition Strategy**:
- COPY mode works best with parallel partitioning
- Ensure `partition_column` is properly configured for optimal performance
```
3. **Add monitoring metric descriptions**:
```markdown
### Monitoring
When COPY mode is enabled, the following metrics are logged at task completion:
- `rows_parsed`: Total number of rows read
- `bytes_read`: Total bytes transferred
- `buffer_expansions`: Number of times the buffer was expanded
- `elapsed_ms`: Total time spent reading
- `rows_per_second`: Read throughput
- `bytes_per_second`: Data transfer rate

Example log:

    PG COPY summary: rows=1000000 rows, bytes=104857600 B, expansions=2 times,
    elapsed=15000 ms, rows_per_second=66666 rows/s, bytes_per_second=6990506 B/s
```
**Rationale**: Although this is not a technical issue, providing performance
data can help users make informed decisions and also demonstrates the value of
the feature. Recommend adding at least simple performance comparison
explanations before merging.
---