[ 
https://issues.apache.org/jira/browse/FLINK-37134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931132#comment-17931132
 ] 

Yang Li commented on FLINK-37134:
---------------------------------

*Cross-team verification for FLIP-492*

*Conclusion*

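The overall behavior meets expectations in all three verified scenarios: Continuous mode, Full mode with partitions, and Full mode without partitions.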

*Environment Preparation*
|Java
./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083

Flink SQL> CREATE CATALOG mt_cat WITH (
>   'type' = 'test-filesystem',
>   'path' = '/home/feng/flink-liyang-test/catalog_path',
>   'default-database' = 'mydb');
[INFO] Execute statement succeeded.

Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded|

*Prepare Source Table*
|Java
Flink SQL> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES 
>   (1001, 1, 'user1', '2024-06-19', 10),
>   (1002, 2, 'user2', '2024-06-19', 20),
>   (1003, 3, 'user3', '2024-06-19', 30),
>   (1004, 4, 'user4', '2024-06-19', 40),
>   (1005, 1, 'user1', '2024-06-20', 10),
>   (1006, 2, 'user2', '2024-06-20', 20),
>   (1007, 3, 'user3', '2024-06-20', 30),
>   (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803|

*Continuous Mode Materialized Table*
|Java
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
    FROM json_source
  ) AS tmp
GROUP BY user_id, ds;|
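For reference when reading the screenshots, the AS query above can be modelled in a few lines of plain Python over the eight rows inserted into `json_source`. This is a hypothetical helper (`aggregate`), not Flink code, and it assumes those eight rows are the only data in the source.

```python
# Rows inserted into json_source:
# (order_id, user_id, user_name, order_created_at, payment_amount_cents)
SOURCE_ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def aggregate(rows):
    """Plain-Python model of the original AS query (not Flink code).

    Returns {(user_id, ds): (payed_buy_fee_sum, PV)}, i.e.
    SUM(payment_amount_cents) and SUM(1) grouped by user_id and ds.
    """
    groups = {}
    for _order_id, user_id, _user_name, ds, amount in rows:
        fee_sum, pv = groups.get((user_id, ds), (0, 0))
        groups[(user_id, ds)] = (fee_sum + amount, pv + 1)
    return groups


if __name__ == "__main__":
    # List the groups ordered by ds, then user_id.
    for (user_id, ds), (fee_sum, pv) in sorted(
        aggregate(SOURCE_ROWS).items(), key=lambda kv: (kv[0][1], kv[0][0])
    ):
        print(user_id, ds, fee_sum, pv)
```

Each `(user_id, ds)` pair occurs exactly once in the source, so every group should show `PV = 1` and a fee sum equal to that single row's amount (user 1 on 2024-06-19 gives 10 and 1).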

Before alter

!image-2025-02-27-20-03-38-176.png|width=686,height=166!
|Java
Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
*>   LAST_VALUE(user_name)*
> FROM (
>   SELECT user_id, {*}user_name{*}, order_created_at AS ds, 
> payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.|

!image-2025-02-27-20-03-50-430.png|width=666,height=223!
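The altered query appends one column, `LAST_VALUE(user_name)`. Because the verification expects the new continuous job not to resume from the old job's state, its output should match a fresh computation over the source. The sketch below models that under stated assumptions: it is plain Python, not Flink; the helper name `aggregate_altered` is hypothetical; and it assumes the new job reads the same eight rows of `json_source` from the beginning.

```python
# Same eight rows that were inserted into json_source:
# (order_id, user_id, user_name, order_created_at, payment_amount_cents)
SOURCE_ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def aggregate_altered(rows):
    """Plain-Python model of the altered AS query (not Flink code).

    Returns {(user_id, ds): (payed_buy_fee_sum, PV, last_user_name)}.
    The accumulator starts empty, mirroring a job that does not restore
    the previous job's state. Later rows overwrite the name, which matches
    LAST_VALUE(user_name) for this input because it has no NULL names.
    """
    groups = {}
    for _order_id, user_id, user_name, ds, amount in rows:
        fee_sum, pv, _last = groups.get((user_id, ds), (0, 0, None))
        groups[(user_id, ds)] = (fee_sum + amount, pv + 1, user_name)
    return groups


if __name__ == "__main__":
    for key, value in sorted(aggregate_altered(SOURCE_ROWS).items()):
        print(key, value)
```

Compared with the original query, every group keeps the same fee sum and PV and gains the user name, for example `(10, 1, 'user1')` for user 1 on 2024-06-19.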

*Full Mode Materialized Table With Partition*
|Java
CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
  'format' = 'json',
  'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS PV
FROM (
  SELECT user_id, order_created_at AS ds, payment_amount_cents
  FROM json_source
) AS tmp
GROUP BY user_id, ds;|

Insert data
|Java
Flink SQL> INSERT INTO json_source VALUES 
>   (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
>   (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
>   (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
>   (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e|
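These rows use `CAST(CURRENT_DATE AS STRING)`, which renders a DATE as `yyyy-MM-dd`, so their `ds` values have the same shape as the `'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'` declared on `full_users_shops`. The hypothetical helper below is a quick sanity check of that shape: it tests a `ds` string against `%Y-%m-%d`, taken here as the Python equivalent of the Java pattern, with Python's `date.isoformat()` standing in for the SQL cast.

```python
from datetime import date, datetime


def matches_formatter(ds: str) -> bool:
    """True if ds parses with %Y-%m-%d (assumed equivalent of Java's yyyy-MM-dd)."""
    try:
        datetime.strptime(ds, "%Y-%m-%d")
    except ValueError:
        return False
    return True


if __name__ == "__main__":
    # date.isoformat() yields YYYY-MM-DD, like CAST(CURRENT_DATE AS STRING).
    for ds in (date.today().isoformat(), "2024-06-19", "2024-06-20"):
        print(ds, matches_formatter(ds))
```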

Before alter

!image-2025-02-27-20-04-02-689.png|width=707,height=100!

Alter table
|Java
Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;|

The query result is as expected:
|Java
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.|

!image-2025-02-27-20-04-16-841.png|width=676,height=75!

Manually Refresh Historical Partition   
|Java
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
\|                           job id \|              cluster info \|
+----------------------------------+---------------------------+
\| 6311c78b97c3e1d83c56d4a35612743b \| \{execution.target=remote} \|
+----------------------------------+---------------------------+
1 row in set|

The data in partition '2024-06-20' has been updated in the materialized table:

!image-2025-02-27-20-04-31-211.png|width=687,height=116!
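The partitioned Full-mode behavior checked above (a scheduled refresh after the ALTER rewrites only the latest partition, historical partitions keep the old query's output, and a manual `REFRESH PARTITION` rewrites the chosen partition with the modified query) can be summarized as a toy Python model. It is not Flink's implementation. `TODAY` is a hypothetical stand-in for `CURRENT_DATE`, and the model assumes every partition was first written by the original query.

```python
TODAY = "2025-02-27"  # hypothetical stand-in for CAST(CURRENT_DATE AS STRING)

# (user_id, user_name, ds, payment_amount_cents); order_id does not affect the result.
ROWS = [
    (1, "user1", "2024-06-19", 10), (2, "user2", "2024-06-19", 20),
    (3, "user3", "2024-06-19", 30), (4, "user4", "2024-06-19", 40),
    (1, "user1", "2024-06-20", 10), (2, "user2", "2024-06-20", 20),
    (3, "user3", "2024-06-20", 30), (4, "user4", "2024-06-20", 40),
    (1, "user1", TODAY, 10), (2, "user2", TODAY, 20),
    (3, "user3", TODAY, 30), (4, "user4", TODAY, 40),
]


def original_query(rows):
    """(user_id, ds) -> (payed_buy_fee_sum, PV)."""
    out = {}
    for user_id, _user_name, ds, amount in rows:
        fee_sum, pv = out.get((user_id, ds), (0, 0))
        out[(user_id, ds)] = (fee_sum + amount, pv + 1)
    return out


def altered_query(rows):
    """(user_id, ds) -> (payed_buy_fee_sum, PV, last user_name)."""
    out = {}
    for user_id, user_name, ds, amount in rows:
        fee_sum, pv, _last = out.get((user_id, ds), (0, 0, None))
        out[(user_id, ds)] = (fee_sum + amount, pv + 1, user_name)
    return out


def refresh_partition(table, query, rows, ds):
    """Overwrite a single partition with `query` evaluated on that partition's rows."""
    table[ds] = query([row for row in rows if row[2] == ds])


def run_scenario():
    table = {}
    # Assumption: every partition was first written by the original query.
    for ds in ("2024-06-19", "2024-06-20", TODAY):
        refresh_partition(table, original_query, ROWS, ds)
    # After ALTER ... AS, a scheduled refresh rewrites only the latest partition.
    refresh_partition(table, altered_query, ROWS, TODAY)
    after_alter = {ds: dict(part) for ds, part in table.items()}
    # REFRESH PARTITION(ds='2024-06-20') rewrites that partition with the altered query.
    refresh_partition(table, altered_query, ROWS, "2024-06-20")
    return after_alter, table


if __name__ == "__main__":
    after_alter, final = run_scenario()
    for ds in sorted(final):
        print(ds, "after alter:", after_alter[ds][(1, ds)],
              "after manual refresh:", final[ds][(1, ds)])
```

In this model the 2024-06-19 partition never gains the `user_name` column because it was never refreshed after the ALTER, which is the "historical partitions remain unchanged" part of the check.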

*Full Mode Materialized Table Without Partition*
|Java
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;|

Before alter table

!image-2025-02-27-20-05-06-327.png|width=656,height=159!

Alter table
|Java
Flink SQL> Alter MATERIALIZED TABLE full_users_shops3
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV,
>   LAST_VALUE(user_name)
> FROM (
>   SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;|

The query result meets expectations:
|Java
Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.|

!image-2025-02-27-20-05-25-585.png|width=658,height=130!
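Unlike the partitioned table, a non-partitioned Full-mode table is expected to be recomputed as a whole on the next refresh, so after the ALTER every row, including those for the 2024 dates, should carry the new column. The toy Python model below illustrates this; it is not Flink's implementation. It assumes `json_source` holds exactly the twelve rows inserted earlier in this comment and uses a hypothetical `TODAY` value for `CURRENT_DATE`.

```python
TODAY = "2025-02-27"  # hypothetical stand-in for CAST(CURRENT_DATE AS STRING)

# (user_id, user_name, ds, payment_amount_cents)
ROWS = [
    (1, "user1", "2024-06-19", 10), (2, "user2", "2024-06-19", 20),
    (3, "user3", "2024-06-19", 30), (4, "user4", "2024-06-19", 40),
    (1, "user1", "2024-06-20", 10), (2, "user2", "2024-06-20", 20),
    (3, "user3", "2024-06-20", 30), (4, "user4", "2024-06-20", 40),
    (1, "user1", TODAY, 10), (2, "user2", TODAY, 20),
    (3, "user3", TODAY, 30), (4, "user4", TODAY, 40),
]


def original_query(rows):
    """(user_id, ds) -> (payed_buy_fee_sum, PV)."""
    out = {}
    for user_id, _user_name, ds, amount in rows:
        fee_sum, pv = out.get((user_id, ds), (0, 0))
        out[(user_id, ds)] = (fee_sum + amount, pv + 1)
    return out


def altered_query(rows):
    """(user_id, ds) -> (payed_buy_fee_sum, PV, last user_name)."""
    out = {}
    for user_id, user_name, ds, amount in rows:
        fee_sum, pv, _last = out.get((user_id, ds), (0, 0, None))
        out[(user_id, ds)] = (fee_sum + amount, pv + 1, user_name)
    return out


def full_refresh(query, rows):
    """Model of a non-partitioned FULL-mode refresh: recompute and replace the whole table."""
    return query(rows)


if __name__ == "__main__":
    before = full_refresh(original_query, ROWS)
    after = full_refresh(altered_query, ROWS)
    print(len(before), "rows before, all without user_name:",
          all(len(v) == 2 for v in before.values()))
    print(len(after), "rows after, all with user_name:",
          all(len(v) == 3 for v in after.values()))
```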

 

> Cross-team verification for FLIP-492
> ------------------------------------
>
>                 Key: FLINK-37134
>                 URL: https://issues.apache.org/jira/browse/FLINK-37134
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Feng Jin
>            Assignee: Yang Li
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: image-2025-02-27-20-03-38-176.png, 
> image-2025-02-27-20-03-50-430.png, image-2025-02-27-20-04-02-689.png, 
> image-2025-02-27-20-04-16-841.png, image-2025-02-27-20-04-31-211.png, 
> image-2025-02-27-20-05-06-327.png, image-2025-02-27-20-05-25-585.png
>
>
> This is for cross-team verification of the release 2.0 work item:
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
> As only the Alter Query capability has been implemented so far, we only need 
> to verify https://issues.apache.org/jira/browse/FLINK-36994.
>  
> *Operation steps:*
> 1. Refer to the quickstart 
> (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
> to prepare the environment required for materialized tables, including the 
> catalog store, the test-filesystem plugin, a standalone cluster, and the SQL Gateway.
> 2. Create Materialized Tables in two modes (Continuous and Full).
> 3. Modify the AS query of Materialized Tables in both modes.
>  
> *Verify The result:* 
> In Full mode, we need to verify the behavior of partitioned and 
> non-partitioned tables:
> 1. For non-partitioned tables, after waiting for the next refresh task to 
> complete, verify that the result after modifying the query meets 
> expectations.
> 2. For partitioned tables, we need to verify that only the result of the 
> latest partition is consistent with the result after modifying the query, and 
> the historical partitions remain unchanged. Then, by manually refreshing, 
> confirm again that the partition after refreshing is generated by the 
> modified query.
>  
> In Continuous mode, we need to verify the execution of the background 
> Continuous job after modification.
> 1. After modifying the AS query, the new job is generated from the new query.
> 2. The new job does not resume from the state of historical jobs.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
