DanielCarter-stack commented on PR #10416:
URL: https://github.com/apache/seatunnel/pull/10416#issuecomment-3816691874

   ### Issue 1: PR mixes two unrelated changes
   
   **Location**: PR root directory (overall structure issue)
   
   **Related context**:
   - Commit 1: e9a27e42c - Fix PostgreSQL CDC
   - Commit 2: 65409fdd6 - Support Oracle CLOB
   
   **Issue description**:
   This PR mixes two completely unrelated changes:
   1. PostgreSQL CDC connector fix (Connector-V2/CDC)
   2. Oracle JDBC Sink CLOB support (Connector-V2/JDBC)
   
   Although they both involve fixes/improvements, they:
   - Belong to different Connectors
   - Solve different problems
   - Should have different Issue links
   - Should undergo Code Review separately
   - Should be tested separately
   
   **Potential risks**:
   - If one of the fixes has issues, it may cause the entire PR to be rejected
   - Difficult to track which fix resolves which Issue
   - Cherry-picking to other branches will introduce unrelated changes
   - Violates the "One PR, One Purpose" best practice
   
   **Impact scope**:
   - Direct impact: PR review, merge process
   - Indirect impact: Version management, issue tracking
   - Affected area: Entire project process
   
   **Severity**: **MAJOR**
   
   **Improvement suggestions**:
   Split the PR into two independent PRs:
   - PR #10416A: Fix critical issue where PostgreSQL CDC tasks cannot start
   - PR #10416B: Support Oracle JDBC Sink CLOB parameter passing
   
   **Rationale**:
   - Each change has a clear purpose and scope
   - Easier to conduct Code Review
   - Easier to track issues and test
   - Conforms to Apache project best practices
   
   ---
   
   ### Issue 2: PostgreSQL CDC fix lacks proper InterruptedException handling
   
   **Location**: `PostgresSourceFetchTaskContext.java:210`
   
   **Modified code**:
   ```java
   } catch (SQLException | InterruptedException ex) {
       String message = "ReplicationConnection init failed";
       throw new DebeziumException(message, ex);
   }
   ```
   
   **Related context**:
   - Parent class/interface: `JdbcSourceFetchTaskContext.java`
   - Caller: `PostgresSourceFetchTaskContext.configure()` (line 126)
   - Debezium method: `PostgresReplicationConnection.initConnection()` declares that it throws `InterruptedException`
   
   **Issue description**:
   The new code catches `InterruptedException` but does not follow Java concurrency best practices:
   1. **Does not restore the interrupt flag**: after catching `InterruptedException`, the handler should call `Thread.currentThread().interrupt()`
   2. **Hides the interrupt from callers**: wrapping the checked exception in a runtime exception means callers are no longer forced to handle the interrupt
   3. **Loses interrupt semantics**: upper-level code may rely on interrupts to control task cancellation
   
   **Potential risks**:
   - When a blocking call throws `InterruptedException`, the thread's interrupt flag is cleared; unless the handler restores it, upper-level code cannot detect the interrupt
   - May prevent tasks from being cancelled normally
   - May prevent resources from being released in a timely manner
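
The flag-clearing behaviour described above can be reproduced with the JDK alone. The following is a minimal sketch, not SeaTunnel or Debezium code: the class `InterruptFlagDemo` and the method `flagAfterCatch` are hypothetical names, and `Thread.sleep` stands in for the blocking `initConnection()` call.

```java
public class InterruptFlagDemo {

    /**
     * Simulates a blocking call that is interrupted, then reports whether the
     * interrupt flag is still visible after the catch block.
     *
     * @param restore whether the handler calls Thread.currentThread().interrupt()
     */
    public static boolean flagAfterCatch(boolean restore) {
        // Pretend a cancellation request arrived before the blocking call.
        Thread.currentThread().interrupt();
        try {
            // Stand-in for the blocking call: throws at once and clears the flag.
            Thread.sleep(10);
        } catch (InterruptedException ex) {
            if (restore) {
                // Re-assert the interrupt so code further up the stack can see it.
                Thread.currentThread().interrupt();
            }
        }
        // Thread.interrupted() reads the flag and resets it for the next run.
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        System.out.println("flag visible without restore: " + flagAfterCatch(false)); // false
        System.out.println("flag visible with restore:    " + flagAfterCatch(true));  // true
    }
}
```

Without the restore call, any cancellation check that polls the interrupt flag after the handler returns sees nothing, which is the risk listed above.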
   
   **Impact scope**:
   - Direct impact: `PostgresSourceFetchTaskContext.configure()`
   - Indirect impact: Any task cancellation logic that depends on interrupt 
mechanism
   - Affected area: PostgreSQL CDC Connector
   
   **Severity**: **MAJOR**
   
   **Improvement suggestions**:
   ```java
   try {
       // initialize replication connection and create slot if needed
       replicationConnection.initConnection();
   } catch (SQLException ex) {
       String message = "ReplicationConnection init failed";
       if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
           message += "; when setting up multiple connectors for the same database host, " +
                     "please make sure to use a distinct replication slot name for each.";
       }
       throw new DebeziumException(message, ex);
   } catch (InterruptedException ex) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
       throw new DebeziumException("ReplicationConnection init interrupted", ex);
   }
   ```
   
   **Rationale**:
   1. Restores the interrupt flag, following Java concurrency best practices
   2. Retains special handling for "already exists" errors
   3. Distinguishes between `SQLException` and `InterruptedException`, 
providing clearer error messages
   
   ---
   
   ### Issue 3: PostgreSQL CDC fix loses user-friendly error messages
   
   **Location**: `PostgresSourceFetchTaskContext.java:207-213`
   
   **Modified code**:
   ```java
   try {
       // initialize replication connection and create slot if needed
       replicationConnection.initConnection();
   } catch (SQLException | InterruptedException ex) {
       String message = "ReplicationConnection init failed";
       throw new DebeziumException(message, ex);
   }
   ```
   
   **Original code**:
   ```java
   try {
       replicationConnection.createReplicationSlot().orElse(null);
   } catch (SQLException ex) {
       String message = "Creation of replication slot failed";
       if (ex.getMessage().contains("already exists")) {
           message +=
               "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
           log.warn(message);
       } else {
           throw new DebeziumException(message, ex);
       }
   }
   ```
   
   **Related context**:
   - Issue #10322 mentions "replication slot name already exists" error
   - Original code specifically handled this error scenario and provided 
friendly prompts
   
   **Issue description**:
   The modified code **completely removes** special handling for "already 
exists" errors, including:
   1. User-friendly error messages
   2. Warning logs (`log.warn`)
   
   This means:
   - When users configure multiple connectors and use the same slot name, they 
will **only get a vague "init failed" error**
   - The original `log.warn` is deleted, reducing observability
   
   **Potential risks**:
   - Users may have difficulty diagnosing configuration errors
   - May lead to users asking questions on support forums, increasing support 
costs
   - Violates the original code designer's intent (specifically handled this 
error)
   
   **Impact scope**:
   - Direct impact: User experience
   - Indirect impact: Support costs
   - Affected area: All users using PostgreSQL CDC with multiple connectors 
configured
   
   **Severity**: **MAJOR**
   
   **Improvement suggestions**:
   ```java
   try {
       // initialize replication connection and create slot if needed
       replicationConnection.initConnection();
   } catch (SQLException ex) {
       String message = "ReplicationConnection init failed";
       if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
           message += "; when setting up multiple connectors for the same database host, " +
                     "please make sure to use a distinct replication slot name for each.";
           log.warn(message);
       } else {
           throw new DebeziumException(message, ex);
       }
   } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       throw new DebeziumException("ReplicationConnection init interrupted", ex);
   }
   ```
   
   **Rationale**:
   1. Retains user-friendly error messages
   2. Retains logging
   3. Also resolves Issue 2 (interrupt handling)
   4. String matching on the exception message is brittle, but it is justified here because it gives users a clearer error
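
The null-safe message check in the suggestion can be isolated and tested without a database. The following is a minimal sketch under stated assumptions: `SlotErrorMessages`, `isSlotAlreadyExists`, and `describe` are hypothetical names introduced for illustration, and only `java.sql.SQLException` from the JDK is used.

```java
import java.sql.SQLException;

public class SlotErrorMessages {

    // Hint text taken from the original code quoted above.
    static final String HINT =
            "; when setting up multiple connectors for the same database host, "
                    + "please make sure to use a distinct replication slot name for each.";

    /** Null-safe check: SQLException.getMessage() may legitimately return null. */
    public static boolean isSlotAlreadyExists(SQLException ex) {
        String msg = ex.getMessage();
        return msg != null && msg.contains("already exists");
    }

    /** Builds the message to log or to wrap in a higher-level exception. */
    public static String describe(SQLException ex) {
        String message = "ReplicationConnection init failed";
        return isSlotAlreadyExists(ex) ? message + HINT : message;
    }

    public static void main(String[] args) {
        System.out.println(describe(new SQLException("replication slot \"s1\" already exists")));
        System.out.println(describe(new SQLException())); // no message: plain text, no NPE
    }
}
```

Keeping the check in one small method means the string match lives in a single place if the message wording ever changes.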
   
   ---
   
   ### Issue 4: PostgreSQL CDC fix lacks detailed code comments
   
   **Location**: `PostgresSourceFetchTaskContext.java:207-213`
   
   **Modified code**:
   ```java
   // initialize replication connection and create slot if needed
   replicationConnection.initConnection();
   ```
   
   **Issue description**:
   The comment is too terse. It does not explain:
   1. Why `initConnection()` is used instead of `createReplicationSlot()`
   2. The idempotency guarantee of `initConnection()`
   3. What problem this modification solves
   4. How the change relates to the Debezium version
   4. Relationship with Debezium version
   
   **Potential risks**:
   - Future maintainers may not understand why `initConnection()` needs to be 
called
   - Someone may mistakenly think `createReplicationSlot()` is more appropriate 
and want to change it back
   - Reduced code readability and maintainability
   
   **Impact scope**:
   - Direct impact: Code maintainability
   - Indirect impact: Potential incorrect modifications in the future
   - Affected area: PostgreSQL CDC Connector
   
   **Severity**: **MINOR**
   
   **Improvement suggestions**:
   ```java
   // Initialize the replication connection. This method is preferred over
   // createReplicationSlot() because it handles the case where the replication
   // slot already exists (e.g., after a job restart from a checkpoint or manual
   // slot creation). The initConnection() method is idempotent and will skip
   // slot creation if the slot already exists, avoiding the
   // "replication slot already exists" error.
   // See Issue #10322.
   replicationConnection.initConnection();
   ```
   
   **Rationale**:
   1. Clearly explains why `initConnection()` is used
   2. Explains idempotency guarantee
   3. Links to Issue for easy tracking
   4. Improves code maintainability
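
The difference between an unconditional create and an idempotent init can be illustrated with an in-memory stand-in. This is only a sketch of the idea and makes no claim about how Debezium implements `initConnection()`: the class `IdempotentSlotInitSketch`, its `Set`-backed slot registry, and both method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentSlotInitSketch {

    // In-memory stand-in for the slots that exist on the database server.
    private final Set<String> slots = new HashSet<>();

    /** Unconditional create: fails when the slot is already present. */
    public void createSlot(String name) {
        if (!slots.add(name)) {
            throw new IllegalStateException(
                    "replication slot \"" + name + "\" already exists");
        }
    }

    /**
     * Idempotent init: creates the slot only when it is missing.
     *
     * @return true if the slot was created by this call, false if it was reused
     */
    public boolean initSlot(String name) {
        if (slots.contains(name)) {
            return false; // e.g. job restarted from a checkpoint
        }
        createSlot(name);
        return true;
    }

    public static void main(String[] args) {
        IdempotentSlotInitSketch db = new IdempotentSlotInitSketch();
        System.out.println(db.initSlot("seatunnel_slot")); // true: created
        System.out.println(db.initSlot("seatunnel_slot")); // false: reused, no error
    }
}
```

Calling `createSlot` a second time with the same name throws, which mirrors the startup failure reported in Issue #10322.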
   
   ---
   
   ### Issue 5: Oracle CLOB fix lacks test coverage
   
   **Location**:
   - `OracleJdbcRowConverter.java:57-64`
   - `FieldNamedPreparedStatement.java:363-408`
   
   **Issue description**:
   This fix adds support for CLOB/NCLOB types but **has no corresponding test 
cases**:
   1. No unit test for `OracleJdbcRowConverter` CLOB writes
   2. No unit test for `FieldNamedPreparedStatement` CLOB handling
   3. No test for multi-index scenarios (named parameters mapped to multiple 
positions)
   4. No test for large data volume CLOBs
   
   **Potential risks**:
   - Code may have undiscovered bugs
   - Refactoring may break functionality
   - Cannot verify correctness of multi-index scenarios
   - Cannot verify performance and correctness of large data volume scenarios
   
   **Impact scope**:
   - Direct impact: Code quality assurance
   - Indirect impact: Potential production issues
   - Affected area: Oracle JDBC Sink users
   
   **Severity**: **MAJOR**
   
   **Improvement suggestions**:
   
   **Unit test 1**: `OracleJdbcRowConverterTest.java`
   ```java
   @Test
   public void testSetClobValue() throws SQLException {
       // Mock PreparedStatement
       PreparedStatement mockStmt = mock(PreparedStatement.class);
       
       // Create converter
       OracleJdbcRowConverter converter = new OracleJdbcRowConverter();
       
       // Test CLOB
       SeaTunnelDataType<String> clobType = BasicType.STRING_TYPE;
       String testValue = "This is a test CLOB value";
       
       converter.setValueToStatementByDataType(
           testValue, mockStmt, clobType, 1, OracleTypeConverter.ORACLE_CLOB);
       
       // Verify setClob was called with a StringReader
       verify(mockStmt).setClob(eq(1), any(StringReader.class));
   }
   
   @Test
   public void testSetNClobValue() throws SQLException {
       // Similar test for NCLOB
   }
   ```
   
   **Unit test 2**: `FieldNamedPreparedStatementTest.java`
   ```java
   @Test
   public void testSetClobWithMultipleIndexes() throws SQLException, IOException {
       // Mock connection and statement
       Connection mockConn = mock(Connection.class);
       PreparedStatement mockStmt = mock(PreparedStatement.class);
       when(mockConn.prepareStatement(any())).thenReturn(mockStmt);
       
       // Create prepared statement with named parameters
       // where :name appears multiple times in the SQL
       String sql = "INSERT INTO test (id, name, name_copy) VALUES (:id, :name, :name)";
       String[] fieldNames = {"id", "name"};
       
       FieldNamedPreparedStatement pstmt =
           FieldNamedPreparedStatement.prepareStatement(mockConn, sql, fieldNames);
       
       // Set CLOB value
       String clobValue = "Test CLOB data";
       pstmt.setClob(2, new StringReader(clobValue));
       
       // Verify that setClob was called for each mapped index
       // and that each call received a StringReader
       verify(mockStmt, times(2)).setClob(anyInt(), any(StringReader.class));
   }
   ```
   
   **Integration test**: `OracleJdbcSinkIT.java` (new)
   ```java
   @Test
   public void testWriteClobData() throws Exception {
       // Create table with CLOB column
       String createTableSql = "CREATE TABLE test_clob (id INT, clob_col CLOB, nclob_col NCLOB)";
       // ...
       
       // Prepare test data
       // ...
       
       // Execute SeaTunnel job
       // ...
       
       // Verify data is correctly written
       String querySql = "SELECT clob_col, nclob_col FROM test_clob WHERE id = 1";
       // ...
   }
   ```
   
   **Rationale**:
   1. Unit tests ensure basic logic is correct
   2. Integration tests ensure end-to-end process is correct
   3. Tests cover edge cases (multi-index, large data volume)
   4. Follows test pyramid principles
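
The multi-index edge case can also be demonstrated without Mockito or a database, because it comes down to a `Reader` being consumed on first use. The following JDK-only sketch uses hypothetical names (`ReaderReuseSketch`, `drain`, `bindShared`, `bindFresh`); it models each bound position as "whatever that position manages to read" and is not the SeaTunnel implementation.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

public class ReaderReuseSketch {

    /** Reads everything that is left in the reader. */
    public static String drain(Reader reader) {
        try {
            StringBuilder sb = new StringBuilder();
            char[] buf = new char[1024];
            int n;
            while ((n = reader.read(buf)) != -1) {
                sb.append(buf, 0, n);
            }
            return sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hands the SAME reader to every position: only the first one sees data. */
    public static String[] bindShared(Reader reader, int positions) {
        String[] seen = new String[positions];
        for (int i = 0; i < positions; i++) {
            seen[i] = drain(reader);
        }
        return seen;
    }

    /** Buffers the value once, then gives each position a fresh StringReader. */
    public static String[] bindFresh(Reader reader, int positions) {
        String value = drain(reader);
        String[] seen = new String[positions];
        for (int i = 0; i < positions; i++) {
            seen[i] = drain(new StringReader(value));
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] shared = bindShared(new StringReader("Test CLOB data"), 2);
        String[] fresh = bindFresh(new StringReader("Test CLOB data"), 2);
        System.out.println("shared: [" + shared[0] + "] [" + shared[1] + "]");
        System.out.println("fresh:  [" + fresh[0] + "] [" + fresh[1] + "]");
    }
}
```

With a shared reader the second position receives an empty string, so a test for the multi-index path should assert on the content each position receives, not only on the number of `setClob` calls.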
   
   ---
   
   ### Issue 6: Oracle CLOB fix uses Chinese comments, not compliant with project standards
   
   **Location**: `FieldNamedPreparedStatement.java:367-375`
   
   **Modified code**:
   ```java
   /**
    * Set CLOB parameter value, supporting both regular CLOB and NCLOB types. When only one index position needs to be set, use the corresponding setClob or setNClob method directly
    * When multiple index positions need to be set with the same value, to ensure all streams can correctly read data, the Reader needs to be converted to a string first, and then a new Reader is created for each index position for setting
    *
    * @param parameterIndex Parameter index position (starting from 1)
    * @param reader Reader object for reading CLOB data
    * @param isNClob Whether it is NCLOB type
    * @throws SQLException SQL execution exception
    */
   ```
   
   **Issue description**:
   JavaDoc comments are written in **Chinese**, while Apache SeaTunnel project 
coding standards require **English**.
   
   **Potential risks**:
   - Does not comply with Apache project internationalization standards
   - International contributors cannot understand comments
   - May be rejected by CI/CD checks (if code style checking exists)
   
   **Impact scope**:
   - Direct impact: Code standard compliance
   - Indirect impact: Project internationalization
   - Affected area: Oracle JDBC Sink
   
   **Severity**: **MINOR**
   
   **Improvement suggestions**:
   ```java
   /**
    * Sets the CLOB parameter value, supporting both regular CLOB and NCLOB types.
    * When there is only one index position to set, the corresponding setClob or setNClob
    * method is called directly. When there are multiple index positions that need to be
    * set with the same value, the Reader must first be converted to a String, and then
    * a new Reader is created for each index position to ensure all streams can read the data correctly.
    *
    * @param parameterIndex the parameter index position (starting from 1)
    * @param reader the Reader object for reading CLOB data
    * @param isNClob whether it is an NCLOB type
    * @throws SQLException if a SQL execution exception occurs
    */
   ```
   
   **Rationale**:
   1. Complies with Apache project internationalization standards
   2. All contributors can understand
   3. Improves code readability
   
   ---
   
   ### Issue 7: Oracle CLOB fix lacks logging and observability
   
   **Location**:
   - `OracleJdbcRowConverter.java:57-64`
   - `FieldNamedPreparedStatement.java:363-408`
   
   **Issue description**:
   For CLOB data writes, **no logs or metrics have been added**:
   1. No logging of CLOB data size
   2. No logging of multi-index scenario handling
   3. No performance metrics (e.g., CLOB write time)
   
   **Potential risks**:
   - Users lack information when debugging CLOB write issues
   - Cannot monitor CLOB write performance
   - Difficult to diagnose performance issues with large data volume CLOBs
   
   **Impact scope**:
   - Direct impact: Observability
   - Indirect impact: Issue diagnosis
   - Affected area: Oracle JDBC Sink users
   
   **Severity**: **MINOR**
   
   **Improvement suggestions** (optional):
   ```java
   private void setClob(int parameterIndex, Reader reader, boolean isNClob) throws SQLException {
       int[] indexes = indexMapping[parameterIndex - 1];
       if (indexes.length == 1) {
           if (isNClob) {
               statement.setNClob(indexes[0], reader);
           } else {
               statement.setClob(indexes[0], reader);
           }
       } else {
           try {
               String value = IOUtils.toString(reader);
               if (log.isDebugEnabled()) {
                   log.debug("Setting CLOB/NCLOB value for parameter {} across {} indexes, value length: {}",
                       parameterIndex, indexes.length, value.length());
               }
               for (int index : indexes) {
                   if (isNClob) {
                       statement.setNClob(index, new StringReader(value));
                   } else {
                       statement.setClob(index, new StringReader(value));
                   }
               }
           } catch (IOException e) {
               throw new SQLException(e.getLocalizedMessage(), e);
           }
       }
   }
   ```
   
   **Rationale**:
   1. Provides debug information to help users diagnose issues
   2. Does not affect performance (only logged at DEBUG level)
   3. Improves observability
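
The `indexMapping[parameterIndex - 1]` lookup in the snippet above relies on each named parameter being mapped to one or more JDBC positions. The following is a rough sketch of how such a mapping could be built. It is not the parser used by `FieldNamedPreparedStatement`: the class `NamedParamIndexSketch` and its methods are hypothetical, and the regular expression deliberately ignores string literals and `::` casts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NamedParamIndexSketch {

    // Simplified placeholder syntax: a colon followed by word characters.
    private static final Pattern NAMED = Pattern.compile(":(\\w+)");

    /** Maps each parameter name to the 1-based JDBC positions where it occurs. */
    public static Map<String, List<Integer>> positions(String sql) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        Matcher m = NAMED.matcher(sql);
        int position = 0;
        while (m.find()) {
            position++;
            result.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(position);
        }
        return result;
    }

    /** Rewrites the named placeholders to plain JDBC '?' markers. */
    public static String toPositional(String sql) {
        return NAMED.matcher(sql).replaceAll("?");
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO test (id, name, name_copy) VALUES (:id, :name, :name)";
        System.out.println(positions(sql));    // {id=[1], name=[2, 3]}
        System.out.println(toPositional(sql)); // ... VALUES (?, ?, ?)
    }
}
```

A parameter that maps to more than one position, such as `name` here, is the case where the debug log line in the suggestion is most useful.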
   
   ---
   
   ### Issue 8: PR description is inaccurate
   
   **Location**: PR description (overall)
   
   **Issue description**:
   
   **1. PR title does not match content**:
   - PR title: `[Fix] [Connector-v2] [Postgres-CDC] Error: The replication slot name "ore_slot_test_20260112" already exists #10322`
   - Actual content: Includes PostgreSQL CDC fix **and** Oracle CLOB support
   - Issue: Title only mentions PostgreSQL CDC, not Oracle
   
   **2. "Does this PR introduce any user-facing change?" answer is inaccurate**:
   - Answer: `No. Only the underlying connection initialization logic is fixed; user configuration and output format are not affected`
   - Actual: Oracle CLOB support is a **new feature** that allows users to 
write CLOB/NCLOB fields
   - Issue: Oracle CLOB support is indeed a user-facing change
   
   **3. "How was this patch tested?" answer is incomplete**:
   - Answer: Only PostgreSQL CDC test description
   - Missing: Oracle CLOB test description
   - Issue: No explanation of how to test Oracle CLOB functionality
   
   **Potential risks**:
   - Misleads reviewers
   - Hides actual functional changes
   - May result in insufficient testing
   
   **Impact scope**:
   - Direct impact: PR review
   - Indirect impact: Merge decision
   - Affected area: Entire PR process
   
   **Severity**: **MAJOR**
   
   **Improvement suggestions**:
   
   **1. Split PR** (recommended):
   - This PR (#10416): `[Fix] [Connector-v2] [Postgres-CDC] Fix replication slot initialization`
   - A new, separate PR: `[Feature] [Connector-Jdbc] [Oracle] Support CLOB/NCLOB parameter passing`
   
   **2. If not splitting, update PR description**:
   ```
   ### Purpose of this pull request
   
   This PR includes two unrelated changes:
   
   #### Change 1: Fix PostgreSQL CDC replication slot initialization (#10322)
   **Problem**: PostgreSQL CDC tasks fail to start when replication slot already exists.
   
   **Solution**: Replace `createReplicationSlot()` with `initConnection()`, which handles existing slots gracefully.
   
   #### Change 2: Support CLOB/NCLOB for Oracle JDBC Sink
   **Problem**: Oracle JDBC Sink throws UnsupportedOperationException when writing CLOB/NCLOB fields.
   
   **Solution**: Implement `setClob(Reader)` and `setNClob(Reader)` methods in `FieldNamedPreparedStatement` and `OracleJdbcRowConverter`.
   
   ### Does this PR introduce any user-facing change?
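   **For PostgreSQL CDC**: No. Only the underlying connection initialization logic is fixed; user configuration and output format are not affected
   
   **For Oracle JDBC Sink**: Yes. Users can now use Oracle JDBC Sink to write CLOB/NCLOB type fields
   
   ### How was this patch tested?
   **For PostgreSQL CDC**:
   - ✅ Manually created the replication slot, then started SeaTunnel and verified that the task starts successfully (the original logic crashed)
   - ✅ Deleted the replication slot, then started SeaTunnel and verified that the slot is created automatically
   
   **For Oracle JDBC Sink**:
   - TODO: add test description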
   ```
   
   **Rationale**:
   1. Accurately describes PR content
   2. Avoids misleading reviewers
   3. Provides complete test information

