DanielCarter-stack commented on PR #10416:
URL: https://github.com/apache/seatunnel/pull/10416#issuecomment-3816691874
### Issue 1: PR mixes two unrelated changes
**Location**: PR root directory (overall structure issue)
**Related context**:
- Commit 1: e9a27e42c - Fix PostgreSQL CDC
- Commit 2: 65409fdd6 - Support Oracle CLOB
**Issue description**:
This PR mixes two completely unrelated changes:
1. PostgreSQL CDC connector fix (Connector-V2/CDC)
2. Oracle JDBC Sink CLOB support (Connector-V2/JDBC)
Although they both involve fixes/improvements, they:
- Belong to different Connectors
- Solve different problems
- Should have different Issue links
- Should undergo Code Review separately
- Should be tested separately
**Potential risks**:
- If one of the fixes has issues, it may cause the entire PR to be rejected
- Difficult to track which fix resolves which Issue
- Cherry-picking to other branches will introduce unrelated changes
- Violates the "One PR, One Purpose" best practice
**Impact scope**:
- Direct impact: PR review, merge process
- Indirect impact: Version management, issue tracking
- Affected area: Entire project process
**Severity**: **MAJOR**
**Improvement suggestions**:
Split the PR into two independent PRs:
- PR #10416A: Fix critical issue where PostgreSQL CDC tasks cannot start
- PR #10416B: Support Oracle JDBC Sink CLOB parameter passing
**Rationale**:
- Each change has a clear purpose and scope
- Easier to conduct Code Review
- Easier to track issues and test
- Conforms to Apache project best practices
---
### Issue 2: PostgreSQL CDC fix lacks proper InterruptedException handling
**Location**: `PostgresSourceFetchTaskContext.java:210`
**Modified code**:
```java
} catch (SQLException | InterruptedException ex) {
    String message = "ReplicationConnection init failed";
    throw new DebeziumException(message, ex);
}
```
**Related context**:
- Parent class/interface: `JdbcSourceFetchTaskContext.java`
- Caller: `PostgresSourceFetchTaskContext.configure()` (line 126)
- Debezium method: `PostgresReplicationConnection.initConnection()` declares
throwing `InterruptedException`
**Issue description**:
The new code catches `InterruptedException` but does not follow Java
concurrent programming best practices:
1. **Does not restore the interrupt flag**: After catching
`InterruptedException`, the code should call `Thread.currentThread().interrupt()`
2. **Improper exception handling**: Wrapping the checked exception in a
runtime exception may prevent callers from handling the interrupt properly
3. **Loses interrupt semantics**: Upper-level code may rely on interrupts to
control task cancellation
**Potential risks**:
- When a blocking call throws `InterruptedException`, the thread's interrupt
flag is cleared; unless it is restored, upper-level code cannot detect the interrupt
- May prevent tasks from being cancelled normally
- May prevent resources from being released in a timely manner
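
The first risk above can be reproduced in isolation. The following is a minimal sketch, not code from the PR: the class name `InterruptFlagSketch` and its method are hypothetical, and `Thread.sleep` merely stands in for a blocking call such as `initConnection()`. It reports the interrupt flag a thread sees after its `catch` block, with and without restoring it.

```java
class InterruptFlagSketch {

    /**
     * Interrupts a worker that is blocked in sleep() and returns the interrupt
     * flag the worker observes after its catch block has run.
     */
    static boolean flagAfterCatch(boolean restore) throws InterruptedException {
        boolean[] observed = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                // Stand-in for a blocking call such as initConnection()
                Thread.sleep(60_000);
            } catch (InterruptedException ex) {
                // Throwing InterruptedException has already cleared the flag
                if (restore) {
                    Thread.currentThread().interrupt();
                }
            }
            observed[0] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.interrupt();
        worker.join(); // join() makes the write to observed[0] visible here
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without restore: " + flagAfterCatch(false)); // false
        System.out.println("with restore:    " + flagAfterCatch(true));  // true
    }
}
```

Without the restore call, any cancellation logic further up the stack that polls `isInterrupted()` sees `false` and carries on as if nothing happened.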
**Impact scope**:
- Direct impact: `PostgresSourceFetchTaskContext.configure()`
- Indirect impact: Any task cancellation logic that depends on interrupt
mechanism
- Affected area: PostgreSQL CDC Connector
**Severity**: **MAJOR**
**Improvement suggestions**:
```java
try {
    // initialize replication connection and create slot if needed
    replicationConnection.initConnection();
} catch (SQLException ex) {
    String message = "ReplicationConnection init failed";
    if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
        message +=
                "; when setting up multiple connectors for the same database host, "
                        + "please make sure to use a distinct replication slot name for each.";
    }
    throw new DebeziumException(message, ex);
} catch (InterruptedException ex) {
    // Restore the interrupted status
    Thread.currentThread().interrupt();
    throw new DebeziumException("ReplicationConnection init interrupted", ex);
}
```
**Rationale**:
1. Restores interrupt flag, following Java concurrent best practices
2. Retains special handling for "already exists" errors
3. Distinguishes between `SQLException` and `InterruptedException`,
providing clearer error messages
---
### Issue 3: PostgreSQL CDC fix loses user-friendly error messages
**Location**: `PostgresSourceFetchTaskContext.java:207-213`
**Modified code**:
```java
try {
    // initialize replication connection and create slot if needed
    replicationConnection.initConnection();
} catch (SQLException | InterruptedException ex) {
    String message = "ReplicationConnection init failed";
    throw new DebeziumException(message, ex);
}
```
**Original code**:
```java
try {
    replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
    String message = "Creation of replication slot failed";
    if (ex.getMessage().contains("already exists")) {
        message +=
                "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
        log.warn(message);
    } else {
        throw new DebeziumException(message, ex);
    }
}
```
**Related context**:
- Issue #10322 mentions "replication slot name already exists" error
- Original code specifically handled this error scenario and provided
friendly prompts
**Issue description**:
The modified code **completely removes** special handling for "already
exists" errors, including:
1. User-friendly error messages
2. Warning logs (`log.warn`)
This means:
- When users configure multiple connectors and use the same slot name, they
will **only get a vague "init failed" error**
- The original `log.warn` is deleted, reducing observability
**Potential risks**:
- Users may have difficulty diagnosing configuration errors
- May lead to users asking questions on support forums, increasing support
costs
- Violates the original code designer's intent (specifically handled this
error)
**Impact scope**:
- Direct impact: User experience
- Indirect impact: Support costs
- Affected area: All users using PostgreSQL CDC with multiple connectors
configured
**Severity**: **MAJOR**
**Improvement suggestions**:
```java
try {
    // initialize replication connection and create slot if needed
    replicationConnection.initConnection();
} catch (SQLException ex) {
    String message = "ReplicationConnection init failed";
    if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
        message +=
                "; when setting up multiple connectors for the same database host, "
                        + "please make sure to use a distinct replication slot name for each.";
        log.warn(message);
    } else {
        throw new DebeziumException(message, ex);
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    throw new DebeziumException("ReplicationConnection init interrupted", ex);
}
```
**Rationale**:
1. Retains user-friendly error messages
2. Retains logging
3. Also resolves Issue 2 (interrupt handling)
4. String matching on the exception message is brittle, but it is acceptable
here because it only affects the wording of the error shown to users
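
To keep the string matching in one place and make it testable, the message construction could be pulled into a small helper. This is only a sketch: the class `SlotErrorMessageSketch` and its methods are hypothetical names, not part of SeaTunnel or Debezium, and the texts are copied from the snippets above.

```java
import java.sql.SQLException;

class SlotErrorMessageSketch {

    static final String BASE = "ReplicationConnection init failed";
    static final String HINT =
            "; when setting up multiple connectors for the same database host, "
                    + "please make sure to use a distinct replication slot name for each.";

    /** Null-safe check for the "already exists" text used in the snippets above. */
    static boolean isSlotAlreadyExists(SQLException ex) {
        String msg = ex.getMessage();
        return msg != null && msg.contains("already exists");
    }

    /** Builds the user-facing message, appending the hint only for duplicate slots. */
    static String describe(SQLException ex) {
        return isSlotAlreadyExists(ex) ? BASE + HINT : BASE;
    }

    public static void main(String[] args) {
        System.out.println(describe(new SQLException("replication slot \"s1\" already exists")));
        System.out.println(describe(new SQLException())); // null message, no hint
    }
}
```

PostgreSQL reports a duplicate replication slot with SQLSTATE `42710` (`duplicate_object`), so checking `ex.getSQLState()` might be a less brittle alternative. Whether that state reaches this catch block unchanged has not been verified here.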
---
### Issue 4: PostgreSQL CDC fix lacks detailed code comments
**Location**: `PostgresSourceFetchTaskContext.java:207-213`
**Modified code**:
```java
// initialize replication connection and create slot if needed
replicationConnection.initConnection();
```
**Issue description**:
The comment is too terse and does not explain:
1. Why use `initConnection()` instead of `createReplicationSlot()`
2. Idempotency guarantee of `initConnection()`
3. What problem this modification solves
4. Relationship with Debezium version
**Potential risks**:
- Future maintainers may not understand why `initConnection()` needs to be
called
- Someone may mistakenly think `createReplicationSlot()` is more appropriate
and want to change it back
- Reduced code readability and maintainability
**Impact scope**:
- Direct impact: Code maintainability
- Indirect impact: Potential incorrect modifications in the future
- Affected area: PostgreSQL CDC Connector
**Severity**: **MINOR**
**Improvement suggestions**:
```java
// Initialize the replication connection. This method is preferred over createReplicationSlot()
// because it handles the case where the replication slot already exists (e.g., after job
// restart from checkpoint or manual slot creation). The initConnection() method is idempotent
// and will skip slot creation if the slot already exists, avoiding the
// "replication slot already exists" error.
// See: https://issues.apache.org/jira/browse/SEA-XXX (Issue #10322)
replicationConnection.initConnection();
```
**Rationale**:
1. Clearly explains why `initConnection()` is used
2. Explains idempotency guarantee
3. Links to Issue for easy tracking
4. Improves code maintainability
---
### Issue 5: Oracle CLOB fix lacks test coverage
**Location**:
- `OracleJdbcRowConverter.java:57-64`
- `FieldNamedPreparedStatement.java:363-408`
**Issue description**:
This fix adds support for CLOB/NCLOB types but **has no corresponding test
cases**:
1. No unit test for `OracleJdbcRowConverter` CLOB writes
2. No unit test for `FieldNamedPreparedStatement` CLOB handling
3. No test for multi-index scenarios (named parameters mapped to multiple
positions)
4. No test for large data volume CLOBs
**Potential risks**:
- Code may have undiscovered bugs
- Refactoring may break functionality
- Cannot verify correctness of multi-index scenarios
- Cannot verify performance and correctness of large data volume scenarios
**Impact scope**:
- Direct impact: Code quality assurance
- Indirect impact: Potential production issues
- Affected area: Oracle JDBC Sink users
**Severity**: **MAJOR**
**Improvement suggestions**:
**Unit test 1**: `OracleJdbcRowConverterTest.java`
```java
@Test
public void testSetClobValue() throws SQLException {
    // Mock PreparedStatement
    PreparedStatement mockStmt = mock(PreparedStatement.class);
    // Create converter
    OracleJdbcRowConverter converter = new OracleJdbcRowConverter();
    // Test CLOB
    SeaTunnelDataType<String> clobType = BasicType.STRING_TYPE;
    String testValue = "This is a test CLOB value";
    converter.setValueToStatementByDataType(
            testValue, mockStmt, clobType, 1, OracleTypeConverter.ORACLE_CLOB);
    // Verify setClob was called with a StringReader. isA(...) also avoids the
    // overload ambiguity between setClob(int, Clob) and setClob(int, Reader).
    verify(mockStmt).setClob(eq(1), isA(StringReader.class));
}

@Test
public void testSetNClobValue() throws SQLException {
    // Similar test for NCLOB
}
```
**Unit test 2**: `FieldNamedPreparedStatementTest.java`
```java
@Test
public void testSetClobWithMultipleIndexes() throws SQLException, IOException {
    // Mock connection and statement
    Connection mockConn = mock(Connection.class);
    PreparedStatement mockStmt = mock(PreparedStatement.class);
    when(mockConn.prepareStatement(any())).thenReturn(mockStmt);
    // Create prepared statement with named parameters
    // where :name appears multiple times in the SQL
    String sql = "INSERT INTO test (id, name, name_copy) VALUES (:id, :name, :name)";
    String[] fieldNames = {"id", "name"};
    FieldNamedPreparedStatement pstmt =
            FieldNamedPreparedStatement.prepareStatement(mockConn, sql, fieldNames);
    // Set CLOB value
    String clobValue = "Test CLOB data";
    pstmt.setClob(2, new StringReader(clobValue));
    // Verify that setClob was called once per mapped index with a StringReader
    verify(mockStmt, times(2)).setClob(anyInt(), isA(StringReader.class));
}
```
**Integration test**: `OracleJdbcSinkIT.java` (new)
```java
@Test
public void testWriteClobData() throws Exception {
    // Create table with CLOB column
    String createTableSql =
            "CREATE TABLE test_clob (id INT, clob_col CLOB, nclob_col NCLOB)";
    // ...
    // Prepare test data
    // ...
    // Execute SeaTunnel job
    // ...
    // Verify data is correctly written
    String querySql = "SELECT clob_col, nclob_col FROM test_clob WHERE id = 1";
    // ...
}
```
**Rationale**:
1. Unit tests ensure basic logic is correct
2. Integration tests ensure end-to-end process is correct
3. Tests cover edge cases (multi-index, large data volume)
4. Follows test pyramid principles
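
The multi-index case can also be checked without a mocking library. The sketch below is self-contained and makes several assumptions: `MultiIndexClobSketch`, `setClobForIndexes`, and `recordingStatement` are hypothetical names, and `setClobForIndexes` only imitates the "one fresh Reader per index" behaviour described in this review rather than calling the real `FieldNamedPreparedStatement`. A `java.lang.reflect.Proxy` stands in for the JDBC driver and records what each `setClob` call can actually read.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class MultiIndexClobSketch {

    /** Hypothetical stand-in for the multi-index branch: one fresh Reader per index. */
    static void setClobForIndexes(PreparedStatement statement, int[] indexes, Reader reader)
            throws SQLException {
        try {
            String value = readAll(reader); // a Reader can only be consumed once
            for (int index : indexes) {
                statement.setClob(index, new StringReader(value));
            }
        } catch (IOException e) {
            throw new SQLException(e.getLocalizedMessage(), e);
        }
    }

    /** Drains a Reader into a String. */
    static String readAll(Reader reader) throws IOException {
        StringBuilder sb = new StringBuilder();
        char[] buf = new char[1024];
        int n;
        while ((n = reader.read(buf)) != -1) {
            sb.append(buf, 0, n);
        }
        return sb.toString();
    }

    /** Proxy that drains every Reader passed to setClob and records "index=content". */
    static PreparedStatement recordingStatement(List<String> calls) {
        return (PreparedStatement) Proxy.newProxyInstance(
                MultiIndexClobSketch.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("setClob") && args[1] instanceof Reader) {
                        calls.add(args[0] + "=" + readAll((Reader) args[1]));
                    }
                    return null; // enough for the void setters exercised here
                });
    }

    /** Runs the scenario: one named parameter mapped to positions 2 and 3. */
    static List<String> run() throws SQLException {
        List<String> calls = new ArrayList<>();
        setClobForIndexes(
                recordingStatement(calls), new int[] {2, 3}, new StringReader("Test CLOB data"));
        return calls;
    }

    public static void main(String[] args) throws SQLException {
        System.out.println(run()); // [2=Test CLOB data, 3=Test CLOB data]
    }
}
```

If the same `Reader` instance were passed to both positions, the second recorded entry would be `3=` with no content, which is the failure mode a multi-index unit test should guard against.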
---
### Issue 6: Oracle CLOB fix uses Chinese comments, not compliant with project standards
**Location**: `FieldNamedPreparedStatement.java:367-375`
**Modified code**:
```java
/**
 * Set CLOB parameter value, supporting both regular CLOB and NCLOB types. When only one index
 * position needs to be set, use the corresponding setClob or setNClob method directly
 * When multiple index positions need to be set with the same value, to ensure all streams can
 * correctly read data, the Reader needs to be converted to a string first, and then a new
 * Reader is created for each index position for setting
 *
 * @param parameterIndex Parameter index position (starting from 1)
 * @param reader Reader object for reading CLOB data
 * @param isNClob Whether it is NCLOB type
 * @throws SQLException SQL execution exception
 */
```
**Issue description**:
JavaDoc comments are written in **Chinese**, while Apache SeaTunnel project
coding standards require **English**.
**Potential risks**:
- Does not comply with Apache project internationalization standards
- International contributors cannot understand comments
- May be rejected by CI/CD checks (if code style checking exists)
**Impact scope**:
- Direct impact: Code standard compliance
- Indirect impact: Project internationalization
- Affected area: Oracle JDBC Sink
**Severity**: **MINOR**
**Improvement suggestions**:
```java
/**
 * Sets the CLOB parameter value, supporting both regular CLOB and NCLOB types.
 * When there is only one index position to set, the corresponding setClob or setNClob
 * method is called directly. When there are multiple index positions that need to be
 * set with the same value, the Reader must first be converted to a String, and then
 * a new Reader is created for each index position to ensure all streams can read the
 * data correctly.
 *
 * @param parameterIndex the parameter index position (starting from 1)
 * @param reader the Reader object for reading CLOB data
 * @param isNClob whether it is an NCLOB type
 * @throws SQLException if a SQL execution exception occurs
 */
```
**Rationale**:
1. Complies with Apache project internationalization standards
2. All contributors can understand
3. Improves code readability
---
### Issue 7: Oracle CLOB fix lacks logging and observability
**Location**:
- `OracleJdbcRowConverter.java:57-64`
- `FieldNamedPreparedStatement.java:363-408`
**Issue description**:
For CLOB data writes, **no logs or metrics have been added**:
1. No logging of CLOB data size
2. No logging of multi-index scenario handling
3. No performance metrics (e.g., CLOB write time)
**Potential risks**:
- Users lack information when debugging CLOB write issues
- Cannot monitor CLOB write performance
- Difficult to diagnose performance issues with large data volume CLOBs
**Impact scope**:
- Direct impact: Observability
- Indirect impact: Issue diagnosis
- Affected area: Oracle JDBC Sink users
**Severity**: **MINOR**
**Improvement suggestions** (optional):
```java
private void setClob(int parameterIndex, Reader reader, boolean isNClob)
        throws SQLException {
    int[] indexes = indexMapping[parameterIndex - 1];
    if (indexes.length == 1) {
        if (isNClob) {
            statement.setNClob(indexes[0], reader);
        } else {
            statement.setClob(indexes[0], reader);
        }
    } else {
        try {
            String value = IOUtils.toString(reader);
            if (log.isDebugEnabled()) {
                log.debug(
                        "Setting CLOB/NCLOB value for parameter {} across {} indexes, value length: {}",
                        parameterIndex, indexes.length, value.length());
            }
            for (int index : indexes) {
                if (isNClob) {
                    statement.setNClob(index, new StringReader(value));
                } else {
                    statement.setClob(index, new StringReader(value));
                }
            }
        } catch (IOException e) {
            throw new SQLException(e.getLocalizedMessage(), e);
        }
    }
}
```
**Rationale**:
1. Provides debug information to help users diagnose issues
2. Does not affect performance (only logged at DEBUG level)
3. Improves observability
---
### Issue 8: PR description is inaccurate
**Location**: PR description (overall)
**Issue description**:
**1. PR title does not match content**:
- PR title: `[Fix] [Connector-v2] [Postgres-CDC] Error: The replication slot
name "ore_slot_test_20260112" already exists #10322`
- Actual content: Includes PostgreSQL CDC fix **and** Oracle CLOB support
- Issue: Title only mentions PostgreSQL CDC, not Oracle
**2. "Does this PR introduce any user-facing change?" answer is inaccurate**:
- Answer: `No: only fixes the underlying connection initialization logic; does not affect user configuration or output format`
- Actual: Oracle CLOB support is a **new feature** that allows users to
write CLOB/NCLOB fields
- Issue: Oracle CLOB support is indeed a user-facing change
**3. "How was this patch tested?" answer is incomplete**:
- Answer: Only PostgreSQL CDC test description
- Missing: Oracle CLOB test description
- Issue: No explanation of how to test Oracle CLOB functionality
**Potential risks**:
- Misleads reviewers
- Hides actual functional changes
- May result in insufficient testing
**Impact scope**:
- Direct impact: PR review
- Indirect impact: Merge decision
- Affected area: Entire PR process
**Severity**: **MAJOR**
**Improvement suggestions**:
**1. Split PR** (recommended):
- PR #10416: `[Fix] [Connector-v2] [Postgres-CDC] Fix replication slot
initialization`
- PR #10417: `[Feature] [Connector-Jdbc] [Oracle] Support CLOB/NCLOB
parameter passing`
**2. If not splitting, update PR description**:
```
### Purpose of this pull request
This PR includes two unrelated changes:
#### Change 1: Fix PostgreSQL CDC replication slot initialization (#10322)
**Problem**: PostgreSQL CDC tasks fail to start when replication slot
already exists.
**Solution**: Replace `createReplicationSlot()` with `initConnection()`,
which handles existing slots gracefully.
#### Change 2: Support CLOB/NCLOB for Oracle JDBC Sink
**Problem**: Oracle JDBC Sink throws UnsupportedOperationException when
writing CLOB/NCLOB fields.
**Solution**: Implement `setClob(Reader)` and `setNClob(Reader)` methods in
`FieldNamedPreparedStatement` and `OracleJdbcRowConverter`.
### Does this PR introduce any user-facing change?
**For PostgreSQL CDC**: No. Only fixes the underlying connection
initialization logic; does not affect user configuration or output format.
**For Oracle JDBC Sink**: Yes. Users can now use Oracle JDBC Sink to write
CLOB/NCLOB type fields.
### How was this patch tested?
**For PostgreSQL CDC**:
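- ✅ Manually created the replication slot, then started SeaTunnel; verified the task starts successfully (the original logic crashed)
- ✅ Deleted the replication slot, then started the job; verified the slot is created automatically
**For Oracle JDBC Sink**:
- TODO: add test description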
```
**Rationale**:
1. Accurately describes PR content
2. Avoids misleading reviewers
3. Provides complete test information
--
This is an automated message from the Apache Git Service.