e-mhui opened a new pull request, #7865:
URL: https://github.com/apache/inlong/pull/7865

   ### Prepare a Pull Request
   
   [INLONG-7858][Sort] Fix Oracle CDC fetching two different database names 
for the same table
   
   - Fixes #7858 
   
   ### Motivation
   
   If the database name configured for Oracle CDC in Flink SQL is lowercase, 
as in the following example (`'database-name' = 'xe'`),
   
   ```sql
   CREATE TABLE products (
        ID INT NOT NULL,
        NAME STRING,
        DESCRIPTION STRING,
        WEIGHT DECIMAL(10, 3),
        PRIMARY KEY (ID) NOT ENFORCED
        ) WITH (
        'connector' = 'oracle-cdc',
        'hostname' = 'localhost',
        'port' = '1521',
        'username' = 'flinkuser',
        'password' = 'flinkpw',
        'database-name' = 'xe',
        'schema-name' = 'inventory',
        'table-name' = 'products');
   ```
   
   the following problems occur:
   1. For the same table (`TB1`), the snapshot phase uses the lowercase 
database name (`xe.FLINKUSER.TB1`), while the incremental phase uses the 
uppercase database name (`XE.FLINKUSER.TB1`).
   2. Because the two identifiers differ, `TB1` is captured again in the 
incremental phase after it has already been captured in the snapshot phase.
   
   In `OracleConnectorConfig`, the database name in Flink SQL is converted to 
uppercase, but the database name used in the snapshot phase is directly taken 
from Flink SQL. Therefore, when constructing the `tableId`, the database name 
in Flink SQL also needs to be converted to uppercase.
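
The mismatch described above can be reproduced in isolation. The following is a minimal sketch, not the connector's code: `SimpleTableId` is a hypothetical stand-in for the real `TableId` class, assumed here to compare its parts case-sensitively, and `Locale.ROOT` is an extra assumption of this sketch rather than something this PR uses.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

public class TableIdCaseDemo {

    // Hypothetical stand-in for TableId: all three parts are compared case-sensitively.
    static final class SimpleTableId {
        final String catalog;
        final String schema;
        final String table;

        SimpleTableId(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = table;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SimpleTableId)) {
                return false;
            }
            SimpleTableId other = (SimpleTableId) o;
            return catalog.equals(other.catalog)
                    && schema.equals(other.schema)
                    && table.equals(other.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(catalog, schema, table);
        }
    }

    // Registers the same table once per phase and returns how many distinct ids result.
    public static int distinctIds(String snapshotDb, String incrementalDb) {
        Set<SimpleTableId> ids = new HashSet<>();
        ids.add(new SimpleTableId(snapshotDb, "FLINKUSER", "TB1"));
        ids.add(new SimpleTableId(incrementalDb, "FLINKUSER", "TB1"));
        return ids.size();
    }

    public static void main(String[] args) {
        String configured = "xe"; // value of 'database-name' in Flink SQL
        String normalized = configured.toUpperCase(Locale.ROOT);

        // Snapshot uses the raw name, incremental uses the uppercased one: two ids.
        System.out.println(distinctIds(configured, normalized)); // prints 2
        // Both phases use the uppercased name: one id.
        System.out.println(distinctIds(normalized, normalized)); // prints 1
    }
}
```

With the raw name in one phase and the uppercased name in the other, the set holds two entries for what is logically one table, which is why the table is treated as new in the incremental phase.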
   
   <img width="1467" alt="WeCom screenshot 92720bc3-5148-463a-9b35-12c3c8f77570" 
src="https://user-images.githubusercontent.com/111486498/232319384-94ceef68-a5e6-4238-9506-30a7546184e2.png">
   
   ### Modifications
   
   Convert the database name to uppercase when constructing the `tableId`, so 
that the snapshot phase produces the same identifier as the incremental phase.
   
   ```java
               jdbcConnection.query(
                       queryTablesSql,
                       rs -> {
                           while (rs.next()) {
                               String schemaName = rs.getString(1);
                               String tableName = rs.getString(2);
                               // Uppercase the database name so it matches the incremental phase.
                               TableId tableId =
                                       new TableId(jdbcConnection.database().toUpperCase(), schemaName, tableName);
                               tableIdSet.add(tableId);
                           }
                       });
   ```
   


