[ 
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35277:
-----------------------------------
    Labels: pull-request-available  (was: )

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>         Environment: Flink 1.18.0 
> Flink CDC 3.1-pre
> DB2 11.5.x
>  
>            Reporter: wenli xiao
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. Background
> While using Flink CDC 3.1 to load data from DB2 into Apache Doris, I set up 
> DB2 using the Docker image {{ruanhang/db2-cdc-demo:v1}}. After configuring 
> DB2 asynchronous CDC, I tried to capture tables using 
> {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. However, adding the 
> eleventh table failed with the following error:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement, 
> or foreign key update caused by a DELETE statement are not valid because the 
> primary key, unique constraint or unique index identified by "2" constrains 
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>  
> 2. Cause
> The error indicates that the insert would create a duplicate key in a 
> unique index on the table {{ASNCDC.IBMSNAP_PRUNCNTL}}.
> Here is the schema of {{ASNCDC.IBMSNAP_PRUNCNTL}}:
> create table IBMSNAP_PRUNCNTL
> (
>   TARGET_SERVER     CHARACTER(18) not null,
>   TARGET_OWNER      VARCHAR(128)  not null,
>   TARGET_TABLE      VARCHAR(128)  not null,
>   SYNCHTIME         TIMESTAMP(6),
>   SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
>   SOURCE_OWNER      VARCHAR(128)  not null,
>   SOURCE_TABLE      VARCHAR(128)  not null,
>   SOURCE_VIEW_QUAL  SMALLINT      not null,
>   APPLY_QUAL        CHARACTER(18) not null,
>   SET_NAME          CHARACTER(18) not null,
>   CNTL_SERVER       CHARACTER(18) not null,
>   TARGET_STRUCTURE  SMALLINT      not null,
>   CNTL_ALIAS        CHARACTER(8),
>   PHYS_CHANGE_OWNER VARCHAR(128),
>   PHYS_CHANGE_TABLE VARCHAR(128),
>   MAP_ID            VARCHAR(10)   not null
> );
>
> create unique index IBMSNAP_PRUNCNTLX
>    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL,
>                         APPLY_QUAL, SET_NAME, TARGET_SERVER,
>                         TARGET_TABLE, TARGET_OWNER);
>
> create unique index IBMSNAP_PRUNCNTLX1
>    on IBMSNAP_PRUNCNTL (MAP_ID);
>
> create index IBMSNAP_PRUNCNTLX2
>    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
>
> create index IBMSNAP_PRUNCNTLX3
>    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the {{ASNCDC.ADDTABLE}} procedure defined in the 
> {{asncdcaddremove.sql}} script: the insert statement it builds computes the 
> next {{MAP_ID}} incorrectly. The original insert statement is as follows:
> -- Original insert statement
> SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
>                'TARGET_SERVER, ' || 
>                'TARGET_OWNER, ' || 
>                'TARGET_TABLE, ' || 
>                'SYNCHTIME, ' || 
>                'SYNCHPOINT, ' || 
>                'SOURCE_OWNER, ' || 
>                'SOURCE_TABLE, ' || 
>                'SOURCE_VIEW_QUAL, ' || 
>                'APPLY_QUAL, ' || 
>                'SET_NAME, ' || 
>                'CNTL_SERVER , ' || 
>                'TARGET_STRUCTURE , ' || 
>                'CNTL_ALIAS , ' || 
>                'PHYS_CHANGE_OWNER , ' || 
>                'PHYS_CHANGE_TABLE , ' || 
>                'MAP_ID ' || 
>                ') VALUES ( ' || 
>                '''KAFKA'', ' || 
>                '''' || tableschema || ''', ' || 
>                '''' || tablename || ''', ' ||
>                'NULL, ' || 
>                'NULL, ' || 
>                '''' || tableschema || ''', ' || 
>                '''' || tablename || ''', ' ||
>                '0, ' || 
>                '''KAFKAQUAL'', ' || 
>                '''SET001'', ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '8, ' || 
>                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
>                '''ASNCDC'', ' || 
>                '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
>                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN ' ||
>                'CAST(1 AS VARCHAR(10)) ELSE ' ||
>                'CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) ' ||
>                'END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' || 
>                '   )';
> EXECUTE IMMEDIATE stmtSQL;
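> The {{max(MAP_ID)}} expression in the ELSE branch is incorrect. {{MAP_ID}} 
> is a {{VARCHAR(10)}} column, so {{max(MAP_ID)}} compares the values as 
> strings: once the ids 1 through 10 exist, the maximum is still '9', the 
> computed next id is 10 again, and the insert violates the unique index on 
> {{MAP_ID}}. The correct expression is 
> {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}. This issue 
> prevents the addition of the eleventh table. For more details about 
> {{asncdcaddremove.sql}}, please refer to: 
> [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]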
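
The failure at the eleventh table can be reproduced outside DB2 with a small simulation. The Python sketch below is only an illustration, not the script's actual code: the function names are hypothetical, a plain list stands in for the MAP_ID column, and it assumes that DB2 orders the digit strings '9' and '10' the same way Python does (character by character, so '9' sorts after '10').

```python
def next_map_id_buggy(map_ids):
    """Mimics CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)).

    max() runs on the string values, so the comparison is lexicographic.
    """
    if not map_ids:
        return "1"
    return str(int(max(map_ids)) + 1)


def next_map_id_fixed(map_ids):
    """Mimics CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)).

    Each value is converted to an integer before taking the maximum.
    """
    if not map_ids:
        return "1"
    return str(max(int(m) for m in map_ids) + 1)


def add_tables(count, next_id):
    """Simulates `count` ADDTABLE calls against a unique index on MAP_ID."""
    map_ids = []
    for n in range(1, count + 1):
        new_id = next_id(map_ids)
        if new_id in map_ids:
            # Stand-in for SQLCODE -803 (duplicate key in a unique index).
            raise ValueError(f"table {n}: duplicate MAP_ID {new_id!r}")
        map_ids.append(new_id)
    return map_ids


if __name__ == "__main__":
    print(add_tables(11, next_map_id_fixed))
    try:
        add_tables(11, next_map_id_buggy)
    except ValueError as exc:
        print("buggy expression fails:", exc)
```

With the buggy expression the first ten calls succeed and the eleventh raises, because the string maximum of '1' through '10' is '9'. With the corrected expression the eleventh call produces '11'.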



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
