[
https://issues.apache.org/jira/browse/FLINK-35689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859806#comment-17859806
]
dalongliu commented on FLINK-35689:
-----------------------------------
FLIP-435 & FLIP-448 were designed by me, and most of the code implementation
was done by [~hackergin], so I will do the release testing verification.
> Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized
> Table for Simplifying Data Pipelines
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35689
> URL: https://issues.apache.org/jira/browse/FLINK-35689
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Assignee: dalongliu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
>
>
> Follow-up of the tests for https://issues.apache.org/jira/browse/FLINK-35187
> and https://issues.apache.org/jira/browse/FLINK-35345.
> Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end
> process, so this release testing is an overall test of the FLIP-435 &
> FLIP-448 features at the same time.
> Since Materialized Table depends on the CatalogStore, Catalog, Workflow
> Scheduler, SQL Client, SQL Gateway, and a Standalone cluster to go through
> the whole process, the validation consists of two parts: Environment Setup
> and Feature Verification.
> h1. Environment Setup
> 1. Create the file CatalogStore directory.
> 2. Create the test-filesystem Catalog and put
> flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
> 3. Create the Savepoint directory.
> 4. Configure the Flink config.yaml file:
> {code:yaml}
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager:
>   bind-host: localhost
>   rpc:
>     address: localhost
>     # The RPC port where the JobManager is reachable.
>     port: 6123
>   memory:
>     process:
>       size: 1600m
>   execution:
>     failover-strategy: region
> taskmanager:
>   bind-host: localhost
>   host: localhost
>   # The number of task slots that each TaskManager offers.
>   # Each slot runs one parallel pipeline.
>   numberOfTaskSlots: 3
>   memory:
>     process:
>       size: 1728m
> parallelism:
>   # The parallelism used for programs that did not specify any other parallelism.
>   default: 1
> #==============================================================================
> # Rest & web frontend
> #==============================================================================
> rest:
>   # The address to which the REST client will connect
>   address: localhost
>   bind-address: localhost
> # Catalog Store
> table:
>   catalog-store:
>     kind: file
>     file:
>       path: xxx
> # Embedded Scheduler config
> workflow-scheduler:
>   type: embedded
> # SQL Gateway address
> sql-gateway:
>   endpoint:
>     rest:
>       address: 127.0.0.1
> {code}
> 5. Start the Standalone cluster: ./bin/start-cluster.sh
> 6. Start the SQL Gateway: ./bin/sql-gateway.sh start
> 7. Start the SQL Client: ./bin/sql-client.sh gateway --endpoint
> http://127.0.0.1:8083
> 8. Register the test-filesystem Catalog
> {code:sql}
> CREATE CATALOG mt_cat WITH (
>   'type' = 'test-filesystem',
>   'path' = '...',
>   'default-database' = 'mydb'
> );
>
> USE CATALOG mt_cat;
> {code}
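> As a quick sanity check (a hedged sketch, not part of the original test plan), the registration can be confirmed from the SQL Client, and because the catalog definition is persisted in the file CatalogStore it should survive a client restart:
>
> {code:sql}
> -- The newly registered catalog should be listed
> SHOW CATALOGS;
> -- The current catalog should be mt_cat after USE CATALOG
> SHOW CURRENT CATALOG;
> {code}
>
> Restarting the SQL Client and running SHOW CATALOGS again is a simple way to exercise the CatalogStore persistence that FLIP-435 relies on.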
> 9. Create the test-filesystem source table and insert the data
> {code:sql}
> -- 1. Create a JSON-format table
> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH (
>   'format' = 'json',
>   'source.monitor-interval' = '5S'
> );
>
> -- 2. Insert data
> INSERT INTO mt_cat.mydb.json_source VALUES
>   (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
>   (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
>   (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
>   (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
>   (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
>   (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
>   (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
>   (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
>
> INSERT INTO mt_cat.mydb.json_source VALUES
>   (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
>   (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
>   (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
>   (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
>   (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
>   (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
>   (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
>   (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
> {code}
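> Before creating any Materialized Table, it may be worth confirming that the source table is readable (a sketch; with the 5-second monitor interval the inserted files should be picked up shortly after writing):
>
> {code:sql}
> -- Should eventually return the 16 rows inserted above
> SELECT * FROM mt_cat.mydb.json_source;
> {code}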
> h1. Feature Verification
> h2. Continuous Mode
> In Continuous Mode, a Materialized Table runs a Flink streaming job to update
> its data in real time. Feature verification covers scenarios such as Create,
> Suspend, Resume, Manual Refresh, and Drop.
> 1. Create the Materialized Table, covering both good and bad cases, by
> executing the following statement in the SQL Client:
> {code:sql}
> CREATE MATERIALIZED TABLE continuous_users_shops (
>   PRIMARY KEY (user_id, ds) NOT ENFORCED
> )
> WITH (
>   'format' = 'debezium-json'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT
>   user_id,
>   ds,
>   SUM(payment_amount_cents) AS payed_buy_fee_sum,
>   SUM(1) AS pv
> FROM (
>   SELECT
>     user_id,
>     DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
>     payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
> {code}
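> Two follow-up checks can be sketched here (hedged; the table name bad_pk_table is hypothetical). First, query the new table to confirm the streaming job is populating it; second, try a bad case where the PRIMARY KEY refers to a column the defining query does not produce, which should be rejected at creation time:
>
> {code:sql}
> -- Good case: the aggregated result should appear as the streaming job runs
> SELECT * FROM continuous_users_shops;
>
> -- Bad case (expected to fail): order_id is not in the SELECT list
> CREATE MATERIALIZED TABLE bad_pk_table (
>   PRIMARY KEY (order_id) NOT ENFORCED
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT user_id, user_name FROM json_source;
> {code}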
> 2. Suspend the Materialized Table by executing the following statement in
> the SQL Client:
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
> {code}
> 3. Resume the Materialized Table:
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
> {code}
> 4. Manually refresh the Materialized Table:
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
>   PARTITION (ds = '2024-06-25');
> {code}
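> To confirm the manual refresh took effect, the refreshed partition can be inspected (a sketch):
>
> {code:sql}
> -- Rows for the refreshed partition should reflect the current json_source data
> SELECT * FROM mt_cat.mydb.continuous_users_shops WHERE ds = '2024-06-25';
> {code}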
> 5. Drop the Materialized Table:
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
> {code}
> h2. Full Mode
> In Full Mode, a Materialized Table relies on the Workflow Scheduler to
> perform its periodic full refreshes, so this part mainly verifies the
> FLIP-448 functionality.
> 1. Create the Materialized Table, covering both good and bad cases, by
> executing the following statement:
> {code:sql}
> CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
> PARTITIONED BY (ds)
> WITH (
>   'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM(payment_amount_cents) AS payed_buy_fee_sum,
>   SUM(1) AS pv
> FROM (
>   SELECT
>     user_id,
>     DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
>     payment_amount_cents
>   FROM mt_cat.mydb.json_source
> ) AS tmp
> GROUP BY user_id, ds;
> {code}
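> After the first scheduled refresh (at most one freshness interval, i.e. about a minute, after creation), the result can be checked per partition (a sketch):
>
> {code:sql}
> -- Each partition should contain the aggregated result for that day
> SELECT * FROM mt_cat.mydb.full_users_shops WHERE ds = '2024-06-26';
> {code}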
> 2. Suspend the Materialized Table:
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
> {code}
> 3. Resume the Materialized Table:
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
> {code}
> 4. Drop the Materialized Table; run both statements to cover the IF EXISTS
> variant:
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
> DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
> {code}
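> As a final check (a sketch), the drop can be confirmed from the SQL Client; the IF EXISTS form above should succeed as a no-op on the already-dropped table:
>
> {code:sql}
> -- full_users_shops should no longer be listed
> SHOW TABLES;
> {code}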
--
This message was sent by Atlassian Jira
(v8.20.10#820010)