[
https://issues.apache.org/jira/browse/FLINK-35689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859821#comment-17859821
]
dalongliu edited comment on FLINK-35689 at 6/25/24 6:23 AM:
------------------------------------------------------------
h2. Verification Conclusion
h2.
Under Continuous Mode & Full Mode, the whole process of Materialized Table can
work, and the whole function is Ready, but the following problems are found
during the testing:
1. test-filesystem connector code needs to be optimized to support both stream
read and batch read.
2. when creating Materialized Table, if the option
`partition.fields.pt.date-formatter` is specified, it is necessary to verify
whether the partition field pt referenced by it exists, and if it does not
exist, it is necessary to report an error.
3. Continuous Mode: repeat Suspend need to check whether the RefreshStatus has
been in SUSPENDED state, if it has been suspended, should give a clear error
message.
4. Continuous Mode: Repeatedly perform Resume operation need to check whether
the job is in the running state, otherwise can not Resume successfully, to
avoid repeated submission of multiple streams of jobs.
5. Manual Refresh Materialized Table: There is a problem with the validation
logic for non-String type partitioned fields, resulting in inaccurate exception
information. For example, user_id is not a partitioned field, so the user_id
field should be excluded before validating the partitioned field.
6. Manually Refresh Materialized Table: the cluster info returned shows the map
object, not the json string corresponding to the map.
7. Full Mode: Repeat suspend operation, if it is already suspended, it should
give a clear error message.
8. Full Mode: Repeat the Resume operation with a clear error message.
9. DROP TABLE syntax can be used to delete Materialized Table metadata, need to
check whether Materialized Table supports DROP TABLE syntax.
The above issues are tracked and fixed in
https://issues.apache.org/jira/browse/FLINK-35691. cc [~hackergin].
was (Author: lsy):
h2. Verification Conclusion
h2.
Under Continuous Mode & Full Mode, the whole process of Materialized Table can
work, and the whole function is Ready, but the following problems are found
during the testing:
1. test-filesystem connector code needs to be optimized to support both stream
read and batch read.
2. when creating Materialized Table, if the option
`partition.fields.pt.date-formatter` is specified, it is necessary to verify
whether the partition field pt referenced by it exists, and if it does not
exist, it is necessary to report an error.
3. Continuous Mode: repeat Suspend need to check whether the RefreshStatus has
been in SUSPENDED state, if it has been suspended, should give a clear error
message.
4. Continuous Mode: Repeatedly perform Resume operation need to check whether
the job is in the running state, otherwise can not Resume successfully, to
avoid repeated submission of multiple streams of jobs.
5. Manual Refresh Materialized Table: There is a problem with the validation
logic for non-String type partitioned fields, resulting in inaccurate exception
information. For example, user_id is not a partitioned field, so the user_id
field should be excluded before validating the partitioned field.
6. Manually Refresh Materialized Table: the cluster info returned shows the map
object, not the json string corresponding to the map.
7. Full Mode: Repeat suspend operation, if it is already suspended, it should
give a clear error message.
8. Full Mode: Repeat the Resume operation with a clear error message.
9. DROP TABLE syntax can be used to delete Materialized Table metadata, need to
check whether Materialized Table supports DROP TABLE syntax.
The above issues are tracked and fixed in
https://issues.apache.org/jira/browse/FLINK-35691. cc [~hackergin].
> Release Testing: Verify FLIP-435 & FLIP-448: Introduce a New Materialized
> Table for Simplifying Data Pipelines
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35689
> URL: https://issues.apache.org/jira/browse/FLINK-35689
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Assignee: dalongliu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-06-25-12-00-07-616.png,
> image-2024-06-25-12-01-09-648.png, image-2024-06-25-12-02-08-558.png,
> image-2024-06-25-12-02-51-615.png, image-2024-06-25-12-03-20-930.png,
> image-2024-06-25-12-04-24-948.png, image-2024-06-25-12-05-39-089.png,
> image-2024-06-25-12-05-54-104.png, image-2024-06-25-12-07-52-182.png,
> image-2024-06-25-12-09-11-207.png, image-2024-06-25-12-09-22-879.png,
> image-2024-06-25-12-11-08-720.png, image-2024-06-25-12-13-47-363.png,
> image-2024-06-25-12-14-13-107.png, image-2024-06-25-12-15-03-493.png,
> image-2024-06-25-12-16-47-160.png, image-2024-06-25-12-16-57-076.png,
> image-2024-06-25-12-18-12-506.png, image-2024-06-25-13-38-47-663.png,
> image-2024-06-25-13-39-44-790.png, image-2024-06-25-13-39-56-133.png,
> image-2024-06-25-13-43-10-439.png, image-2024-06-25-13-43-22-548.png,
> image-2024-06-25-13-44-07-669.png, screenshot-1.png, screenshot-10.png,
> screenshot-11.png, screenshot-12.png, screenshot-13.png, screenshot-14.png,
> screenshot-15.png, screenshot-16.png, screenshot-2.png, screenshot-3.png,
> screenshot-4.png, screenshot-5.png, screenshot-6.png, screenshot-7.png,
> screenshot-8.png, screenshot-9.png
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-35187,
> https://issues.apache.org/jira/browse/FLINK-35345
> Materialized Table depends on FLIP-435 & FLIP-448 to complete the end-to-end
> process, so the Release testing is an overall test of FLIP-435 & FLIP-448
> feature at the same time.
> Since Materialized Table depends on CatalogStore, Catalog, Workflow
> Scheduler, SQL Client, SQL Gateway, and Standalone cluster to go through the
> whole process, the validation process consists of two parts: Environment
> Setup and Feature Verification.
> h1. Environment Setup:
> h1.
> 1. create the File CatalogStore directory
> 2. Create the test-filesystem Catalog and put
> flink-table-filesystem-test-utils-1.20-SNAPSHOT.jar into the lib directory.
> 3. Create the Savepoint directory.
> 4. Configure the Flink config.yaml file.
> {code:yaml}
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager:
> bind-host: localhost
> rpc:
> address: localhost
> # The RPC port where the JobManager is reachable.
> port: 6123
> memory:
> process:
> size: 1600m
> execution:
> failover-strategy: region
> taskmanager:
> bind-host: localhost
> host: localhost
> # The number of task slots that each TaskManager offers. Each slot runs one
> parallel pipeline.
> numberOfTaskSlots: 3
> memory:
> process:
> size: 1728m
> parallelism:
> # The parallelism used for programs that did not specify and other
> parallelism.
> default: 1
> #==============================================================================
> # Rest & web frontend
> #==============================================================================
> rest:
> # The address to which the REST client will connect to
> address: localhost
> bind-address: localhost
> # Catalog Store
> table:
> catalog-store:
> kind: file
> file:
> path: xxx
> # Embedded Scheduler config
> workflow-scheduler:
> type: embedded
> # SQL Gateway address
> sql-gateway:
> endpoint:
> rest:
> address: 127.0.0.1
> {code}
> 5. Start the Standalone cluster: . /bin/start-cluster.sh
> 6. Start the SQL Gateway: . /bin/sql-gateway.sh
> 7. Start SQL Client: /bin/sql-client.sh gateway --endpoint
> http://127.0.0.1:8083
> 8. Register the test-filesystem Catalog
> {code:sql}
> CREATE CATALOG mt_cat
> WITH (
> 'type' = 'test-filesystem',
> 'path' = '...',
> 'default-database' = 'mydb'
> );
> USE CATALOG mt_cat;
> {code}
> 9. Create the test-filesystem source table and insert the data
> {code:sql}
> -- 1. create json format table
> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT
> ) WITH (
> 'format' = 'json',
> 'source.monitor-interval' = '5S'
> );
> -- 2. insert data
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-24 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-24 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-24 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-24 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-25 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-25 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-25 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-25 10:03:00', 40);
> INSERT INTO mt_cat.mydb.json_source VALUES
> (1001, 1, 'user1', '2024-06-26 10:00:00', 10),
> (1002, 1, 'user2', '2024-06-26 10:01:00', 20),
> (1003, 2, 'user3', '2024-06-26 10:02:00', 30),
> (1004, 2, 'user4', '2024-06-26 10:03:00', 40),
> (1005, 1, 'user1', '2024-06-27 10:00:00', 10),
> (1006, 1, 'user2', '2024-06-27 10:01:00', 20),
> (1007, 2, 'user3', '2024-06-27 10:02:00', 30),
> (1008, 2, 'user4', '2024-06-27 10:03:00', 40);
> {code}
> h1. Feature verification
> h1.
> h2. Continuous Mode
> h2.
> In Continuous Mode, Materialized Table runs a Flink streaming job to update
> the data in real-time. Feature verify includes various scenarios such as
> Create & Suspend & Resume & Drop.
> 1. Create Materialized Table, including various bad cases and good cases, and
> execute the following statement in the SQL Client
> {code:sql}
> CREATE MATERIALIZED TABLE continuous_users_shops
> (
> PRIMARY KEY(id) NOT ENFORCED
> )
> WITH(
> 'format' = 'debezium-json'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
> payment_amount_cents FROM json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table and execute the following statement in the SQL
> Client
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops RESUME;
> {code}
> 4. Manual Refresh Materialized Table
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops REFRESH
> PARTITION(ds = '2024-06-25');
> {code}
> 5. Drop Materialized Table
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.continuous_users_shops;
> {code}
> h2. Full Mode
> h2.
> In Full Mode, Materialized Table needs to rely on Workflow Scheduler to
> complete the periodic full refresh operation, so the main purpose is to
> verify the FLIP-448 function.
> 1. Create Materialized Table, verify various good and bad cases, and execute
> the following statement
> {code:sql}
> CREATE MATERIALIZED TABLE mt_cat.mydb.full_users_shops
> PARTITIONED BY (ds)
> WITH(
> 'format' = 'json'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS pv
> FROM (
> SELECT user_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
> payment_amount_cents FROM mt_cat.mydb.json_source ) AS tmp
> GROUP BY (user_id, ds);
> {code}
> 2. Suspend Materialized Table by executing the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops SUSPEND;
> {code}
> 3. Resume Materialized Table and execute the following statement
> {code:sql}
> ALTER MATERIALIZED TABLE mt_cat.mydb.full_users_shops RESUME;
> {code}
> 4. Drop Materialized Table and execute the following statement
> {code:sql}
> DROP MATERIALIZED TABLE mt_cat.mydb.full_users_shops;
> DROP MATERIALIZED TABLE IF EXISTS mt_cat.mydb.full_users_shops;
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)