yunlou11 opened a new issue, #5724:
URL: https://github.com/apache/paimon/issues/5724

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Paimon version
   
   paimon 1.1.1
   
   ### Compute Engine
   
   Flink 1.20.1  (paimon-flink-1.20)
   
   ### Minimal reproduce step
   
   Paimon 建表:
   
   ```sql
   CREATE TABLE `paimon`.`default`.`paimon_student_info` (
     `sno` INT NOT NULL,
     `name` VARCHAR(2147483647),
     `address` VARCHAR(2147483647),
     `email` VARCHAR(2147483647),
     CONSTRAINT `PK_sno` PRIMARY KEY (`sno`) NOT ENFORCED
   ) WITH (
     'path' = 's3://qxb-lake/default.db/paimon_student_info',
     'table.exec.sink.upsert-materialize' = 'NONE',
     'changelog-producer.row-deduplicate' = 'true',
     'changelog-producer' = 'lookup',
     'snapshot.num-retained.max' = '10',
     'snapshot.num-retained.min' = '1',
     'deletion-vectors.enabled' = 'true'
   )
   ```
   
   
   
   使用 MinIO 作为存储, 执行如下 Java代码:
   
   ```java
   public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(10000);
           StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
   
           List<String> initSQLs = ImmutableList.of(
                   "CREATE CATALOG paimon_catalog WITH (\n" +
                           "    'type'='paimon',\n" +
                           "    'warehouse'='s3://qxb-lake//',\n" +
                           "    's3.endpoint' = 'http://localhost:9000/',\n" +
                           "    's3.access-key' = 'minio',\n" +
                           "    's3.secret-key' = 'minioadmin',\n" +
                           "    's3.path.style.access' = 'true'\n" +
                           ");",
                   "create table student_info (\n" +
                           "    sno int,\n" +
                           "    name string ,\n" +
                           "    address string,\n" +
                           "    email string,\n" +
                           "    primary key (sno)  not enforced\n" +
                           ") WITH (\n" +
                           "    'connector' = 'kafka',\n" +
                           "    'topic' = 'tmp_student',\n" +
                           "    'properties.bootstrap.servers' = 
'localhost:9092',\n" +
                           "    'properties.group.id' = 'kafka_student_02',\n" +
                           "    'scan.startup.mode' = 'latest-offset',\n" +
                           "    'format' = 'debezium-json'\n" +
                           ");"
           );
           String exeSQL = "insert into 
paimon_catalog.`default`.paimon_student_info\n" +
                   "select * from student_info;";
           for (String initSQL : initSQLs) {
               tEnv.executeSql(initSQL);
           }
           tEnv.executeSql(exeSQL);
       }```
   另外启动一个任务, 将 Paimon 的Changelog 日志输出到控制台或其他地方, 观察 Paimon 表的 Changelog 生成.
   然后按照如下次序输入数据:
   1. 先写入 Kafka topic: tmp_student 如下数据:
   ```json
   
   {
         "before": null,
         "after": { "sno": 8, "name": "dyl5", "address": "hefei", "email": 
"1...@qq.com" },
         "op":"c"
   }
   ```
   2. 确保 Paimon 表中能查询到数据后, 在 kafka 中写入如下数据, 删除上述一条记录:
   ```json
   {
         "before": { "sno": 8, "name": "dyl5", "address": "hefei", "email": 
"1...@qq.com" },
         "after": null,
         "op":"d"
   }
   ```
   
   3. 这一步需要把握时机, 比如 checkpoints 间隔 10s, 默数 9s左右, 在 op=d 即将输出到控制台或输出后的很短时间内, 
将相同的数据再次写入:
   ```json
   
   {
         "before": null,
         "after": { "sno": 8, "name": "dyl5", "address": "hefei", "email": 
"1...@qq.com" },
         "op":"c"
   }
   ```
   多试几次后, 能够偶然的发现, Changelog 只输出了 op=d 的Changelog, 对于最后一条 op=c 的Changelog 没有输出. 
但是此时查询 Paimon 表, 是能够查询到 sno = 8 的记录的.  这会导致 Paimon Changelog 下游的任务只接收到  op=d 
的删除变更, 导致下游数据量和 Paimon表数据不一致.
   
   
   
   ### What doesn't meet your expectations?
   
   Paimon Changelog 丢失数据变更, 导致上下游数据不一致
   
   ### Anything else?
   
   经过 DEBUG 发现, LookupChangelogMergeFunctionWrapper 的 getResult , 在某些情况下, 
在执行上述三次 (op=c,  op=d,  op=c) 操作时,  会忽略中间的一次 op=d 的操作, 将 highLevel 设置为第一次的 op=c 
的记录, 导致配置了  'changelog-producer.row-deduplicate' = 'true' 之后, 在后面 setChangelog 
时, 判断前后两条数据完全一样, 忽略了最后一次  op=c 的数据变更输出.
   
   
![Image](https://github.com/user-attachments/assets/dbc1fbcb-2b4e-4038-9650-2deed9aa8ac7)
   
   上图中, 执行第三次 op=c 的操作时, highLevel 被授予了 第一次 op=c 的记录, 忽略了 op=d 的操作
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to