Forus0322 opened a new issue, #6972:
URL: https://github.com/apache/paimon/issues/6972

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.
   
   
   ### Paimon version
   
   Paimon version: master
   Head commit id: c455db2bc0a2ed60d8e4e75cc778210e3820ffa9
   
   ### Compute Engine
   
   Flink version: 1.20.1
   
   ### Minimal reproduce step
   
   -- Session settings: checkpoint interval of '10000' (milliseconds when no
   -- unit is given) and the sink upsert-materialize operator disabled.
   SET 'execution.checkpointing.interval' = '10000';
   SET 'table.exec.sink.upsert-materialize' = 'NONE';
   
   -- Register a Paimon catalog backed by a local filesystem warehouse.
   CREATE CATALOG paimon_catalog WITH (
       'type'      = 'paimon',
       'warehouse' = 'file:/Users/xumingjun/Desktop/paimon'
   );
   
   USE CATALOG paimon_catalog;
   
   -- ====================================
   -- 1. Source for the main table (order stream)
   -- ====================================
   CREATE TEMPORARY TABLE orders_source (
       order_id     BIGINT,
       user_id      BIGINT,
       order_amount DECIMAL(10, 2),
       order_status STRING,
       order_time   TIMESTAMP(3),
       -- NOTE(review): dt has no datagen option below, and the INSERT into
       -- paimon_orders does not read it (dt is derived from order_time there).
       dt           STRING
   ) WITH (
       'connector'       = 'datagen',
       -- Faster than the detail stream (10 vs 5 rows per second).
       'rows-per-second' = '10',
   
       -- order_id: sequence from 1 to 1000000
       'fields.order_id.kind'  = 'sequence',
       'fields.order_id.start' = '1',
       'fields.order_id.end'   = '1000000',
   
       -- user_id: random, min 1, max 10000
       'fields.user_id.kind' = 'random',
       'fields.user_id.min'  = '1',
       'fields.user_id.max'  = '10000',
   
       -- order_amount: random, min 10.00, max 9999.99
       'fields.order_amount.kind' = 'random',
       'fields.order_amount.min'  = '10.00',
       'fields.order_amount.max'  = '9999.99',
   
       -- order_status: single-character string
       'fields.order_status.length' = '1'
   );
   
   -- ====================================
   -- 2. Source for the child table (order-detail stream)
   -- ====================================
   CREATE TEMPORARY TABLE order_details_source (
       detail_id   BIGINT,
       order_id    BIGINT,  -- foreign key, joins to the orders table
       product_id  BIGINT,
       quantity    INT,
       detail_time TIMESTAMP(3),
       dt          STRING
   ) WITH (
       'connector'       = 'datagen',
       -- Slower than orders_source (5 vs 10 rows per second) to simulate
       -- the two streams arriving at different speeds.
       'rows-per-second' = '5',
   
       -- detail_id: sequence from 1 to 10000000
       'fields.detail_id.kind'  = 'sequence',
       'fields.detail_id.start' = '1',
       'fields.detail_id.end'   = '10000000',
   
       -- order_id: same sequence range as orders_source.order_id
       'fields.order_id.kind'  = 'sequence',
       'fields.order_id.start' = '1',
       'fields.order_id.end'   = '1000000',
   
       -- product_id: random, min 1, max 5000
       'fields.product_id.kind' = 'random',
       'fields.product_id.min'  = '1',
       'fields.product_id.max'  = '5000',
   
       -- quantity: random, min 1, max 100
       'fields.quantity.kind' = 'random',
       'fields.quantity.min'  = '1',
       'fields.quantity.max'  = '100'
   );
   
   -- ====================================
   -- 3. Paimon main table (orders)
   -- ====================================
   CREATE TABLE IF NOT EXISTS paimon_orders (
       order_id     BIGINT,
       user_id      BIGINT,
       order_amount DECIMAL(10, 2),
       order_status STRING,
       order_time   TIMESTAMP(3),
       dt           STRING,
       -- The partition column dt is also part of the primary key.
       PRIMARY KEY (dt, order_id) NOT ENFORCED
   )
   PARTITIONED BY (dt)
   WITH (
       'bucket'                     = '2',
       'compaction.metrics.enabled' = 'true'
   );
   
   -- ====================================
   -- 4. Paimon child table (order details, partial-update merge engine)
   -- ====================================
   CREATE TABLE IF NOT EXISTS paimon_order_details (
       detail_id BIGINT,
       order_id BIGINT,  -- foreign key to the orders table
       product_id BIGINT,
       quantity INT,
       detail_time TIMESTAMP(3),  -- timestamp of this (detail) stream
       order_amount DECIMAL(10, 2),
       order_status STRING,
       order_time TIMESTAMP(3),  -- timestamp of the joined order data
       dt STRING,
       PRIMARY KEY (dt, detail_id) NOT ENFORCED
   ) PARTITIONED BY (dt) WITH (
       'merge-engine' = 'partial-update',  -- use partial update
       'partial-update.ignore-delete' = 'true',
       -- Sequence groups: per Paimon's partial-update docs, the columns listed
       -- in each value are versioned by the sequence field named in the key
       -- (detail_time for detail columns, order_time for order columns).
       -- The key prefix must be 'fields.'; the previous 'ffields.' spelling
       -- does not match that pattern, so the groups would not take effect.
       -- NOTE(review): dt is also a primary-key/partition column; confirm
       -- Paimon accepts it inside a sequence group.
       'fields.detail_time.sequence-group' = 'order_id,product_id,quantity,dt',
       'fields.order_time.sequence-group' = 'order_amount,order_status',
       'bucket' = '2',
       'compaction.metrics.enabled' = 'true'
   );
   
   -- ====================================
   -- 5. Write the main table
   -- ====================================
   INSERT INTO paimon_orders
   SELECT
       src.order_id,
       src.user_id,
       src.order_amount,
       src.order_status,
       src.order_time,
       -- Partition value: the calendar day of order_time.
       DATE_FORMAT(src.order_time, 'yyyy-MM-dd') AS dt
   FROM default_catalog.default_database.orders_source AS src;
   
   -- ====================================
   -- 6. Lookup-join the detail stream against paimon_orders and write the
   --    result (including some order columns) into the detail table
   -- ====================================
   -- Each detail row is enriched with order_amount / order_status / order_time
   -- through a processing-time lookup join (FOR SYSTEM_TIME AS OF d.proc_time).
   -- The issue reports an exception from this lookup.
   INSERT INTO paimon_order_details
   SELECT 
       d.detail_id,
       d.order_id,
       d.product_id,
       d.quantity,
       d.detail_time,
                o.order_amount,
       o.order_status,
       o.order_time,
       d.dt
   FROM (
       SELECT 
           detail_id,
           order_id,
           product_id,
           quantity,
           -- NOTE(review): DATE_FORMAT(detail_time, ...) on the next-but-one line
           -- reads the source column detail_time, not this now() alias: a
           -- SELECT-list alias is not visible to other expressions in the same
           -- list. So dt and the stored detail_time come from different values.
           now() as detail_time,
                          DATE_FORMAT(detail_time, 'yyyy-MM-dd') as dt,
           -- Processing-time attribute used by FOR SYSTEM_TIME AS OF below.
           PROCTIME() as proc_time
       FROM default_catalog.default_database.order_details_source
   ) d
   -- LEFT JOIN: detail rows with no matching order (same order_id and dt) are
   -- still written, with NULL order_amount / order_status / order_time.
   LEFT JOIN paimon_orders FOR SYSTEM_TIME AS OF d.proc_time AS o
   ON d.order_id = o.order_id AND d.dt = o.dt;
   
   ### What doesn't meet your expectations?
   
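   The job in step 6 fails with an exception during the lookup join (`LEFT JOIN paimon_orders FOR SYSTEM_TIME AS OF d.proc_time`); see the screenshot below.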
   
   ### Anything else?
   
   <img width="1920" height="1080" alt="Image" src="https://github.com/user-attachments/assets/0d394443-354b-4065-87d6-40708d76b6be" />
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to