[ 
https://issues.apache.org/jira/browse/FLINK-35689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859807#comment-17859807
 ] 

dalongliu edited comment on FLINK-35689 at 6/25/24 4:18 AM:
------------------------------------------------------------

For Continuous Mode, I did the following verification.
h2. Create Materialized Table
h2. 
1. bad case1: Invalid Primary Key: column not-exists
 !screenshot-1.png! 

2. bad case2: Invalid Primary Key: column is nullable
 !image-2024-06-25-12-00-07-616.png! 

3. bad case3: Invalid partition key
 !image-2024-06-25-12-01-09-648.png! 

4. bad case4: invalid `partition.fields.pt.date-formatter`
 !image-2024-06-25-12-02-08-558.png! 

5. bad case5: invalid freshness time unit
 !image-2024-06-25-12-02-51-615.png! 

6. bad case6: negative freshness value
 !image-2024-06-25-12-03-20-930.png! 

7. bad case7: Specify Json Format, according to Definition Query can not 
generate Flink stream jobs, the framework will first create Materialized Table, 
and then deleted, weak atomicity assurance
 !image-2024-06-25-12-04-24-948.png! 

8. Good case: Materialized Table created successfully; Flink streaming job 
submitted successfully; data written in real time

{code:sql}
CREATE MATERIALIZED TABLE continuous_users_shops 
PARTITIONED BY (ds)
WITH(
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT 
  user_id,
  ds,
  SUM (payment_amount_cents) AS payed_buy_fee_sum,
  SUM (1) AS pv
FROM (
    SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
payment_amount_cents FROM json_source ) AS tmp
 GROUP BY (user_id, ds);
{code}

 !image-2024-06-25-12-05-39-089.png! 

 !image-2024-06-25-12-05-54-104.png! 

h2. Suspend Materialized Table
h2. 
1. Suspend without specifying a savepoint path
 !image-2024-06-25-12-07-52-182.png! 

2. Suspend with specifying a savepoint path

{code:sql}
SET 'execution.checkpointing.savepoint-dir' = 
'file:///Users/ron/mt_demo/savepoint';

ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
{code}

 !image-2024-06-25-12-09-11-207.png! 
 !image-2024-06-25-12-09-22-879.png! 

3. Repeat Suspend
 !image-2024-06-25-12-11-08-720.png! 

h2. Resume Materialized Table
h2. 
1. Resume without options
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
{code}
 !image-2024-06-25-12-13-47-363.png! 
 !image-2024-06-25-12-14-13-107.png! 

2. Resume with options
{code:sql}
ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME
WITH (
  -- 'sink.parallelism' = '3'
  'sink.shuffle-by-partition.enable' = 'true'
);
{code}

 !image-2024-06-25-12-15-03-493.png! 

3. Repeat Resume: When a background streaming job is running and a Resume 
operation is performed, an error should be reported and a streaming job cannot 
be resubmitted.

 !image-2024-06-25-12-16-47-160.png! 
 !image-2024-06-25-12-16-57-076.png! 

h2. Manual Refresh Materialized Table
h2. 
1. bad case: Partition field does not exist
 !image-2024-06-25-12-18-12-506.png! 

2. bad case: Partition fields that are not of type String





was (Author: lsy):
For Continuous Mode, I did the following verification.
h1. Create Materialized Table
h1. 
1. bad case1: Invalid Primary Key: column not-exists
 !screenshot-1.png! 

> Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized 
> Table for Simplifying Data Pipelines
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35689
>                 URL: https://issues.apache.org/jira/browse/FLINK-35689
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: dalongliu
>            Assignee: dalongliu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.20.0
>
>         Attachments: image-2024-06-25-12-00-07-616.png, 
> image-2024-06-25-12-01-09-648.png, image-2024-06-25-12-02-08-558.png, 
> image-2024-06-25-12-02-51-615.png, image-2024-06-25-12-03-20-930.png, 
> image-2024-06-25-12-04-24-948.png, image-2024-06-25-12-05-39-089.png, 
> image-2024-06-25-12-05-54-104.png, image-2024-06-25-12-07-52-182.png, 
> image-2024-06-25-12-09-11-207.png, image-2024-06-25-12-09-22-879.png, 
> image-2024-06-25-12-11-08-720.png, image-2024-06-25-12-13-47-363.png, 
> image-2024-06-25-12-14-13-107.png, image-2024-06-25-12-15-03-493.png, 
> image-2024-06-25-12-16-47-160.png, image-2024-06-25-12-16-57-076.png, 
> image-2024-06-25-12-18-12-506.png, screenshot-1.png
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-35187, 
> https://issues.apache.org/jira/browse/FLINK-35345
> Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end 
> process, so the Release testing is an overall test of FLIP-435 & FLIP-448 
> feature at the same time.
> Since Materialized Table depends on CatalogStore, Catalog, Workflow 
> Scheduler, SQL Client, SQL Gateway, and Standalone cluster to go through the 
> whole process, the validation process consists of two parts: Environment 
> Setup and Feature Verification.
> h1. Environment Setup:
> h1. 
> 1. create the File CatalogStore directory
> 2. Create the test-filesystem Catalog and put 
> flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
> 3. Create the Savepoint directory.
> 4. Configure the Flink config.yaml file.
> {code:yaml}
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager:
>   bind-host: localhost
>   rpc:
>     address: localhost
>     # The RPC port where the JobManager is reachable.
>     port: 6123
>   memory:
>     process:
>       size: 1600m
>   execution:
>     failover-strategy: region
> taskmanager:
>   bind-host: localhost
>   host: localhost
>   # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
>   numberOfTaskSlots: 3
>   memory:
>     process:
>       size: 1728m
> parallelism:
>   # The parallelism used for programs that did not specify and other 
> parallelism.
>   default: 1
> #==============================================================================
> # Rest & web frontend
> #==============================================================================
> rest:
>   # The address to which the REST client will connect to
>   address: localhost
>   bind-address: localhost
> # Catalog Store
> table:
>   catalog-store:
>     kind: file
>     file:
>       path: xxx
> # Embedded Scheduler config
> workflow-scheduler:
>   type: embedded
> # SQL Gateway address
> sql-gateway:
>   endpoint:
>     rest:
>       address: 127.0.0.1
> {code}
> 5. Start the Standalone cluster: . /bin/start-cluster.sh
> 6. Start the SQL Gateway: . /bin/sql-gateway.sh
> 7. Start SQL Client: /bin/sql-client.sh gateway --endpoint 
> http://127.0.0.1:8083
> 8. Register the test-filesystem Catalog
> {code:sql}
> CREATE CATALOG mt_cat
> WITH (
>   'type' = 'test-filesystem',
>   'path' = '...',
>   'default-database' = 'mydb'  
> );
> USE CATALOG mt_cat;
> {code}
> 9. Create the test-filesystem source table and insert the data
> {code:sql}
> -- 1. create json format table
> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '5S'
> );
> -- 2. insert data
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
> {code}
> h1. Feature verification
> h1. 
> h2. Continuous Mode
> h2. 
> In Continuous Mode, Materialized Table runs a Flink streaming job to update 
> the data in real-time. Feature verify includes various scenarios such as 
> Create & Suspend & Resume & Drop.
> 1. Create Materialized Table, including various bad cases and good cases, and 
> execute the following statement in the SQL Client
> {code:sql}
> CREATE MATERIALIZED TABLE continuous_users_shops 
> (
>   PRIMARY KEY(id) NOT ENFORCED
> )
> WITH(
>   'format' = 'debezium-json'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT 
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
> FROM (
>     SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
> payment_amount_cents FROM json_source ) AS tmp
>  GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table and execute the following statement in the SQL 
> Client
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
> {code}
> 4. Manual Refresh Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH 
> PARTITION(ds = '2024-06-25');
> {code}
> 5. Drop Materialized Table
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
> {code}
> h2. Full Mode
> h2. 
> In Full Mode, Materialized Table needs to rely on Workflow Scheduler to 
> complete the periodic full refresh operation, so the main purpose is to 
> verify the FLIP-448 function.
> 1. Create Materialized Table, verify various good and bad cases, and execute 
> the following statement
> {code:sql}
> CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
> PARTITIONED BY (ds)
> WITH(
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT 
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS pv
> FROM (
>     SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, 
> payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table by executing the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table and execute the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
> {code}
> 4. Drop Materialized Table and execute the following statement
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
> DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to