[ 
https://issues.apache.org/jira/browse/FLINK-39832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhouqing updated FLINK-39832:
-----------------------------
    Description: 
  When an Oracle source table has a column defined as NUMBER(p, 0) with p >= 19 
(e.g. NUMBER(19,0) as a primary key), the flink-cdc pipeline connector 
incorrectly maps it to BIGINT. However, Debezium internally encodes values with 
precision >= 19 as DECIMAL (variable-length BYTES). This mismatch causes the 
sink to read a BinaryRecordData pointer as a long value, producing a constant 
garbage PK (e.g. 171798691841 = 0x28_00000001) for every row. With upsert 
sinks, all rows collapse into one.

  Affected Version: 3.6.0 (both -1.20 and -2.2 variants)

  Root Cause

  Two code paths disagree on the type for NUMBER(p, 0) when p >= 19:

  - OracleTypeUtils.fromDbzColumn (builds CDC schema): maps NUMBER(p, 0) for 
any p to BIGINT
  - DebeziumSchemaDataTypeInference.inferBytes (runtime): maps NUMBER(p, 0) 
with p >= 19 to DECIMAL(p, 0)

  The CreateTableEvent declares the column as BIGINT, but BinaryRecordData 
stores the value in DECIMAL non-compact layout: the 8-byte field slot holds a 
pointer (offset << 32 | length) into the variable-length part, not the number 
itself. When the sink calls getLong(idx), it reads that pointer, which is the 
same for every row, so PK-based upsert folds all rows into one.

  Reproduction

  1. Create an Oracle table:

  CREATE TABLE FLINKUSER.FLINK_USER (
    ID     NUMBER(19,0) PRIMARY KEY,
    NAME   VARCHAR2(64),
    AGE    NUMBER(10,0),
    GENDER VARCHAR2(2)
  );
  INSERT INTO FLINKUSER.FLINK_USER VALUES (1, 'zhangsan', 18, 'M');
  INSERT INTO FLINKUSER.FLINK_USER VALUES (2, 'lisi',     19, 'F');
  INSERT INTO FLINKUSER.FLINK_USER VALUES (3, 'wangwu',   20, 'M');
  COMMIT;

  2. Run a flink-cdc pipeline with Oracle source and any upsert-capable sink 
(e.g. MaxCompute, Paimon).

  3. Observe the sink table: only 1 row exists. All IDs became 171798691841.

  NUMBER(10,0) and NUMBER(18,0) work correctly. The bug is specific to p >= 19.
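
  The 18/19 boundary matches plain 64-bit arithmetic: every 18-digit integer 
fits in a signed long, but some 19-digit integers do not. The sketch below 
only checks that arithmetic; NumberPrecisionBoundary is a hypothetical helper 
and is not taken from Debezium or flink-cdc.

```java
import java.math.BigInteger;

// Hypothetical helper that only illustrates the arithmetic behind the boundary.
class NumberPrecisionBoundary {
    // Largest magnitude a NUMBER(p, 0) column can hold: 10^p - 1.
    static BigInteger maxValue(int precision) {
        return BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    }

    // True when every value of NUMBER(p, 0) fits in a signed 64-bit long.
    static boolean fitsInLong(int precision) {
        return maxValue(precision).bitLength() <= 63;
    }

    public static void main(String[] args) {
        System.out.println("NUMBER(18,0) fits in long: " + fitsInLong(18));
        System.out.println("NUMBER(19,0) fits in long: " + fitsInLong(19));
        System.out.println("Long.MAX_VALUE = " + Long.MAX_VALUE);
    }
}
```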

  Proposed Fix

  In OracleTypeUtils.fromDbzColumn, align with Debezium's boundary: return 
BIGINT only when precision is between 1 and 18; for p >= 19 (or unknown 
precision), return DECIMAL(p, 0).
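
  A minimal sketch of the proposed rule, not the actual patch. 
OracleNumberMappingSketch and mapIntegerNumber are hypothetical names, types 
are returned as strings rather than flink-cdc DataType objects, unknown 
precision is assumed to arrive as a value below 1, and the fallback of 38 
(Oracle's maximum NUMBER precision) is an assumption, since the issue does not 
say which precision to use in that case.

```java
// Hypothetical sketch of the proposed mapping rule; not the flink-cdc code.
class OracleNumberMappingSketch {
    // Highest precision that is still mapped to BIGINT under the proposal.
    static final int MAX_BIGINT_PRECISION = 18;
    // Assumption: fall back to Oracle's maximum NUMBER precision when unknown.
    static final int FALLBACK_PRECISION = 38;

    // Maps NUMBER(precision, 0) to a type name.
    // A precision below 1 is treated as "unknown" (assumption).
    static String mapIntegerNumber(int precision) {
        if (precision >= 1 && precision <= MAX_BIGINT_PRECISION) {
            return "BIGINT";
        }
        int p = precision > MAX_BIGINT_PRECISION ? precision : FALLBACK_PRECISION;
        return "DECIMAL(" + p + ", 0)";
    }

    public static void main(String[] args) {
        System.out.println("NUMBER(10,0) -> " + mapIntegerNumber(10));
        System.out.println("NUMBER(18,0) -> " + mapIntegerNumber(18));
        System.out.println("NUMBER(19,0) -> " + mapIntegerNumber(19));
        System.out.println("NUMBER(?,0)  -> " + mapIntegerNumber(0));
    }
}
```

  With this rule the schema-building path and the runtime path agree on 
DECIMAL for p >= 19, so the sink no longer reads the slot with getLong.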

  Test Results (with patch applied)

  - NUMBER(19,0) snapshot 3 rows (ID 1/2/3): Before = all IDs 171798691841, 1 
row; After = IDs correct, 3 rows
  - INSERT ID = 9000000000000000001: Before = also becomes 171798691841; After 
= preserved correctly
  - UPDATE / DELETE: Before = rows already collapsed, meaningless; After = 
works correctly


>   [Bug] Oracle pipeline connector maps NUMBER(p,0) with p>=19 to BIGINT, 
> causing primary key collision and row collapse in downstream sinks
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39832
>                 URL: https://issues.apache.org/jira/browse/FLINK-39832
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 1.20.4
>            Reporter: Zhouqing
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)