[
https://issues.apache.org/jira/browse/FLINK-37134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931128#comment-17931128
]
Yang Li edited comment on FLINK-37134 at 2/27/25 11:50 AM:
-----------------------------------------------------------
h1. Conclusion
The overall behavior meets expectations.
h1. Environment Preparation
./bin/start-cluster.sh
./bin/sql-gateway.sh start
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
Flink SQL> CREATE CATALOG mt_cat WITH (
> 'type' = 'test-filesystem',
> 'path' = '/home/feng/flink-liyang-test/catalog_path',
> 'default-database' = 'mydb');
[INFO] Execute statement succeeded.
Flink SQL> use catalog mt_cat;
[INFO] Execute statement succeeded
h1. Prepare Source Table
Flink SQL> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT
> ) WITH (
> 'format' = 'json',
> 'source.monitor-interval' = '10s'
> );
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', '2024-06-19', 10),
> (1002, 2, 'user2', '2024-06-19', 20),
> (1003, 3, 'user3', '2024-06-19', 30),
> (1004, 4, 'user4', '2024-06-19', 40),
> (1005, 1, 'user1', '2024-06-20', 10),
> (1006, 2, 'user2', '2024-06-20', 20),
> (1007, 3, 'user3', '2024-06-20', 30),
> (1008, 4, 'user4', '2024-06-20', 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: eeedf09027a4b6643b7dcbf0ed345803
h1. Continuous Mode Materialized Table
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
'format' = 'debezium-json',
'sink.rolling-policy.rollover-interval' = '10s',
'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS PV
FROM (
SELECT user_id, order_created_at AS ds, payment_amount_cents
FROM json_source
) AS tmp
GROUP BY user_id, ds;
Before alter
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NmQ0NTI5NjNhYjlkMTM3NjQ3YjlkMTE0N2VmMjdlYzVfQUpmY3p5emx4NEJpRDFlOE0yWXdyVHN0V1NJM2k0cGhfVG9rZW46Ym94azRDR0UxTTZYbFpxeWhTa2VrdXJ0WFFmXzE3NDA2NTY2OTI6MTc0MDY2MDI5Ml9WNA|width=729,height=177!
Add a field
Flink SQL> Alter MATERIALIZED TABLE continuous_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.
The result is as expected.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=NDhkNWM5ZjgwZWRhYzMzYmM1MjQ4NTg0NTRkNzc0OGFfQmM0MTg1WnJKbnRFS1A2ZXdub1AxTVo0azQwUVRkZ0xfVG9rZW46Ym94azRSUUUwZ0c0R05IVU1pWFp2dnNuR0doXzE3NDA2NTY2MTY6MTc0MDY2MDIxNl9WNA|width=752,height=252!
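To make "as expected" concrete, the output of the altered query can be worked out by hand. The following is a minimal Python sketch, not Flink code. It assumes json_source holds only the eight seed rows at this point and that the new job recomputes from the source rather than from old state. The name last_user_name is a hypothetical label for the unaliased LAST_VALUE(user_name) column.

```python
# Seed rows inserted into json_source:
# (order_id, user_id, user_name, order_created_at, payment_amount_cents)
SEED_ROWS = [
    (1001, 1, "user1", "2024-06-19", 10),
    (1002, 2, "user2", "2024-06-19", 20),
    (1003, 3, "user3", "2024-06-19", 30),
    (1004, 4, "user4", "2024-06-19", 40),
    (1005, 1, "user1", "2024-06-20", 10),
    (1006, 2, "user2", "2024-06-20", 20),
    (1007, 3, "user3", "2024-06-20", 30),
    (1008, 4, "user4", "2024-06-20", 40),
]


def altered_query(rows):
    """Emulate GROUP BY user_id, ds with SUM, SUM(1) and LAST_VALUE(user_name)."""
    groups = {}
    for _order_id, user_id, user_name, ds, cents in rows:
        agg = groups.setdefault(
            (user_id, ds),
            {"payed_buy_fee_sum": 0, "PV": 0, "last_user_name": None},
        )
        agg["payed_buy_fee_sum"] += cents   # SUM(payment_amount_cents)
        agg["PV"] += 1                      # SUM(1)
        agg["last_user_name"] = user_name   # LAST_VALUE(user_name), hypothetical column label
    return groups


expected = altered_query(SEED_ROWS)
for (user_id, ds), agg in sorted(expected.items()):
    print(user_id, ds, agg["payed_buy_fee_sum"], agg["PV"], agg["last_user_name"])
```

Each (user_id, ds) pair has exactly one seed row, so every group should carry PV = 1 and a sum equal to that single payment.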
h1. Full Mode Materialized Table With Partition
CREATE MATERIALIZED TABLE full_users_shops
PARTITIONED BY (ds)
WITH (
'format' = 'json',
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS PV
FROM (
SELECT user_id, order_created_at AS ds, payment_amount_cents
FROM json_source
) AS tmp
GROUP BY user_id, ds;
Insert data
Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
> (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
> (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
> (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 654cf5d89d1f01ed509ad5578ebf6a8e
Before alter
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YzI5Nzc5OTkyNGJjNjE2ZDcwMmFiMmIzNTIwN2E4MGNfeThSUG1vbmJoTkRDaFZwTXg3cmZaZENvNmZRcExDT0ZfVG9rZW46Ym94azR2MjdoVXhZWGtiaU1ad2tqQzBORmdjXzE3NDA2NTY3MjU6MTc0MDY2MDMyNV9WNA|width=738,height=105!
Alter table
Flink SQL> Alter MATERIALIZED TABLE full_users_shops
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
The query result is as expected.
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=MzBlZGFhNjBlMTk4NDNiMGYzOTRhMzA4NTY4YzAwNzJfY1paSW9RVWNidlZLaHdrTlBrYnhWUkFrd2VqYlZZWDZfVG9rZW46Ym94azRyT2hnUHlpMnZZR005QUFkeGJ1ekFnXzE3NDA2NTY3NDI6MTc0MDY2MDM0Ml9WNA|width=728,height=81!
Manually Refresh Historical Partition
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops REFRESH
> PARTITION(ds='2024-06-20');
+----------------------------------+---------------------------+
|                           job id |              cluster info |
+----------------------------------+---------------------------+
| 6311c78b97c3e1d83c56d4a35612743b | {execution.target=remote} |
+----------------------------------+---------------------------+
1 row in set
The data in partition '2024-06-20' has been updated in the materialized table.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=Yzc5NzNkMGE1NmFlMThjZjAyYjZkYmJhZDJmZWFhZDVfZDBOclZyTk5US2xtNUxNdHdDN1prTEVvdG9Pdkh6dFBfVG9rZW46Ym94azRzcmhVYnpUbWI4cHUySnl2TUFzVXdjXzE3NDA2NTY3NjA6MTc0MDY2MDM2MF9WNA|width=732,height=124!
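The partition behavior checked above can be modelled in a few lines of Python. This is a sketch of the expected semantics only, not of Flink's implementation. It assumes every partition was originally written by the pre-alter query, uses TODAY as a stand-in for CAST(CURRENT_DATE AS STRING), and uses last_user_name as a hypothetical label for the added column.

```python
TODAY = "2025-02-27"  # assumed stand-in for CAST(CURRENT_DATE AS STRING)

# (user_id, user_name, ds, payment_amount_cents) rows mirroring json_source
SOURCE = [
    (uid, f"user{uid}", ds, uid * 10)
    for ds in ("2024-06-19", "2024-06-20", TODAY)
    for uid in (1, 2, 3, 4)
]


def run_query(ds, with_user_name):
    """Aggregate one partition; with_user_name toggles the added LAST_VALUE column."""
    out = {}
    for uid, name, row_ds, cents in SOURCE:
        if row_ds != ds:
            continue
        agg = out.setdefault(uid, {"payed_buy_fee_sum": 0, "PV": 0})
        agg["payed_buy_fee_sum"] += cents
        agg["PV"] += 1
        if with_user_name:
            agg["last_user_name"] = name
    return out


# Assumed starting point: all partitions were produced by the original query.
table = {
    ds: run_query(ds, with_user_name=False)
    for ds in ("2024-06-19", "2024-06-20", TODAY)
}


def refresh(partition):
    """Overwrite exactly one partition using the altered query."""
    table[partition] = run_query(partition, with_user_name=True)


refresh(TODAY)         # scheduled refresh after ALTER: latest partition only
refresh("2024-06-20")  # manual REFRESH PARTITION(ds='2024-06-20')

for ds in sorted(table):
    print(ds, table[ds][1])
```

In this model the '2024-06-19' partition keeps its pre-alter rows, while the latest partition and the manually refreshed '2024-06-20' partition carry the new column.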
h1. Full Mode Materialized Table Without Partition
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops3
> WITH (
> 'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=YTI3ZmNjYzk5YjgxOGU2MDk2MzU5NTM4NDk0OGQ3MzFfVU5nQnBVSzJUZjRwajJ0ZWYxbHU3bTlyMTd5M05IREpfVG9rZW46Ym94azRFd3M5c3Z0eWNLR3hmWWprb2dOQVp0XzE3NDA2NTY4OTA6MTc0MDY2MDQ5MF9WNA|width=750,height=182!
Alter table
Flink SQL> Alter MATERIALIZED TABLE full_users_shops3
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV,
> LAST_VALUE(user_name)
> FROM (
> SELECT user_id, user_name, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
The query meets expectations.
Flink SQL> select * from full_users_shops3;
[INFO] Result retrieval cancelled.
!https://xiaomi.f.mioffice.cn/space/api/box/stream/download/asynccode/?code=ODg4MTRjMWE0MGJlODUyYzBlYjMwZmFmYTcwM2FlYmJfNTVRaEdnSDJ0SGx0OFJ4cDgyRHppT01sZUgwaWhqZmZfVG9rZW46Ym94azRxTnR5WWtNZG1BT2Q1aXVKekt5RW5oXzE3NDA2NTY5MDU6MTc0MDY2MDUwNV9WNA|width=571,height=113!
> Cross-team verification for FLIP-492
> ------------------------------------
>
> Key: FLINK-37134
> URL: https://issues.apache.org/jira/browse/FLINK-37134
> Project: Flink
> Issue Type: Sub-task
> Reporter: Feng Jin
> Assignee: Yang Li
> Priority: Major
> Fix For: 2.0.0
>
>
> This is for cross-team verification of the release 2.0 work item:
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-492%3A+Support+Query+Modifications+for+Materialized+Tables]
> As only the Alter Query capability has been achieved at present, we only need
> to verify https://issues.apache.org/jira/browse/FLINK-36994.
>
> *Operation steps:*
> 1. Refer to
> quickstart(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/)
> to prepare the environment required for Materialized table: including
> catalog store/test-filesystem plugin/standalone cluster/sql gateway.
> 2. Create Materialized Tables in two modes (Continuous and Full).
> 3. Modify the AS query of the Materialized Tables in both modes.
>
> *Verify The result:*
> In Full mode, we need to verify the behavior of partitioned and
> non-partitioned tables:
> 1. For non-partitioned tables, after waiting for the next refresh task to
> complete, verify whether the result after modifying the query meets
> expectations.
> 2. For partitioned tables, we need to verify that only the result of the
> latest partition is consistent with the result after modifying the query, and
> the historical partitions remain unchanged. Then, by manually refreshing,
> confirm again that the partition after refreshing is generated by the
> modified query.
>
> In Continuous mode, we need to verify the execution of the background
> Continuous job after modification.
> 1. After modifying the AS query, the new job is generated by the new query.
> 2. The new job does not resume from the state of historical jobs.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)