[
https://issues.apache.org/jira/browse/FLINK-34351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lincoln lee updated FLINK-34351:
--------------------------------
Description:
This describes how to verify FLINK-33397: Support Configuring Different State
TTLs using SQL Hint.
The verification steps are as follows.
1. Start the standalone session cluster and the SQL client.
2. Execute the following DDL statements.
{code:sql}
CREATE TABLE `default_catalog`.`default_database`.`Orders` (
`order_id` INT,
`line_order_id` INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5'
);
CREATE TABLE `default_catalog`.`default_database`.`LineOrders` (
`line_order_id` INT,
`ship_mode` STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5'
);
CREATE TABLE `default_catalog`.`default_database`.`OrdersShipInfo` (
`order_id` INT,
`line_order_id` INT,
`ship_mode` STRING
) WITH (
'connector' = 'print'
);
{code}
3. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied
to a join.
{code:sql}
-- SET the pipeline level state TTL to 24h
SET 'table.exec.state.ttl' = '24h';
-- Configure different state TTL for join operator
COMPILE PLAN '/path/to/join-plan.json' FOR
INSERT INTO OrdersShipInfo
SELECT /*+STATE_TTL('a' = '2d', 'b' = '12h')*/ a.order_id, a.line_order_id,
b.ship_mode
FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id;
{code}
The generated JSON file *should* contain the following "state" JSON array for
the StreamExecJoin ExecNode.
{code:json}
{
"id" : 5,
"type" : "stream-exec-join_1",
"joinSpec" : {
...
},
"state" : [ {
"index" : 0,
"ttl" : "2 d",
"name" : "leftState"
}, {
"index" : 1,
"ttl" : "12 h",
"name" : "rightState"
} ],
"inputProperties": [...],
"outputType": ...,
"description": ...
}
{code}
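The check in step 3 can also be scripted instead of reading the plan by eye. The following is a minimal sketch, not part of the original test instructions: the helper name find_state_ttls is hypothetical, and the inline sample only mirrors the fragment shown above (the "nodes" wrapper is an assumption about the file layout, which the helper does not rely on because it walks the whole document).

```python
import json


def find_state_ttls(plan, type_prefix):
    """Return one {state name: ttl} dict per node whose "type" starts with type_prefix."""
    found = []

    def walk(obj):
        if isinstance(obj, dict):
            node_type = obj.get("type")
            state = obj.get("state")
            # Only ExecNodes carry a string "type" plus a "state" list.
            if (isinstance(node_type, str)
                    and node_type.startswith(type_prefix)
                    and isinstance(state, list)):
                found.append({s["name"]: s["ttl"] for s in state})
            for value in obj.values():
                walk(value)
        elif isinstance(obj, list):
            for value in obj:
                walk(value)

    walk(plan)
    return found


# Inline sample mirroring the fragment above; for a real check, load the
# compiled file instead, e.g. json.load(open("/path/to/join-plan.json")).
sample_plan = json.loads("""
{
  "nodes": [ {
    "id": 5,
    "type": "stream-exec-join_1",
    "state": [
      {"index": 0, "ttl": "2 d", "name": "leftState"},
      {"index": 1, "ttl": "12 h", "name": "rightState"}
    ]
  } ]
}
""")

join_ttls = find_state_ttls(sample_plan, "stream-exec-join")
print(join_ttls)
```

Walking the whole document rather than indexing a fixed path keeps the sketch independent of where the ExecNode list sits in the compiled plan.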
4. Compile and verify the INSERT INTO statement with the STATE_TTL hint applied
to a group aggregate.
{code:sql}
CREATE TABLE source_t (
a INT,
b BIGINT,
c STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5'
);
CREATE TABLE sink_t (
b BIGINT PRIMARY KEY NOT ENFORCED,
cnt BIGINT,
avg_a DOUBLE,
min_c STRING
) WITH (
'connector' = 'print'
);
COMPILE PLAN '/path/to/agg-plan.json' FOR
INSERT INTO sink_t SELECT /*+ STATE_TTL('source_t' = '1s') */
b,
COUNT(*) AS cnt,
AVG(a) FILTER (WHERE a > 1) AS avg_a,
MIN(c) AS min_c
FROM source_t GROUP BY b;
{code}
The generated JSON file *should* contain the following "state" JSON array for
the StreamExecGroupAggregate ExecNode.
{code:json}
"state" : [ {
"index" : 0,
"ttl" : "1 s",
"name" : "groupAggregateState"
} ]
{code}
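The hints in the steps above are written as '2d', '12h' and '1s', while the compiled plan shows "2 d", "12 h" and "1 s", so a plain string comparison between hint and plan would fail. The following is a minimal sketch of a comparison that normalizes both sides first. The helper names parse_ttl and ttl_matches are hypothetical, and the unit table covers only a small subset of unit labels (an assumption made for this sketch, not a full list of what Flink accepts).

```python
import re
from datetime import timedelta

# Assumed subset of unit labels, mapped to timedelta keyword arguments.
_UNITS = {
    "ms": "milliseconds",
    "s": "seconds",
    "min": "minutes",
    "h": "hours",
    "d": "days",
}


def parse_ttl(text):
    """Parse strings such as '2d' or '2 d' into a timedelta."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]+)\s*", text)
    if match is None or match.group(2) not in _UNITS:
        raise ValueError(f"unsupported TTL string: {text!r}")
    amount, unit = int(match.group(1)), match.group(2)
    return timedelta(**{_UNITS[unit]: amount})


def ttl_matches(hint_value, plan_value):
    """True when the hint value and the plan value denote the same duration."""
    return parse_ttl(hint_value) == parse_ttl(plan_value)


# (hint value, value shown in the compiled plan) pairs from steps 3 and 4.
checks = [("2d", "2 d"), ("12h", "12 h"), ("1s", "1 s")]
results = [ttl_matches(hint, plan) for hint, plan in checks]
print(results)
```

Comparing durations rather than strings also keeps the check valid if the same TTL is written in a different unit, for example '24h' against "1 d".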
> Release Testing: Verify FLINK-33397 Support Configuring Different State TTLs
> using SQL Hint
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-34351
> URL: https://issues.apache.org/jira/browse/FLINK-34351
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Affects Versions: 1.19.0
> Reporter: Jane Chan
> Assignee: Yubin Li
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: image-2024-02-21-15-24-22-289.png,
> image-2024-02-21-15-24-43-212.png
>