[
https://issues.apache.org/jira/browse/FLINK-37134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931132#comment-17931132
]
Yang Li commented on FLINK-37134:
---------------------------------
*Cross-team verification for FLIP-492*
*Conclusion*
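The overall behavior meets expectations: altering the AS query behaves as specified in Continuous mode and in Full mode, both with and without partitions.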
*Environment Preparation*
{noformat}
./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
Flink SQL> CREATE CATALOG mt_cat WITH (
> 'type' = 'test-filesystem',
> 'path' = '/home/feng/flink-liyang-test/catalog_path',
> 'default-database' = 'mydb');
[INFO] Execute statement succeeded.
Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded.
{noformat}
*Prepare Source Table*
{code:sql}
Flink SQL> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT
> ) WITH (
> 'format' = 'json',
> 'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', '2024-06-19', 10),
> (1002, 2, 'user2', '2024-06-19', 20),
> (1003, 3, 'user3', '2024-06-19', 30),
> (1004, 4, 'user4', '2024-06-19', 40),
> (1005, 1, 'user1', '2024-06-20', 10),
> (1006, 2, 'user2', '2024-06-20', 20),
> (1007, 3, 'user3', '2024-06-20', 30),
> (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803
{code}
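To make the screenshots easier to check, the expected contents of the materialized tables can be derived offline from the eight rows above. A minimal Python sketch, assuming only those rows (it is an independent cross-check, not Flink code; `expected_aggregates` is a hypothetical name), that mirrors the `GROUP BY user_id, ds` aggregation of the original AS query:

```python
from collections import defaultdict

# The eight rows inserted into json_source above:
# (order_id, user_id, user_name, order_created_at, payment_amount_cents)
SOURCE_ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def expected_aggregates(rows):
    """Offline mirror of the original AS query:
    SUM(payment_amount_cents) AS payed_buy_fee_sum, SUM(1) AS PV,
    grouped by (user_id, ds) where ds = order_created_at."""
    groups = defaultdict(lambda: [0, 0])  # (user_id, ds) -> [fee_sum, pv]
    for _order_id, user_id, _user_name, ds, cents in rows:
        groups[(user_id, ds)][0] += cents
        groups[(user_id, ds)][1] += 1
    return {key: tuple(agg) for key, agg in groups.items()}


if __name__ == "__main__":
    for (user_id, ds), (fee_sum, pv) in sorted(expected_aggregates(SOURCE_ROWS).items()):
        print(user_id, ds, fee_sum, pv)
```

Every user has exactly one order per day, so each (user_id, ds) group should show PV = 1 and a payed_buy_fee_sum equal to that single order's amount.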
*Continuous Mode Materialized Table*
{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
'format' = 'debezium-json',
'sink.rolling-policy.rollover-interval' = '10s',
'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS PV
FROM (
SELECT user_id, order_created_at AS ds, payment_amount_cents
FROM json_source
) AS tmp
GROUP BY user_id, ds;
{code}
Before alter
!image-2025-02-27-20-03-38-176.png|width=686,height=166!
{code:sql}
Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds,
> payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.
{code}
After alter
!image-2025-02-27-20-03-50-430.png|width=666,height=223!
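The task description quoted below expects the altered continuous job to be generated from the new query and not to resume from the previous job's state. Under that assumption, every group is recomputed from all source rows and gains the new LAST_VALUE(user_name) column. A toy Python model of that expectation (the helper name is hypothetical; rows are assumed to be processed in insertion order, so LAST_VALUE simply keeps the latest name seen):

```python
# Rows as (order_id, user_id, user_name, ds, payment_amount_cents), in arrival order.
SOURCE_ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def recompute_with_user_name(rows):
    """Model of the altered query run by a fresh job: state starts empty
    (nothing restored from the old job) and every source row is re-read."""
    state = {}  # (user_id, ds) -> (payed_buy_fee_sum, PV, LAST_VALUE(user_name))
    for _order_id, user_id, user_name, ds, cents in rows:
        fee_sum, pv, _last_name = state.get((user_id, ds), (0, 0, None))
        state[(user_id, ds)] = (fee_sum + cents, pv + 1, user_name)
    return state


if __name__ == "__main__":
    for key, value in sorted(recompute_with_user_name(SOURCE_ROWS).items()):
        print(key, value)
```

If the new job had instead restored the old state while re-reading the source, PV would exceed 1 for some groups; PV = 1 everywhere is therefore a quick sign that the job started fresh.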
*Full Mode Materialized Table With Partition*
{code:sql}
CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
'format' = 'json',
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS PV
FROM (
SELECT user_id, order_created_at AS ds, payment_amount_cents
FROM json_source
) AS tmp
GROUP BY user_id, ds;
{code}
Insert data for the current date
{code:sql}
Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
> (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
> (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
> (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e
{code}
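With 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd', the string field ds is declared to hold dates in that pattern, which is what lets a full-mode refresh target only the latest partition, as described in the task. The rows above use CAST(CURRENT_DATE AS STRING), which yields the same yyyy-MM-dd shape, so they fall into today's partition. A minimal Python illustration, assuming the refreshed partition is obtained by formatting a refresh time with the equivalent strftime pattern %Y-%m-%d (this is not Flink's scheduler logic; both function names are hypothetical):

```python
from datetime import date, datetime


def partition_for(refresh_time: datetime) -> str:
    """Hypothetical: the ds value a refresh at refresh_time would target.
    Java DateTimeFormatter 'yyyy-MM-dd' corresponds to strftime '%Y-%m-%d'."""
    return refresh_time.strftime("%Y-%m-%d")


def cast_date_to_string(d: date) -> str:
    """Mimics CAST(CURRENT_DATE AS STRING), which renders a DATE as 'yyyy-MM-dd'."""
    return d.isoformat()


if __name__ == "__main__":
    now = datetime(2025, 2, 27, 20, 4)  # example refresh time
    # Rows inserted on this date and a refresh on this date share the same ds value.
    print(partition_for(now), cast_date_to_string(now.date()))
```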
Before alter
!image-2025-02-27-20-04-02-689.png|width=707,height=100!
Alter table
{code:sql}
Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
{code}
The query result is as expected:
{code:sql}
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.
{code}
!image-2025-02-27-20-04-16-841.png|width=676,height=75!
Manually refresh a historical partition
{code:sql}
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH
> PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
|                           job id |              cluster info |
+----------------------------------+---------------------------+
| 6311c78b97c3e1d83c56d4a35612743b | {execution.target=remote} |
+----------------------------------+---------------------------+
1 row in set
{code}
The data in partition '2024-06-20' has been updated in the materialized table:
!image-2025-02-27-20-04-31-211.png|width=687,height=116!
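The partitioned full-mode results follow the rule stated in the task description: a scheduled refresh after the alter rewrites only the latest partition with the new query, historical partitions keep their old results, and ALTER MATERIALIZED TABLE ... REFRESH PARTITION rewrites the chosen partition with the current query. A toy Python model of that rule (class and method names are hypothetical; it models the expected outcome, not Flink's implementation):

```python
def run_query(rows, ds, with_user_name):
    """Evaluate the AS query for one ds partition.
    with_user_name=False: original query; True: altered query with LAST_VALUE(user_name)."""
    groups = {}
    for _order_id, user_id, user_name, row_ds, cents in rows:
        if row_ds != ds:
            continue
        fee_sum, pv, _last = groups.get(user_id, (0, 0, None))
        groups[user_id] = (fee_sum + cents, pv + 1, user_name)
    if with_user_name:
        return groups
    return {user_id: (fee_sum, pv) for user_id, (fee_sum, pv, _) in groups.items()}


class PartitionedFullModeTable:
    """Toy model of a partitioned full-mode materialized table (hypothetical)."""

    def __init__(self):
        self.partitions = {}         # ds -> {user_id: row}
        self.with_user_name = False  # start with the original AS query

    def alter_query(self):
        # ALTER ... AS only changes the definition; stored partitions are untouched.
        self.with_user_name = True

    def refresh_partition(self, rows, ds):
        # Models both the scheduled refresh of the latest ds and a manual
        # ALTER MATERIALIZED TABLE ... REFRESH PARTITION(ds=...).
        self.partitions[ds] = run_query(rows, ds, self.with_user_name)


if __name__ == "__main__":
    rows = [
        (1005, 1, "user1", "2024-06-20", 10),
        (1001, 1, "user1", "2025-02-27", 10),  # example "today" partition
    ]
    table = PartitionedFullModeTable()
    for ds in ("2024-06-20", "2025-02-27"):  # initial state from the original query
        table.refresh_partition(rows, ds)
    table.alter_query()
    table.refresh_partition(rows, "2025-02-27")  # next scheduled refresh: latest ds only
    print(table.partitions)  # 2024-06-20 still has the old two-column shape
    table.refresh_partition(rows, "2024-06-20")  # manual refresh of the historical ds
    print(table.partitions)  # both partitions now include user_name
```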
*Full Mode Materialized Table Without Partition*
{code:sql}
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
> 'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
{code}
Before alter table
!image-2025-02-27-20-05-06-327.png|width=656,height=159!
Alter table
{code:sql}
Flink SQL> Alter MATERIALIZED TABLE full_users_shops3
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
{code}
The query result meets expectations:
{code:sql}
Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.
{code}
!image-2025-02-27-20-05-25-585.png|width=658,height=130!
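For the non-partitioned table there is only one unit to overwrite, so the expectation is simpler: once the next refresh completes, the whole table reflects the modified query and no rows produced by the old query remain. A toy Python model under that assumption (`full_refresh` is a hypothetical name, not a Flink API):

```python
def full_refresh(rows, with_user_name):
    """Hypothetical model of a non-partitioned full-mode refresh: the whole table
    is recomputed from the current AS query and overwrites the previous contents."""
    table = {}  # (user_id, ds) -> row
    for _order_id, user_id, user_name, ds, cents in rows:
        fee_sum, pv, _last = table.get((user_id, ds), (0, 0, None))
        table[(user_id, ds)] = (fee_sum + cents, pv + 1, user_name)
    if with_user_name:
        return table
    return {key: (fee_sum, pv) for key, (fee_sum, pv, _) in table.items()}


if __name__ == "__main__":
    rows = [
        (1001, 1, "user1", "2024-06-19", 10),
        (1001, 1, "user1", "2025-02-27", 10),  # same order_id re-inserted for an example "today"
    ]
    before = full_refresh(rows, with_user_name=False)  # last refresh before the alter
    after = full_refresh(rows, with_user_name=True)    # first refresh after the alter
    print(before)
    print(after)
```

Unlike the partitioned case, a single refresh leaves no mix of old-shape and new-shape rows.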
> Cross-team verification for FLIP-492
> ------------------------------------
>
> Key: FLINK-37134
> URL: https://issues.apache.org/jira/browse/FLINK-37134
> Project: Flink
> Issue Type: Sub-task
> Reporter: Feng Jin
> Assignee: Yang Li
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: image-2025-02-27-20-03-38-176.png,
> image-2025-02-27-20-03-50-430.png, image-2025-02-27-20-04-02-689.png,
> image-2025-02-27-20-04-16-841.png, image-2025-02-27-20-04-31-211.png,
> image-2025-02-27-20-05-06-327.png, image-2025-02-27-20-05-25-585.png
>
>
> This is for cross-team verification of the release 2.0 work item:
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
> As only the Alter Query capability has been implemented so far, we only need
> to verify https://issues.apache.org/jira/browse/FLINK-36994.
>
> *Operation steps:*
> 1. Follow the quickstart
> (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
> to prepare the environment required for materialized tables, including the
> catalog store, the test-filesystem plugin, a standalone cluster, and the SQL gateway.
> 2. Create materialized tables in both modes (Continuous and Full).
> 3. Modify the AS query of the materialized tables in both modes.
>
> *Verify the result:*
> In Full mode, we need to verify the behavior of partitioned and
> non-partitioned tables:
> 1. For non-partitioned tables, after waiting for the next refresh task to
> complete, verify whether the result of the modified query meets
> expectations.
> 2. For partitioned tables, we need to verify that only the result of the
> latest partition is consistent with the result after modifying the query, and
> the historical partitions remain unchanged. Then, by manually refreshing,
> confirm again that the partition after refreshing is generated by the
> modified query.
>
> In Continuous mode, we need to verify the execution of the background
> Continuous job after modification.
> 1. After modifying the AS query, the new job is generated from the new query.
> 2. The new job does not resume from the state of historical jobs.
>