zhaojunyuan opened a new issue, #5852:
URL: https://github.com/apache/paimon/issues/5852

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) 
and found nothing similar.
   
   
   ### Paimon version
   
   paimon version is 1.2
   
   ### Compute Engine
   
   compute engine is Flink 1.20.0
   
   ### Minimal reproduce step
   
   -- This is source main table 
   DROP TABLE ods_test.main_table;
   CREATE TABLE ods_test.main_table (
       id INT,
       name STRING,
       `value` INT,
       update_time TIMESTAMP(3),
   --    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
       primary key (id) not enforced
   ) WITH (
       'connector' = 'paimon'
   );
   
   -- This is source slave table
   -- secondary table
   DROP TABLE ods_test.secondary_table;
   CREATE TABLE ods_test.secondary_table (
       sid INT,
       main_id INT,
       info STRING,
       score INT,
       update_time TIMESTAMP(3),
   --    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
       primary key (sid) not enforced
   ) WITH (
       'connector' = 'paimon'
   );
   
   
   -- This is target table
   DROP TABLE dwd_test.target_table;
   CREATE TABLE dwd_test.target_table (
       id INT PRIMARY KEY NOT ENFORCED,
       main_name STRING,
       main_value INT,
       sec_info STRING,
       sec_score INT,
       main_seq TIMESTAMP(3),
       sec_seq TIMESTAMP(3)
   --    update_time TIMESTAMP(3)
   ) WITH (
       'connector' = 'paimon',
       'merge-engine' = 'partial-update',
       'partial-update.remove-record-on-delete' = 'true',
       -- 'partial-update.sequence-group' = 
'main_name,main_value:main_seq;sec_info,sec_score:sec_seq',
       -- 'partial-update.sequence-group.sec_group.ignore-retract' = 'true', -- 
关键配置
       -- 关键配置:允许NULL覆盖已有值
       'partial-update.ignore-null-field' = 'false',
       'changelog-producer' = 'lookup',
       'changelog-producer.lookup.ignore-delete' = 'false'
   );
   
   -- This is the insert statement to populate the target table
   INSERT INTO dwd_test.target_table
   SELECT
     COALESCE(m.id, s.main_id) AS id,
     m.name AS main_name,
     m.`value` AS main_value,
     -- 显式处理从表字段
     s.info AS sec_info,  -- 直接使用,当s.info为NULL时表示删除
     s.score AS sec_score,
     m.update_time AS main_seq,
     -- 使用特殊序列号标记删除
     CASE
       WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP  -- 删除标记
       ELSE s.update_time
     END AS sec_seq
   --  m.update_time as m_update_time,
   --  s.update_time as s_update_time
   FROM ods_test.main_table m
   LEFT JOIN ods_test.secondary_table s ON m.id = s.main_id;
   
   -- Insert initial data for main table
   INSERT INTO ods_test.main_table VALUES
   (1, 'Main1', 100, TO_TIMESTAMP('2023-01-01 10:00:00')),
   (2, 'Main2', 200, TO_TIMESTAMP('2023-01-01 10:00:00'));
   
   -- -- Insert initial data for slave table
   INSERT INTO ods_test.secondary_table VALUES
   (101, 1, 'Info1', 5, TO_TIMESTAMP('2023-01-01 10:01:00')),
   (102, 2, 'Info2', 8, TO_TIMESTAMP('2023-01-01 10:01:00'));
   
   -- Delete a record in slave table
   DELETE FROM ods_test.secondary_table WHERE sid = 101;
   
   
   
   ### What doesn't meet your expectations?
   
   I want to test that when I delete a record in slave table, I hope the target 
table set the fields from slave as null. 
   But result is not , the target table does not change anything, even though I 
have configured the 'ignore-null-field'.
   
   Below is the process I check the result:
   
   Batch mode query target table:
   select * from dwd_test.target_table;
   select * from dwd_test.target_table;
   
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
   | id | main_name | main_value |    sec_info | sec_score | update_time |      
          main_seq |                 sec_seq |
   
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
   |  1 |     Main1 |        100 |       Info1 |         5 |      <NULL> | 
2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
   |  2 |     Main2 |        200 |       Info2 |         8 |      <NULL> | 
2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
   
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
   
   Streaming mode query target table:
   select * from dwd_test.target_table;
   
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
   | op |          id |                      main_name |  main_value |          
             sec_info |   sec_score |                main_seq |                 
sec_seq |
   
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
   | +I |           1 |                          Main1 |         100 |          
                Info1 |           5 | 2023-01-01 10:00:00.000 | 2023-01-01 
10:01:00.000 |
   | +I |           2 |                          Main2 |         200 |          
                Info2 |           8 | 2023-01-01 10:00:00.000 | 2023-01-01 
10:01:00.000 |
   | -U |           1 |                          Main1 |         100 |          
                Info1 |           5 | 2023-01-01 10:03:00.000 | 2023-01-01 
10:02:00.000 |
   | +U |           1 |                          Main1 |         100 |          
                Info1 |           5 | 2023-01-01 10:03:00.000 | 2025-07-08 
15:03:06.243 |
   
   In flink sql client , use the select logic select to check the data before 
write to paimon:
   SELECT
       COALESCE(m.id, s.main_id) AS id,
       m.name AS main_name,
       m.`value` AS main_value,
       -- 显式处理从表字段
       s.info AS sec_info,  -- 直接使用,当s.info为NULL时表示删��
       s.score AS sec_score,
       m.update_time AS main_seq,
       -- 使用特殊序列号标记删除
       CASE
         WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP  -- 删除标记
         ELSE s.update_time
       END AS sec_seq
     --  m.update_time as m_update_time,
     --  s.update_time as s_update_time
     FROM ods_test.main_table m
     LEFT JOIN ods_test.secondary_table s ON m.id = s.main_id;
   
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
   | op |          id |                      main_name |  main_value |          
             sec_info |   sec_score |                main_seq |                 
sec_seq |
   
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
   | +I |           2 |                          Main2 |         200 |          
                Info2 |           8 | 2023-01-01 10:00:00.000 | 2023-01-01 
10:01:00.000 |
   | +I |           1 |                          Main1 |         100 |          
               <NULL> |      <NULL> | 2023-01-01 10:03:00.000 | 2025-07-08 
15:08:15.451 |
   
   ### Anything else?
   
   I want to implement the following functions: Support CURD of master-slave 
table:
   
   - When adding new records to the master table, new records will be added to 
the target table according to the join logic of sql
   - When updating records in the master table, the corresponding field values 
​​in the master table in the target table are updated in the target table 
according to the PK
   - When deleting records in the master table, the records with the 
corresponding PK in the target table are deleted
   - When adding new records to the slave table, first associate the complete 
record according to the sql logic. If the PK cannot be associated in the master 
table, skip it directly; if the PK can be associated in the master table, 
update the records in the target table through the PK.
   - When updating records from the slave table, the principle is the same as 
adding
   - When deleting records from the slave table, the principle is basically the 
same as update, the difference is: if the record with PK can be found, all the 
fields of the slave table in the target table are set to NULL.
   
   How I can config the target table in Paimon.
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to