[
https://issues.apache.org/jira/browse/FLINK-35689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859807#comment-17859807
]
dalongliu edited comment on FLINK-35689 at 6/25/24 4:18 AM:
------------------------------------------------------------
For Continuous Mode, I did the following verification.
h2. Create Materialized Table
1. bad case1: Invalid Primary Key: column not-exists
!screenshot-1.png!
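The DDL for this case is presumably of the following shape (a sketch adapted from the DDL in the issue description below; `id` is declared as the primary key but does not exist in the query schema `user_id, ds, payed_buy_fee_sum, pv`, so creation should fail):
{code:sql}
-- Sketch of bad case1: primary key column `id` does not exist in the query schema
CREATE MATERIALIZED TABLE continuous_users_shops
(
    PRIMARY KEY(id) NOT ENFORCED
)
WITH (
    'format' = 'debezium-json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
    user_id,
    ds,
    SUM (payment_amount_cents) AS payed_buy_fee_sum,
    SUM (1) AS pv
FROM (
    SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents FROM json_source ) AS tmp
GROUP BY (user_id, ds);
{code}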
2. bad case2: Invalid Primary Key: column is nullable
!image-2024-06-25-12-00-07-616.png!
3. bad case3: Invalid partition key
!image-2024-06-25-12-01-09-648.png!
4. bad case4: invalid `partition.fields.pt.date-formatter`
!image-2024-06-25-12-02-08-558.png!
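A statement of roughly this shape should trigger the error (a sketch; the table is partitioned by `ds`, so a `partition.fields.pt.date-formatter` option referring to a non-partition column `pt` should be rejected):
{code:sql}
-- Sketch of bad case4: `pt` is not a partition key of the table
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
    'format' = 'debezium-json',
    'partition.fields.pt.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT ...
{code}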
5. bad case5: invalid freshness time unit
!image-2024-06-25-12-02-51-615.png!
6. bad case6: negative freshness value
!image-2024-06-25-12-03-20-930.png!
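The two freshness cases can presumably be reproduced with clauses like the following (sketches; the exact values used in the screenshots may differ):
{code:sql}
-- Sketch of bad case5: a time unit the freshness clause does not support
FRESHNESS = INTERVAL '1' MONTH

-- Sketch of bad case6: a negative freshness value
FRESHNESS = INTERVAL '-30' SECOND
{code}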
7. bad case7: JSON format is specified, but the Definition Query cannot
generate a Flink streaming job; the framework first creates the Materialized
Table and then deletes it, providing a weak atomicity guarantee
!image-2024-06-25-12-04-24-948.png!
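This case can presumably be reproduced by switching the sink format to plain `json` (a sketch, assuming the plain `json` format cannot accept the updating changelog produced by the aggregation, so job generation fails after the table has already been created):
{code:sql}
-- Sketch of bad case7: the aggregation produces an updating changelog that the
-- plain `json` format cannot encode, so no streaming job can be generated; the
-- framework creates the Materialized Table first and then drops it again.
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH (
    'format' = 'json'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
    user_id,
    ds,
    SUM (payment_amount_cents) AS payed_buy_fee_sum,
    SUM (1) AS pv
FROM (
    SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
    payment_amount_cents FROM json_source ) AS tmp
GROUP BY (user_id, ds);
{code}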
8. Good case: Materialized Table created successfully; Flink streaming job
submitted successfully; data written in real time
{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops
PARTITIONED BY (ds)
WITH(
'format' = 'debezium-json',
'sink.rolling-policy.rollover-interval' = '10s',
'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
user_id,
ds,
SUM (payment_amount_cents) AS payed_buy_fee_sum,
SUM (1) AS pv
FROM (
SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
payment_amount_cents FROM json_source ) AS tmp
GROUP BY (user_id, ds);
{code}
!image-2024-06-25-12-05-39-089.png!
!image-2024-06-25-12-05-54-104.png!
h2. Suspend Materialized Table
1. Suspend without specifying a savepoint path
!image-2024-06-25-12-07-52-182.png!
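The statement for this case is the same SUSPEND used in case 2, just without setting a savepoint directory first:
{code:sql}
-- Suspend directly; `execution.checkpointing.savepoint-dir` has not been set
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}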
2. Suspend with a specified savepoint path
{code:sql}
SET 'execution.checkpointing.savepoint-dir' =
'file:///Users/ron/mt_demo/savepoint';
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}
!image-2024-06-25-12-09-11-207.png!
!image-2024-06-25-12-09-22-879.png!
3. Repeated Suspend: suspend an already suspended table
!image-2024-06-25-12-11-08-720.png!
h2. Resume Materialized Table
1. Resume without options
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
{code}
!image-2024-06-25-12-13-47-363.png!
!image-2024-06-25-12-14-13-107.png!
2. Resume with options
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME
WITH (
-- 'sink.parallelism' = '3'
'sink.shuffle-by-partition.enable' = 'true'
);
{code}
!image-2024-06-25-12-15-03-493.png!
3. Repeated Resume: when the background streaming job is already running and a
Resume operation is performed again, an error should be reported and a second
streaming job must not be submitted.
!image-2024-06-25-12-16-47-160.png!
!image-2024-06-25-12-16-57-076.png!
h2. Manual Refresh Materialized Table
1. bad case: Partition field does not exist
!image-2024-06-25-12-18-12-506.png!
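The refresh was presumably issued with a partition field that is not among the table's partition keys, along the lines of (a sketch; `not_exists_col` is a hypothetical column name):
{code:sql}
-- Sketch: `not_exists_col` (hypothetical) is not a partition key of the table
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
PARTITION (not_exists_col = '2024-06-25');
{code}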
2. bad case: Partition field that is not of type STRING
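A sketch of this case, assuming a materialized table whose partition column was declared with a non-STRING type such as BIGINT (hypothetical table definition; manual refresh requires STRING partition fields, so the refresh should be rejected):
{code:sql}
-- Sketch: suppose the table were partitioned by a BIGINT column `ds`;
-- refreshing by that partition should fail the STRING-type check
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
PARTITION (ds = '2024');
{code}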
> Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized
> Table for Simplifying Data Pipelines
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35689
> URL: https://issues.apache.org/jira/browse/FLINK-35689
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Assignee: dalongliu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-06-25-12-00-07-616.png,
> image-2024-06-25-12-01-09-648.png, image-2024-06-25-12-02-08-558.png,
> image-2024-06-25-12-02-51-615.png, image-2024-06-25-12-03-20-930.png,
> image-2024-06-25-12-04-24-948.png, image-2024-06-25-12-05-39-089.png,
> image-2024-06-25-12-05-54-104.png, image-2024-06-25-12-07-52-182.png,
> image-2024-06-25-12-09-11-207.png, image-2024-06-25-12-09-22-879.png,
> image-2024-06-25-12-11-08-720.png, image-2024-06-25-12-13-47-363.png,
> image-2024-06-25-12-14-13-107.png, image-2024-06-25-12-15-03-493.png,
> image-2024-06-25-12-16-47-160.png, image-2024-06-25-12-16-57-076.png,
> image-2024-06-25-12-18-12-506.png, screenshot-1.png
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-35187,
> https://issues.apache.org/jira/browse/FLINK-35345
> Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end
> process, so the Release testing is an overall test of FLIP-435 & FLIP-448
> feature at the same time.
> Since Materialized Table depends on CatalogStore, Catalog, Workflow
> Scheduler, SQL Client, SQL Gateway, and Standalone cluster to go through the
> whole process, the validation process consists of two parts: Environment
> Setup and Feature Verification.
> h1. Environment Setup:
> 1. create the File CatalogStore directory
> 2. Create the test-filesystem Catalog and put
> flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
> 3. Create the Savepoint directory.
> 4. Configure the Flink config.yaml file.
> {code:yaml}
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager:
>   bind-host: localhost
>   rpc:
>     address: localhost
>     # The RPC port where the JobManager is reachable.
>     port: 6123
>   memory:
>     process:
>       size: 1600m
>   execution:
>     failover-strategy: region
> taskmanager:
>   bind-host: localhost
>   host: localhost
>   # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
>   numberOfTaskSlots: 3
>   memory:
>     process:
>       size: 1728m
> parallelism:
>   # The parallelism used for programs that did not specify any other parallelism.
>   default: 1
> #==============================================================================
> # Rest & web frontend
> #==============================================================================
> rest:
>   # The address to which the REST client will connect to
>   address: localhost
>   bind-address: localhost
> # Catalog Store
> table:
>   catalog-store:
>     kind: file
>     file:
>       path: xxx
> # Embedded Scheduler config
> workflow-scheduler:
>   type: embedded
> # SQL Gateway address
> sql-gateway:
>   endpoint:
>     rest:
>       address: 127.0.0.1
> {code}
> 5. Start the Standalone cluster: ./bin/start-cluster.sh
> 6. Start the SQL Gateway: ./bin/sql-gateway.sh
> 7. Start the SQL Client: ./bin/sql-client.sh gateway --endpoint
> http://127.0.0.1:8083
> 8. Register the test-filesystem Catalog
> {code:sql}
> CREATE CATALOG mt_cat
> WITH (
> 'type' = 'test-filesystem',
> 'path' = '...',
> 'default-database' = 'mydb'
> );
> USE CATALOG mt_cat;
> {code}
> 9. Create the test-filesystem source table and insert the data
> {code:sql}
> -- 1. create json format table
> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT
> ) WITH (
> 'format' = 'json',
> 'source.monitor-interval' = '5S'
> );
> -- 2. insert data
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
> {code}
> h1. Feature verification
> h2. Continuous Mode
> In Continuous Mode, the Materialized Table runs a Flink streaming job to
> update the data in real time. Feature verification covers scenarios such as
> Create, Suspend, Resume, and Drop.
> 1. Create Materialized Table, including various bad cases and good cases, and
> execute the following statement in the SQL Client
> {code:sql}
> CREATE MATERIALIZED TABLE continuous_users_shops
> (
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH(
> 'format' = 'debezium-json'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
> payment_amount_cents FROM json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table and execute the following statement in the SQL
> Client
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
> {code}
> 4. Manual Refresh Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
> PARTITION(ds = '2024-06-25');
> {code}
> 5. Drop Materialized Table
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
> {code}
> h2. Full Mode
> In Full Mode, the Materialized Table relies on the Workflow Scheduler to
> perform the periodic full refresh, so the main purpose here is to verify the
> FLIP-448 functionality.
> 1. Create Materialized Table, verify various good and bad cases, and execute
> the following statement
> {code:sql}
> CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
> PARTITIONED BY (ds)
> WITH(
> 'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
> payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table by executing the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table and execute the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
> {code}
> 4. Drop Materialized Table and execute the following statement
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
> DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)