dalongliu created FLINK-35689:
---------------------------------
Summary: CLONE - Release Testing: Verify FLIP-435 & FLIP-448:
Introduce a New Materialized Table for Simplifying Data Pipelines
Key: FLINK-35689
URL: https://issues.apache.org/jira/browse/FLINK-35689
Project: Flink
Issue Type: Sub-task
Components: Table SQL / API
Reporter: dalongliu
Assignee: dalongliu
Fix For: 1.20.0
Follow up the test for https://issues.apache.org/jira/browse/FLINK-35187
Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end
process, so the Release testing is an overall test of FLIP-435 & FLIP-448
feature at the same time.
Since Materialized Table depends on CatalogStore, Catalog, Workflow Scheduler,
SQL Client, SQL Gateway, and Standalone cluster to go through the whole
process, the validation process consists of two parts: Environment Setup and
Feature Verification.
h1. Environment Setup:
h1.
1. create the File CatalogStore directory
2. Create the test-filesystem Catalog and put
flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
3. Create the Savepoint directory.
4. Configure the Flink config.yaml file.
{code:yaml}
#==============================================================================
# Common
#==============================================================================
jobmanager:
bind-host: localhost
rpc:
address: localhost
# The RPC port where the JobManager is reachable.
port: 6123
memory:
process:
size: 1600m
execution:
failover-strategy: region
taskmanager:
bind-host: localhost
host: localhost
# The number of task slots that each TaskManager offers. Each slot runs one
parallel pipeline.
numberOfTaskSlots: 3
memory:
process:
size: 1728m
parallelism:
# The parallelism used for programs that did not specify and other
parallelism.
default: 1
#==============================================================================
# Rest & web frontend
#==============================================================================
rest:
# The address to which the REST client will connect to
address: localhost
bind-address: localhost
# Catalog Store
table:
catalog-store:
kind: file
file:
path: xxx
# Embedded Scheduler config
workflow-scheduler:
type: embedded
# SQL Gateway address
sql-gateway:
endpoint:
rest:
address: 127.0.0.1
{code}
5. Start the Standalone cluster: . /bin/start-cluster.sh
6. Start the SQL Gateway: . /bin/sql-gateway.sh
7. Start SQL Client: /bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
8. Register the test-filesystem Catalog
{code:sql}
CREATE CATALOG mt_cat
WITH (
'type' = 'test-filesystem',
'path' = '...',
'default-database' = 'mydb'
);
USE CATALOG mt_cat;
{code}
9. Create the test-filesystem source table and insert the data
{code:sql}
-- 1. create json format table
CREATE TABLE json_source (
order_id BIGINT,
user_id BIGINT,
user_name STRING,
order_created_at STRING,
payment_amount_cents BIGINT
) WITH (
'format' = 'json',
'source.monitor-interval' = '5S'
);
-- 2. insert data
INSERT INTO mt_cat.mydb.json_source VALUES
(1001, 1, 'user1', '2024-06-24 10:00:00', 10),
(1002, 1, 'user2', '2024-06-24 10:01:00', 20),
(1003, 2, 'user3', '2024-06-24 10:02:00', 30),
(1004, 2, 'user4', '2024-06-24 10:03:00', 40),
(1005, 1, 'user1', '2024-06-25 10:00:00', 10),
(1006, 1, 'user2', '2024-06-25 10:01:00', 20),
(1007, 2, 'user3', '2024-06-25 10:02:00', 30),
(1008, 2, 'user4', '2024-06-25 10:03:00', 40);
INSERT INTO mt_cat.mydb.json_source VALUES
(1001, 1, 'user1', '2024-06-26 10:00:00', 10),
(1002, 1, 'user2', '2024-06-26 10:01:00', 20),
(1003, 2, 'user3', '2024-06-26 10:02:00', 30),
(1004, 2, 'user4', '2024-06-26 10:03:00', 40),
(1005, 1, 'user1', '2024-06-27 10:00:00', 10),
(1006, 1, 'user2', '2024-06-27 10:01:00', 20),
(1007, 2, 'user3', '2024-06-27 10:02:00', 30),
(1008, 2, 'user4', '2024-06-27 10:03:00', 40);
{code}
h1. Feature verification
h1.
h2. Continuous Mode
h2.
In Continuous Mode, Materialized Table runs a Flink streaming job to update the
data in real-time. Feature verify includes various scenarios such as Create &
Suspend & Resume & Drop.
1. Create Materialized Table, including various bad cases and good cases, and
execute the following statement in the SQL Client
{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops
(
PRIMARY KEY(id) NOT ENFORCED
)
WITH(
'format' = 'debezium-json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM (
SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
payment_amount_cents FROM json_source ) AS tmp
GROUP BY (user_id, ds);
{code}
2. Suspend Materialized Table and execute the following statement in the SQL
Client
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}
3. Resume Materialized Table
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
{code}
4. Manual Refresh Materialized Table
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
PARTITION(ds = '2024-06-25');
{code}
5. Drop Materialized Table
{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
{code}
h2. Full Mode
h2.
In Full Mode, Materialized Table needs to rely on Workflow Scheduler to
complete the periodic full refresh operation, so the main purpose is to verify
the FLIP-448 function.
1. Create Materialized Table, verify various good and bad cases, and execute
the following statement
{code:sql}
CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
PARTITIONED BY (ds)
WITH(
'format' = 'json'
)
FRESHNESS = INTERVAL '1' MINUTE
REFRESH_MODE = FULL
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM (
SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
GROUP BY (user_id, ds);
{code}
2. Suspend Materialized Table by executing the following statement
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
{code}
3. Resume Materialized Table and execute the following statement
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
{code}
4. Drop Materialized Table and execute the following statement
{code:sql}
DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)