[
https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
wenli xiao updated FLINK-35277:
-------------------------------
Environment:
Flink 1.18.0
Flink CDC 3.1-pre
DB2 11.5.x
was:
Flink 1.18.0
DB2 11.5.x
> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -----------------------------------------------------------------------------
>
> Key: FLINK-35277
> URL: https://issues.apache.org/jira/browse/FLINK-35277
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: 3.1.0
> Environment: Flink 1.18.0
> Flink CDC 3.1-pre
> DB2 11.5.x
>
> Reporter: wenli xiao
> Priority: Minor
> Attachments: image-2024-04-30-22-19-17-350.png
>
>
> 1. background
> When attempting to use Flink CDC 3.1 in the Flink connector to load data from
> DB2 to Apache Doris, I set up DB2 using the Docker image
> {{{}ruanhang/db2-cdc-demo:v1{}}}. After configuring the DB2 asynchronous CDC,
> I tried to capture a table using {{{}CALL ASNCDC.ADDTABLE('MYSCHEMA',
> 'MYTABLE'){}}}. However, I encountered an error when attempting to add the
> eleventh table:
> [23505][-803] One or more values in the INSERT statement, UPDATE statement,
> or foreign key update caused by a DELETE statement are not valid because the
> primary key, unique constraint or unique index identified by "2" constrains
> table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index
> key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14
> !image-2024-04-30-22-19-17-350.png!
>
> 2.
> The error indicates that the table {{Asncdc.IBMSNAP_PRUNCNTL}} has a
> duplicate primary key.
> Here is the schema of {{{}Asncdc.IBMSNAP_PRUNCNTL{}}}:
> create table IBMSNAP_PRUNCNTL
> (
> TARGET_SERVER CHARACTER(18) not null,
> TARGET_OWNER VARCHAR(128) not null,
> TARGET_TABLE VARCHAR(128) not null,
> SYNCHTIME TIMESTAMP(6),
> SYNCHPOINT VARCHAR(16) FOR BIT DATA,
> SOURCE_OWNER VARCHAR(128) not null,
> SOURCE_TABLE VARCHAR(128) not null,
> SOURCE_VIEW_QUAL SMALLINT not null,
> APPLY_QUAL CHARACTER(18) not null,
> SET_NAME CHARACTER(18) not null,
> CNTL_SERVER CHARACTER(18) not null,
> TARGET_STRUCTURE SMALLINT not null,
> CNTL_ALIAS CHARACTER(8),
> PHYS_CHANGE_OWNER VARCHAR(128),
> PHYS_CHANGE_TABLE VARCHAR(128),
> MAP_ID VARCHAR(10) not null
> );
>
> create unique index IBMSNAP_PRUNCNTLX
> on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL,
> APPLY_QUAL, SET_NAME, TARGET_SERVER,
> TARGET_TABLE, TARGET_OWNER);
>
> create unique index IBMSNAP_PRUNCNTLX1
> on IBMSNAP_PRUNCNTL (MAP_ID);
>
> create index IBMSNAP_PRUNCNTLX2
> on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);
>
> create index IBMSNAP_PRUNCNTLX3
> on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);
> The issue stems from the logic in {{asncdc.addtable}} not aligning with the
> {{asncdcaddremove.sql}} script when calling the {{addtable}} procedure. The
> original insert statement is as follows:
> – Original insert statement
> SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
> 'TARGET_SERVER, ' ||
> 'TARGET_OWNER, ' ||
> 'TARGET_TABLE, ' ||
> 'SYNCHTIME, ' ||
> 'SYNCHPOINT, ' ||
> 'SOURCE_OWNER, ' ||
> 'SOURCE_TABLE, ' ||
> 'SOURCE_VIEW_QUAL, ' ||
> 'APPLY_QUAL, ' ||
> 'SET_NAME, ' ||
> 'CNTL_SERVER , ' ||
> 'TARGET_STRUCTURE , ' ||
> 'CNTL_ALIAS , ' ||
> 'PHYS_CHANGE_OWNER , ' ||
> 'PHYS_CHANGE_TABLE , ' ||
> 'MAP_ID ' ||
> ') VALUES ( ' ||
> '''KAFKA'', ' ||
> '''' || tableschema || ''', ' ||
> '''' || tablename || ''', ' ||
> 'NULL, ' ||
> 'NULL, ' ||
> '''' || tableschema || ''', ' ||
> '''' || tablename || ''', ' ||
> '0, ' ||
> '''KAFKAQUAL'', ' ||
> '''SET001'', ' ||
> ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
> '8, ' ||
> ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
> '''ASNCDC'', ' ||
> '''CDC_' || tableschema || '_' || tablename || ''', ' ||
> ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN
> CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10))
> END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
> ' )';
> EXECUTE IMMEDIATE stmtSQL;
> The {{max(MAP_ID)}} logic is incorrect, as the correct result should be
> {{{}CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)){}}}. This issue
> prevents the addition of the eleventh table. For more details about
> {{{}asncdcaddremove.sql{}}}, please refer to:
> [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)