[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842404#comment-17842404 ]
wenli xiao commented on FLINK-35277:
------------------------------------

asncdcaddremove.sql: https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>         Environment: Flink 1.18.0
> DB2 11.5.x
>
>            Reporter: wenli xiao
>            Priority: Minor
>         Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. Background
> While using the Flink CDC 3.1 connector to load data from DB2 into Apache
> Doris, I set up DB2 using the Docker image {{ruanhang/db2-cdc-demo:v1}}.
> After configuring DB2 asynchronous CDC, I tried to capture tables using
> {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. However, adding the
> eleventh table failed with the following error:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement,
> or foreign key update caused by a DELETE statement are not valid because the
> primary key, unique constraint or unique index identified by "2" constrains
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>
> 2.
> The error indicates that the insert into {{Asncdc.IBMSNAP_PRUNCNTL}}
> produced a duplicate value for a unique key.
> Here is the schema of {{Asncdc.IBMSNAP_PRUNCNTL}}:
> create table IBMSNAP_PRUNCNTL
> (
>     TARGET_SERVER     CHARACTER(18) not null,
>     TARGET_OWNER      VARCHAR(128)  not null,
>     TARGET_TABLE      VARCHAR(128)  not null,
>     SYNCHTIME         TIMESTAMP(6),
>     SYNCHPOINT        VARCHAR(16) FOR BIT DATA,
>     SOURCE_OWNER      VARCHAR(128)  not null,
>     SOURCE_TABLE      VARCHAR(128)  not null,
>     SOURCE_VIEW_QUAL  SMALLINT      not null,
>     APPLY_QUAL        CHARACTER(18) not null,
>     SET_NAME          CHARACTER(18) not null,
>     CNTL_SERVER       CHARACTER(18) not null,
>     TARGET_STRUCTURE  SMALLINT      not null,
>     CNTL_ALIAS        CHARACTER(8),
>     PHYS_CHANGE_OWNER VARCHAR(128),
>     PHYS_CHANGE_TABLE VARCHAR(128),
>     MAP_ID            VARCHAR(10)   not null
> );
>
> create unique index IBMSNAP_PRUNCNTLX
>     on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL,
>                          APPLY_QUAL, SET_NAME, TARGET_SERVER,
>                          TARGET_TABLE, TARGET_OWNER);
>
> create unique index IBMSNAP_PRUNCNTLX1
>     on IBMSNAP_PRUNCNTL (MAP_ID);
>
> create index IBMSNAP_PRUNCNTLX2
>     on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
>
> create index IBMSNAP_PRUNCNTLX3
>     on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the {{MAP_ID}} logic in the {{asncdc.addtable}}
> procedure, which is defined in the {{asncdcaddremove.sql}} script.
> The original insert statement is as follows:
> -- Original insert statement
> SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
>     'TARGET_SERVER, ' ||
>     'TARGET_OWNER, ' ||
>     'TARGET_TABLE, ' ||
>     'SYNCHTIME, ' ||
>     'SYNCHPOINT, ' ||
>     'SOURCE_OWNER, ' ||
>     'SOURCE_TABLE, ' ||
>     'SOURCE_VIEW_QUAL, ' ||
>     'APPLY_QUAL, ' ||
>     'SET_NAME, ' ||
>     'CNTL_SERVER , ' ||
>     'TARGET_STRUCTURE , ' ||
>     'CNTL_ALIAS , ' ||
>     'PHYS_CHANGE_OWNER , ' ||
>     'PHYS_CHANGE_TABLE , ' ||
>     'MAP_ID ' ||
>     ') VALUES ( ' ||
>     '''KAFKA'', ' ||
>     '''' || tableschema || ''', ' ||
>     '''' || tablename || ''', ' ||
>     'NULL, ' ||
>     'NULL, ' ||
>     '''' || tableschema || ''', ' ||
>     '''' || tablename || ''', ' ||
>     '0, ' ||
>     '''KAFKAQUAL'', ' ||
>     '''SET001'', ' ||
>     ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
>     '8, ' ||
>     ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
>     '''ASNCDC'', ' ||
>     '''CDC_' || tableschema || '_' || tablename || ''', ' ||
>     ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
>     ' )';
> EXECUTE IMMEDIATE stmtSQL;
> The {{max(MAP_ID)}} logic in the ELSE branch is incorrect. {{MAP_ID}} is a
> {{VARCHAR(10)}} column, so {{max(MAP_ID)}} compares the values as strings,
> and '9' sorts after '10'. Once ten tables have been added, {{max(MAP_ID)}}
> still returns '9', the next {{MAP_ID}} is computed as '10' again, and the
> insert violates the unique index {{IBMSNAP_PRUNCNTLX1}}. The correct
> expression is {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}, which
> casts to an integer before taking the maximum. This issue prevents the
> addition of the eleventh table. For more details about
> {{asncdcaddremove.sql}}, please refer to:
> [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
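
The string-versus-integer comparison behind the failure can be reproduced outside DB2. The following is a minimal sketch, not the connector's or Debezium's code: it assumes SQLite (through Python's standard `sqlite3` module) as a stand-in for DB2, and a hypothetical two-column table `PRUNCNTL` that keeps only `MAP_ID` and its unique index. The helper name `tables_added` is also made up for illustration. The two `CASE` expressions are the original one from the script and the proposed replacement.

```python
import sqlite3

# Original expression: the ELSE branch takes max() over the VARCHAR column,
# so values are compared as strings and '9' beats '10'.
BUGGY = (
    "SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) "
    "ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END FROM PRUNCNTL"
)

# Proposed expression: cast to INT first, then take the maximum.
FIXED = (
    "SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) "
    "ELSE CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END FROM PRUNCNTL"
)


def tables_added(map_id_query, attempts=12):
    """Insert up to `attempts` rows and return how many succeed before a
    unique-index violation on MAP_ID (hypothetical simplified table)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE PRUNCNTL ("
        "SOURCE_TABLE VARCHAR(128) NOT NULL, "
        "MAP_ID VARCHAR(10) NOT NULL)"
    )
    conn.execute("CREATE UNIQUE INDEX PRUNCNTLX1 ON PRUNCNTL (MAP_ID)")
    added = 0
    for i in range(1, attempts + 1):
        try:
            conn.execute(
                "INSERT INTO PRUNCNTL (SOURCE_TABLE, MAP_ID) "
                f"VALUES (?, ({map_id_query}))",
                (f"T{i}",),
            )
        except sqlite3.IntegrityError:
            # Duplicate MAP_ID: the analogue of SQLCODE=-803 in DB2.
            break
        added += 1
    conn.close()
    return added


if __name__ == "__main__":
    print("original expression:", tables_added(BUGGY), "tables added")
    print("proposed expression:", tables_added(FIXED), "tables added")
```

In this sketch the original expression stops after ten rows, matching the reported failure on the eleventh `ADDTABLE` call, while the proposed expression adds all twelve. SQLite's type handling differs from DB2's in other respects, so this only illustrates the ordering problem, not the full procedure.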