[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wenli xiao updated FLINK-35277:
-------------------------------
    Description: 
1. background

While using Flink CDC 3.1 to load data from DB2 into Apache Doris, I set up DB2 
with the Docker image {{ruanhang/db2-cdc-demo:v1}}. After configuring DB2 
asynchronous CDC, I registered tables for capture with 
{{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. The first ten tables were added 
successfully, but adding the eleventh table failed with the following error:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 
foreign key update caused by a DELETE statement are not valid because the 
primary key, unique constraint or unique index identified by "2" constrains 
table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
!image-2024-04-30-22-19-17-350.png!

 

2. analysis

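The error indicates that the insert into {{ASNCDC.IBMSNAP_PRUNCNTL}} tried to 
write a value that already exists in one of the table's unique indexes. The table 
has no declared primary key; uniqueness is enforced by the indexes shown below.

Here is the schema of {{ASNCDC.IBMSNAP_PRUNCNTL}}: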
create table IBMSNAP_PRUNCNTL
(
  TARGET_SERVER     CHARACTER(18) not null,
  TARGET_OWNER      VARCHAR(128)  not null,
  TARGET_TABLE      VARCHAR(128)  not null,
  SYNCHTIME         TIMESTAMP(6),
  SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
  SOURCE_OWNER      VARCHAR(128)  not null,
  SOURCE_TABLE      VARCHAR(128)  not null,
  SOURCE_VIEW_QUAL  SMALLINT      not null,
  APPLY_QUAL        CHARACTER(18) not null,
  SET_NAME          CHARACTER(18) not null,
  CNTL_SERVER       CHARACTER(18) not null,
  TARGET_STRUCTURE  SMALLINT      not null,
  CNTL_ALIAS        CHARACTER(8),
  PHYS_CHANGE_OWNER VARCHAR(128),
  PHYS_CHANGE_TABLE VARCHAR(128),
  MAP_ID            VARCHAR(10)   not null
);

create unique index IBMSNAP_PRUNCNTLX
   on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL,
                        APPLY_QUAL, SET_NAME, TARGET_SERVER,
                        TARGET_TABLE, TARGET_OWNER);

create unique index IBMSNAP_PRUNCNTLX1
   on IBMSNAP_PRUNCNTL (MAP_ID);

create index IBMSNAP_PRUNCNTLX2
   on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);

create index IBMSNAP_PRUNCNTLX3
   on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);

The issue comes from how the {{ASNCDC.ADDTABLE}} procedure, created by the 
{{asncdcaddremove.sql}} script in the DB2 test container, computes {{MAP_ID}} for 
each new row in {{ASNCDC.IBMSNAP_PRUNCNTL}}. The original insert statement is:
-- Original insert statement
SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
               'TARGET_SERVER, ' || 
               'TARGET_OWNER, ' || 
               'TARGET_TABLE, ' || 
               'SYNCHTIME, ' || 
               'SYNCHPOINT, ' || 
               'SOURCE_OWNER, ' || 
               'SOURCE_TABLE, ' || 
               'SOURCE_VIEW_QUAL, ' || 
               'APPLY_QUAL, ' || 
               'SET_NAME, ' || 
               'CNTL_SERVER , ' || 
               'TARGET_STRUCTURE , ' || 
               'CNTL_ALIAS , ' || 
               'PHYS_CHANGE_OWNER , ' || 
               'PHYS_CHANGE_TABLE , ' || 
               'MAP_ID ' || 
               ') VALUES ( ' || 
               '''KAFKA'', ' || 
               '''' || tableschema || ''', ' || 
               '''' || tablename || ''', ' ||
               'NULL, ' || 
               'NULL, ' || 
               '''' || tableschema || ''', ' || 
               '''' || tablename || ''', ' ||
               '0, ' || 
               '''KAFKAQUAL'', ' || 
               '''SET001'', ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '8, ' || 
               ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
               '''ASNCDC'', ' || 
               '''CDC_' || tableschema ||  '_' || tablename || ''', ' ||
               ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL ' ||
               'THEN CAST(1 AS VARCHAR(10)) ' ||
               'ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) ' ||
               'END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
               '   )';
EXECUTE IMMEDIATE stmtSQL;
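
The {{max(MAP_ID)}} logic is incorrect. {{MAP_ID}} is a {{VARCHAR(10)}} column, 
so {{max(MAP_ID)}} compares the stored values as strings rather than as numbers. 
Starting from an empty table, the first ten calls assign '1' through '10'. On the 
eleventh call, the string maximum is '9', because '9' sorts after '10'. The 
procedure therefore computes 9 + 1 = '10', which duplicates an existing value in 
the unique index {{IBMSNAP_PRUNCNTLX1}} on {{MAP_ID}}. The value must be cast to 
an integer before aggregating: {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}. 
For more details about {{asncdcaddremove.sql}}, see: 
[asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]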
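The sketch below reproduces the collision outside DB2. It is a minimal sketch under these assumptions:

* Python's built-in {{sqlite3}} module stands in for DB2.
* Only the {{MAP_ID}} column and its unique index are modeled.
* The table name {{PRUNCNTL}} and the helper {{add_tables}} are hypothetical.
* SQLite's text comparison is assumed to order these digit strings the same way DB2 does.

The script runs the original and the corrected {{MAP_ID}} expressions eleven times each.

```python
import sqlite3

# Hypothetical stand-in for ASNCDC.IBMSNAP_PRUNCNTL: only MAP_ID is modeled.
# ELSE branch as in the original procedure: max() runs on the VARCHAR values.
BUGGY_NEXT_ID = (
    "SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) "
    "ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END FROM PRUNCNTL"
)
# Corrected ELSE branch: cast to INT first, then take the maximum.
FIXED_NEXT_ID = (
    "SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) "
    "ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END FROM PRUNCNTL"
)


def add_tables(next_id_sql: str, count: int) -> list:
    """Simulate `count` ADDTABLE calls, computing each MAP_ID with `next_id_sql`.

    Returns the MAP_IDs that were inserted and stops at the first
    unique-index violation (the analogue of SQLCODE -803).
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE PRUNCNTL (MAP_ID VARCHAR(10) NOT NULL)")
    # Mirrors the unique index IBMSNAP_PRUNCNTLX1 on MAP_ID.
    conn.execute("CREATE UNIQUE INDEX PRUNCNTLX1 ON PRUNCNTL (MAP_ID)")
    inserted = []
    for _ in range(count):
        (next_id,) = conn.execute(next_id_sql).fetchone()
        try:
            conn.execute("INSERT INTO PRUNCNTL (MAP_ID) VALUES (?)", (next_id,))
        except sqlite3.IntegrityError as exc:
            print(f"call #{len(inserted) + 1} failed for MAP_ID {next_id!r}: {exc}")
            break
        inserted.append(next_id)
    conn.close()
    return inserted


if __name__ == "__main__":
    print("buggy:", add_tables(BUGGY_NEXT_ID, 11))
    print("fixed:", add_tables(FIXED_NEXT_ID, 11))
```

With the original expression, ten IDs are inserted and the eleventh call fails on the duplicate '10'. With the corrected expression, all eleven calls succeed.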

  was:
1. background

While using Flink CDC 3.1 to load data from DB2 into Apache Doris, I set up DB2 
with the Docker image {{ruanhang/db2-cdc-demo:v1}}. After configuring DB2 
asynchronous CDC, I registered tables for capture with 
{{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. The first ten tables were added 
successfully, but adding the eleventh table failed with the following error:
[23505][-803] One or more values in the INSERT statement, UPDATE statement, or 
foreign key update caused by a DELETE statement are not valid because the 
primary key, unique constraint or unique index identified by "2" constrains 
table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index 
key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
!image-2024-04-30-22-19-17-350.png!

 

2. analysis

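The error indicates that the insert into {{ASNCDC.IBMSNAP_PRUNCNTL}} tried to 
write a value that already exists in one of the table's unique indexes. The table 
has no declared primary key; uniqueness is enforced by the indexes shown below.

Here is the schema of {{ASNCDC.IBMSNAP_PRUNCNTL}}: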
create table IBMSNAP_PRUNCNTL
(
    TARGET_SERVER     CHARACTER(18) not null,
    TARGET_OWNER      VARCHAR(128)  not null,
    TARGET_TABLE      VARCHAR(128)  not null,
    SYNCHTIME         TIMESTAMP(6),
    SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
    SOURCE_OWNER      VARCHAR(128)  not null,
    SOURCE_TABLE      VARCHAR(128)  not null,
    SOURCE_VIEW_QUAL  SMALLINT      not null,
    APPLY_QUAL        CHARACTER(18) not null,
    SET_NAME          CHARACTER(18) not null,
    CNTL_SERVER       CHARACTER(18) not null,
    TARGET_STRUCTURE  SMALLINT      not null,
    CNTL_ALIAS        CHARACTER(8),
    PHYS_CHANGE_OWNER VARCHAR(128),
    PHYS_CHANGE_TABLE VARCHAR(128),
    MAP_ID            VARCHAR(10)   not null
);

create unique index IBMSNAP_PRUNCNTLX
    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL,
                         APPLY_QUAL, SET_NAME, TARGET_SERVER,
                         TARGET_TABLE, TARGET_OWNER);

create unique index IBMSNAP_PRUNCNTLX1
    on IBMSNAP_PRUNCNTL (MAP_ID);

create index IBMSNAP_PRUNCNTLX2
    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);

create index IBMSNAP_PRUNCNTLX3
    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);

The issue comes from how the {{ASNCDC.ADDTABLE}} procedure, created by the 
{{asncdcaddremove.sql}} script in the DB2 test container, computes {{MAP_ID}} for 
each new row in {{ASNCDC.IBMSNAP_PRUNCNTL}}. The original insert statement is:
-- Original insert statement
SET stmtSQL =   'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' || 
                'TARGET_SERVER,  ' || 
                'TARGET_OWNER,  ' || 
                'TARGET_TABLE,  ' || 
                'SYNCHTIME,  ' || 
                'SYNCHPOINT,  ' || 
                'SOURCE_OWNER,  ' || 
                'SOURCE_TABLE,  ' || 
                'SOURCE_VIEW_QUAL,  ' || 
                'APPLY_QUAL,  ' || 
                'SET_NAME,  ' || 
                'CNTL_SERVER ,  ' || 
                'TARGET_STRUCTURE ,  ' || 
                'CNTL_ALIAS ,  ' || 
                'PHYS_CHANGE_OWNER ,  ' || 
                'PHYS_CHANGE_TABLE ,  ' || 
                'MAP_ID  ' || 
                ') VALUES ( ' || 
                '''KAFKA'', ' || 
                '''' || tableschema || ''', ' || 
                '''' || tablename || ''', ' ||
                'NULL, ' || 
                'NULL, ' || 
                '''' || tableschema || ''', ' || 
                '''' || tablename || ''', ' ||
                '0, ' || 
                '''KAFKAQUAL'', ' || 
                '''SET001'', ' || 
                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
                '8, ' || 
                ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' || 
                '''ASNCDC'', ' || 
                '''CDC_' ||  tableschema ||  '_' || tablename || ''', ' ||
                ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL ' ||
                'THEN CAST(1 AS VARCHAR(10)) ' ||
                'ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) ' ||
                'END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
                '    )';
EXECUTE IMMEDIATE stmtSQL;
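
The {{max(MAP_ID)}} logic is incorrect. {{MAP_ID}} is a {{VARCHAR(10)}} column, 
so {{max(MAP_ID)}} compares the stored values as strings rather than as numbers. 
Starting from an empty table, the first ten calls assign '1' through '10'. On the 
eleventh call, the string maximum is '9', because '9' sorts after '10'. The 
procedure therefore computes 9 + 1 = '10', which duplicates an existing value in 
the unique index {{IBMSNAP_PRUNCNTLX1}} on {{MAP_ID}}. The value must be cast to 
an integer before aggregating: {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}. 
For more details about {{asncdcaddremove.sql}}, see: 
[asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189].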


> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>         Environment: Flink 1.18.0 
> DB2 11.5.x
>  
>            Reporter: wenli xiao
>            Priority: Minor
>         Attachments: image-2024-04-30-22-19-17-350.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
