zhangbin created FLINK-27922:
--------------------------------
Summary: Flink SQL unique key is lost when
table.exec.mini-batch.enabled=true is set
Key: FLINK-27922
URL: https://issues.apache.org/jira/browse/FLINK-27922
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.2
Environment: Flink 1.12.2
Reporter: zhangbin
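The Flink SQL source tables declare primary keys, but when
table.exec.mini-batch.enabled=true is set, the planner loses the unique key:
every streaming join in the optimized plan reports leftInputSpec=[NoUniqueKey]
(and the three inner joins also report rightInputSpec=[NoUniqueKey]). With
mini-batch disabled, the same joins report JoinKeyContainsUniqueKey /
HasUniqueKey. In the primary-keyed source branches, the only structural
difference between the two plans is the MiniBatchAssigner inserted between each
TableSourceScan and its DropUpdateBefore. The unique-key derivation therefore
presumably does not propagate through MiniBatchAssigner.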
{code:java}
/**
 * Reproducer for FLINK-27922: with table.exec.mini-batch.enabled=true the
 * explained plan reports NoUniqueKey for the LEFT JOIN inputs, even though
 * t_name, t_progress_name, t_attr_name and t_apply_status_name all declare
 * after_service_id as PRIMARY KEY NOT ENFORCED.
 *
 * The test contains no assertions; it only prints the explained plan of the
 * INSERT statement so the join input specs can be inspected by hand.
 * NOTE(review): consider asserting on the explain output (e.g. that it does
 * not contain "NoUniqueKey") so a regression is caught automatically.
 */
@Test
public void testJoinUniqueKey() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Mini-batch configuration under test. With mini-batch disabled, the same
// query keeps its unique keys (JoinKeyContainsUniqueKey / HasUniqueKey).
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
// Collects the INSERT statement so its plan can be explained at the end.
StatementSet statementSet = tableEnv.createStatementSet();
// Sink table (blackhole connector, no primary key declared).
tableEnv.executeSql("CREATE TABLE `t_apply_sku_test`(`dt`
BIGINT,`refund_apply_id` BIGINT,`base_sku_id` BIGINT,`order_id`
BIGINT,`user_id` BIGINT,`poi_id` BIGINT,`refund_type`
BIGINT,`apply_refund_reason_code` BIGINT,`apply_refund_reason_desc`
VARCHAR,`apply_refund_review_status` BIGINT,`apply_refund_review_status_desc`
VARCHAR,`apply_refund_reject_reason` VARCHAR,`apply_is_refunded`
INTEGER,`apply_pic_url` VARCHAR,`remark` VARCHAR,`refund_apply_originator`
BIGINT,`second_reason_code` BIGINT,`second_reason`
VARCHAR,`refund_target_account` BIGINT,`after_service_id`
BIGINT,`receipt_status` BIGINT,`group_header_goods_status`
INTEGER,`apply_operator_mis_name` VARCHAR,`refund_apply_time`
BIGINT,`update_time` BIGINT,`base_sku_name` VARCHAR,`apply_refund_num`
BIGINT,`view_qty` DECIMAL(38,18),`refund_scale_type`
BIGINT,`refund_scale_type_desc` VARCHAR,`refund_scale`
DECIMAL(38,18),`apply_refund_amt` DECIMAL(38,18),`refund_scale_user_real_pay`
DECIMAL(38,18),`refund_red_packet_price` DECIMAL(38,18),`load_time`
VARCHAR,`take_rate_type` BIGINT,`platform_rate` DECIMAL(38,18),`order_sku_type`
INTEGER,`second_reason_aggregated_code` INTEGER,`second_reason_aggregated`
VARCHAR,`compensation_amount` DECIMAL(38,18),`aftersale_type`
INTEGER,`group_header_parallel_status` INTEGER,`grid_parallel_status` INTEGER)
WITH ('connector'='blackhole')");
// Driving table of the join chain (aliased `ord` below); PRIMARY KEY on
// after_service_id.
tableEnv.executeSql("CREATE TABLE `t_name`(`id` BIGINT,`after_service_id`
BIGINT PRIMARY KEY NOT ENFORCED,`order_id` BIGINT,`user_id` BIGINT,`poi_id`
BIGINT,`city_id` BIGINT,`refund_type` INTEGER,`first_reason_code`
INTEGER,`first_reason` VARCHAR,`second_reason_code` INTEGER,`second_reason`
VARCHAR,`pic_url` VARCHAR,`remark` VARCHAR,`refund_price`
INTEGER,`refund_red_packet_price` INTEGER,`refund_total_price`
INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price`
INTEGER,`refund_other_price` INTEGER,`user_receipt_status`
INTEGER,`collect_status` INTEGER,`refund_target_account` INTEGER,`status`
INTEGER,`flow_instance_id` BIGINT,`create_time` BIGINT,`modify_time` BIGINT)
WITH ('connector'='datagen')");
// Item-level source without a primary key; aggregated by the view
// v_refund_fwd_item_sku defined below.
tableEnv.executeSql("CREATE TABLE `t_item_name`(`id`
BIGINT,`refund_fwd_item_id` BIGINT,`after_service_id` BIGINT,`order_id`
BIGINT,`order_item_id` BIGINT,`stack_id` BIGINT,`sku_id` BIGINT,`sku_name`
VARCHAR,`supplier_id` BIGINT,`refund_quantity` INTEGER,`item_sku_type`
INTEGER,`refund_scale_type` INTEGER,`refund_scale` INTEGER,`accurate_refund`
INTEGER,`refund_price` INTEGER,`refund_red_packet_price`
INTEGER,`refund_price_info` VARCHAR,`refund_total_price`
INTEGER,`refund_promotion_price` INTEGER,`refund_coupon_price`
INTEGER,`refund_other_price` INTEGER,`extend_info` VARCHAR,`create_time`
BIGINT,`modify_time` BIGINT) WITH ('connector'='datagen')");
// The next three tables are LEFT JOINed to t_name; each declares
// PRIMARY KEY on after_service_id.
tableEnv.executeSql("CREATE TABLE `t_progress_name`(`id`
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id`
BIGINT,`progress_node` VARCHAR,`progress_node_status` INTEGER,`operator`
VARCHAR,`parallel` INTEGER,`flow_element_id` VARCHAR,`extend_info`
VARCHAR,`create_time` BIGINT,`modify_time` BIGINT) WITH
('connector'='datagen')");
tableEnv.executeSql("CREATE TABLE `t_attr_name`(`id`
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id`
BIGINT,`name` VARCHAR,`value` VARCHAR,`create_time` BIGINT,`modify_time`
BIGINT) WITH ('connector'='datagen')");
tableEnv.executeSql("CREATE TABLE `t_apply_status_name`(`id`
BIGINT,`after_service_id` BIGINT PRIMARY KEY NOT ENFORCED,`order_id`
BIGINT,`apply_status` INTEGER,`ai_audit_status`
INTEGER,`group_header_confirm_status` INTEGER,`group_header_retrieve_status`
INTEGER,`driver_retrieve_status` INTEGER,`group_header_parallel_status`
INTEGER,`grid_parallel_status` INTEGER,`create_time` BIGINT,`modify_time`
BIGINT) WITH ('connector'='datagen')");
// Aggregates t_item_name per (after_service_id, order_id, sku_id). These GROUP BY
// columns form the unique key that both plans report as HasUniqueKey for the
// right input of the last join.
tableEnv.executeSql("CREATE VIEW `v_refund_fwd_item_sku` AS SELECT
`after_service_id` AS `refund_apply_id`, `order_id`, `sku_id` AS `base_sku_id`,
SUM(`if`(`refund_quantity` IS NULL, 0, `refund_quantity`)) AS
`apply_refund_num`, SUM(`if`(`refund_quantity` IS NULL, 0,
CAST(`refund_quantity` AS DOUBLE))) AS `view_qty`, SUM(`if`(`refund_price` IS
NULL, 0, `refund_price`)) AS `apply_refund_amt`, SUM(`if`(`refund_price` IS
NULL, 0, `refund_price`)) AS `refund_scale_user_real_pay`,
SUM(`if`(`refund_red_packet_price` IS NULL, 0, `refund_red_packet_price`)) AS
`refund_red_packet_price`\n"
+ "FROM `t_item_name`\n"
+ "GROUP BY `after_service_id`, `order_id`, `sku_id`");
// LEFT JOIN chain on after_service_id:
// t_name -> t_progress_name -> t_attr_name -> t_apply_status_name -> v_refund_fwd_item_sku.
// NOTE(review): the last join uses `stat`.`after_service_id` rather than
// `ord`.`after_service_id`. Rows of t_name without a t_apply_status_name match
// have a NULL stat key and therefore never match the view; confirm this is intended.
statementSet.addInsertSql("INSERT INTO `t_apply_sku_test` SELECT
CAST(`FROM_UNIXTIME`(`ord`.`modify_time` / 1000, 'yyyyMMdd') AS BIGINT) AS
`dt`, `sku`.`refund_apply_id`,`sku`.`base_sku_id`, `sku`.`order_id`,
`ord`.`user_id`, `ord`.`poi_id`, CAST(`ord`.`refund_type` AS BIGINT) AS
`refund_type`, CAST(`ord`.`first_reason_code` AS BIGINT) AS
`apply_refund_reason_code`, `ord`.`first_reason` AS `apply_refund_reason_desc`,
CAST(`stat`.`apply_status` AS BIGINT) AS `apply_refund_review_status`, CASE
WHEN `stat`.`apply_status` = 0 OR `stat`.`apply_status` = 13 THEN 'a' WHEN
`stat`.`apply_status` = 1 THEN 'b' WHEN `stat`.`apply_status` = 2 THEN 'c' WHEN
`stat`.`apply_status` = 3 THEN 'd' WHEN `stat`.`apply_status` = 4 THEN 'e' WHEN
`stat`.`apply_status` = 5 THEN 'f' WHEN `stat`.`apply_status` = 6 THEN 'g' ELSE
'x' END AS `apply_refund_review_status_desc`,
`if`(`progress`.`progress_node_status` = 30, `extend_info`, '') AS
`apply_refund_reject_reason`, CASE WHEN `progress`.`progress_node` =
'refund.node' THEN 1 ELSE 0 END AS `apply_is_refunded`, `ord`.`pic_url` AS
`apply_pic_url`, `ord`.`remark`, CAST(NULL AS BIGINT) AS
`refund_apply_originator`, CAST(`ord`.`second_reason_code` AS BIGINT) AS
`second_reason_code`, `ord`.`second_reason`, CAST(`ord`.`refund_target_account`
AS BIGINT) AS `refund_target_account`, `ord`.`after_service_id`,
CAST(`ord`.`user_receipt_status` AS BIGINT) AS `receipt_status`, CASE WHEN
`attr`.`name` = 'fwd_ext' AND `attr`.`value` = '1' THEN 1 WHEN `attr`.`name` =
'fwd_ext' AND `attr`.`value` = '2' THEN 2 ELSE 0 END AS
`group_header_goods_status`, `progress`.`operator` AS
`apply_operator_mis_name`, `ord`.`create_time` AS `refund_apply_time`,
`ord`.`modify_time` AS `update_time`, CAST(NULL AS VARCHAR) AS `base_sku_name`,
CAST(`sku`.`apply_refund_num` AS BIGINT) AS `apply_refund_num`,
`sku`.`view_qty` AS `view_qty`, CAST(NULL AS BIGINT) AS `refund_scale_type`,
CAST(NULL AS VARCHAR) AS `refund_scale_type_desc`, CAST(NULL AS DECIMAL) AS
`refund_scale`, `sku`.`apply_refund_amt` AS `apply_refund_amt`,
`sku`.`refund_scale_user_real_pay` AS `refund_scale_user_real_pay`,
`sku`.`refund_red_packet_price` AS `refund_red_packet_price`,
CAST(LOCALTIMESTAMP AS VARCHAR) AS `load_time`, CAST(NULL AS BIGINT) AS
`take_rate_type`, CAST(NULL AS DECIMAL) AS `platform_rate`, 1 AS
`order_sku_type`, CAST(NULL AS INTEGER) AS `second_reason_aggregated_code`,
CAST(NULL AS VARCHAR) AS `second_reason_aggregated`, CAST(NULL AS DECIMAL) AS
`compensation_amount`, 1 AS `aftersale_type`, CASE WHEN
`progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND
`progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` =
'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20
THEN 2 WHEN `progress`.`progress_node` = 'group.header.audit.node' AND
`parallel` = 1 AND `progress_node_status` = 30 THEN 3 WHEN
`progress`.`progress_node` = 'group.header.audit.node' AND `parallel` = 1 AND
`progress_node_status` = 40 THEN 4 WHEN `progress`.`progress_node` =
'group.header.audit.node' AND `parallel` = 1 AND `progress_node_status` = 50
THEN 5 ELSE 0 END AS `group_header_parallel_status`, CASE WHEN
`progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND
`progress_node_status` = 10 THEN 1 WHEN `progress`.`progress_node` =
'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 20 THEN 2
WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND
`progress_node_status` = 30 THEN 3 WHEN `progress`.`progress_node` =
'grid.audit.node' AND `parallel` = 1 AND `progress_node_status` = 40 THEN 4
WHEN `progress`.`progress_node` = 'grid.audit.node' AND `parallel` = 1 AND
`progress_node_status` = 50 THEN 5 ELSE 0 END AS `grid_parallel_status`\n"
+ "FROM `t_name` AS `ord`\n"
+ "LEFT JOIN `t_progress_name` AS `progress` ON
`ord`.`after_service_id` = `progress`.`after_service_id`\n"
+ "LEFT JOIN `t_attr_name` AS `attr` ON `ord`.`after_service_id` =
`attr`.`after_service_id`\n"
+ "LEFT JOIN `t_apply_status_name` AS `stat` ON
`ord`.`after_service_id` = `stat`.`after_service_id`\n"
+ "LEFT JOIN `v_refund_fwd_item_sku` AS `sku` ON
`stat`.`after_service_id` = `sku`.`refund_apply_id`");
// Only prints the explained plan; the statement set is never executed here.
System.out.println(statementSet.explain());
}{code}
Plan with table.exec.mini-batch.enabled=true (unique keys lost):
== Optimized Logical Plan ==
{code:java}
Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt,
refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type,
apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status,
apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded,
apply_pic_url, remark, refund_apply_originator, second_reason_code,
second_reason, refund_target_account, after_service_id, receipt_status,
group_header_goods_status, apply_operator_mis_name, refund_apply_time,
update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type,
refund_scale_type_desc, refund_scale, apply_refund_amt,
refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type,
platform_rate, order_sku_type, second_reason_aggregated_code,
second_reason_aggregated, compensation_amount, aftersale_type,
group_header_parallel_status, grid_parallel_status])
+- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd'))
AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id,
CAST(refund_type) AS refund_type, CAST(first_reason_code) AS
apply_refund_reason_code, first_reason AS apply_refund_reason_desc,
CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH
Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE
(apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd'
CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE
_UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS
apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF
_UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node =
_UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1
CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS
refund_apply_originator, CAST(second_reason_code) AS second_reason_code,
second_reason, CAST(refund_target_account) AS refund_target_account,
CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS
receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER
SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status,
operator AS apply_operator_mis_name, create_time AS refund_apply_time,
modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS
view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER
SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale,
CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS
refund_scale_user_real_pay, CAST(refund_red_packet_price) AS
refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS
take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type,
null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS
compensation_amount, 1 AS aftersale_type, CAST((((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20))
CASE 2 CASE ((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40))
CASE 4 CASE ((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0))
AS group_header_parallel_status, CAST((((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS
grid_parallel_status])
+- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 =
refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id,
apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay,
refund_red_packet_price], leftInputSpec=[NoUniqueKey],
rightInputSpec=[HasUniqueKey])
:- Exchange(distribution=[hash[after_service_id0]])
: +- Join(joinType=[LeftOuterJoin], where=[(after_service_id =
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value, after_service_id0, apply_status], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[after_service_id]])
: : +- Calc(select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value])
: : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id =
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info,
after_service_id0, name, value], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
: : :- Exchange(distribution=[hash[after_service_id]])
: : : +- Calc(select=[after_service_id, user_id, poi_id,
refund_type, first_reason_code, first_reason, second_reason_code,
second_reason, pic_url, remark, user_receipt_status, refund_target_account,
create_time, modify_time, progress_node, progress_node_status, operator,
parallel, extend_info])
: : : +- Join(joinType=[LeftOuterJoin],
where=[(after_service_id = after_service_id0)], select=[after_service_id,
user_id, poi_id, refund_type, first_reason_code, first_reason,
second_reason_code, second_reason, pic_url, remark, user_receipt_status,
refund_target_account, create_time, modify_time, after_service_id0,
progress_node, progress_node_status, operator, parallel, extend_info],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: : : :- Exchange(distribution=[hash[after_service_id]])
: : : : +- Calc(select=[after_service_id, user_id,
poi_id, refund_type, first_reason_code, first_reason, second_reason_code,
second_reason, pic_url, remark, user_receipt_status, refund_target_account,
create_time, modify_time])
: : : : +- DropUpdateBefore
: : : : +- MiniBatchAssigner(interval=[5000ms],
mode=[ProcTime])
: : : : +-
TableSourceScan(table=[[default_catalog, default_database, t_name]],
fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, refund_price, refund_red_packet_price, refund_total_price,
refund_promotion_price, refund_coupon_price, refund_other_price,
user_receipt_status, collect_status, refund_target_account, status,
flow_instance_id, create_time, modify_time])
: : : +- Exchange(distribution=[hash[after_service_id]])
: : : +- Calc(select=[after_service_id,
progress_node, progress_node_status, operator, parallel, extend_info])
: : : +- DropUpdateBefore
: : : +- MiniBatchAssigner(interval=[5000ms],
mode=[ProcTime])
: : : +-
TableSourceScan(table=[[default_catalog, default_database, t_progress_name]],
fields=[id, after_service_id, order_id, progress_node, progress_node_status,
operator, parallel, flow_element_id, extend_info, create_time, modify_time])
: : +- Exchange(distribution=[hash[after_service_id]])
: : +- Calc(select=[after_service_id, name, value])
: : +- DropUpdateBefore
: : +- MiniBatchAssigner(interval=[5000ms],
mode=[ProcTime])
: : +- TableSourceScan(table=[[default_catalog,
default_database, t_attr_name]], fields=[id, after_service_id, order_id, name,
value, create_time, modify_time])
: +- Exchange(distribution=[hash[after_service_id]])
: +- Calc(select=[after_service_id, apply_status])
: +- DropUpdateBefore
: +- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog,
default_database, t_apply_status_name]], fields=[id, after_service_id,
order_id, apply_status, ai_audit_status, group_header_confirm_status,
group_header_retrieve_status, driver_retrieve_status,
group_header_parallel_status, grid_parallel_status, create_time, modify_time])
+- Exchange(distribution=[hash[refund_apply_id]])
+- Calc(select=[refund_apply_id, order_id, base_sku_id,
apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt,
refund_scale_user_real_pay, refund_red_packet_price])
+- GlobalGroupAggregate(groupBy=[refund_apply_id, order_id,
base_sku_id], select=[refund_apply_id, order_id, base_sku_id,
SUM_RETRACT((sum$0, count$1)) AS apply_refund_num, SUM_RETRACT((sum$2,
count$3)) AS view_qty, SUM_RETRACT((sum$4, count$5)) AS
refund_scale_user_real_pay, SUM_RETRACT((sum$6, count$7)) AS
refund_red_packet_price])
+- Exchange(distribution=[hash[refund_apply_id, order_id,
base_sku_id]])
+- LocalGroupAggregate(groupBy=[refund_apply_id, order_id,
base_sku_id], select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3)
AS (sum$0, count$1), SUM_RETRACT($f4) AS (sum$2, count$3), SUM_RETRACT($f5) AS
(sum$4, count$5), SUM_RETRACT($f6) AS (sum$6, count$7), COUNT_RETRACT(*) AS
count1$8])
+- Calc(select=[after_service_id AS refund_apply_id,
order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF
refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF
CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS
$f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
+- MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])
+- TableSourceScan(table=[[default_catalog,
default_database, t_item_name]], fields=[id, refund_fwd_item_id,
after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name,
supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale,
accurate_refund, refund_price, refund_red_packet_price, refund_price_info,
refund_total_price, refund_promotion_price, refund_coupon_price,
refund_other_price, extend_info, create_time, modify_time]) {code}
Plan with table.exec.mini-batch.enabled=false (unique keys preserved):
== Optimized Logical Plan ==
{code:java}
Sink(table=[default_catalog.default_database.t_apply_sku_test], fields=[dt,
refund_apply_id, base_sku_id, order_id, user_id, poi_id, refund_type,
apply_refund_reason_code, apply_refund_reason_desc, apply_refund_review_status,
apply_refund_review_status_desc, apply_refund_reject_reason, apply_is_refunded,
apply_pic_url, remark, refund_apply_originator, second_reason_code,
second_reason, refund_target_account, after_service_id, receipt_status,
group_header_goods_status, apply_operator_mis_name, refund_apply_time,
update_time, base_sku_name, apply_refund_num, view_qty, refund_scale_type,
refund_scale_type_desc, refund_scale, apply_refund_amt,
refund_scale_user_real_pay, refund_red_packet_price, load_time, take_rate_type,
platform_rate, order_sku_type, second_reason_aggregated_code,
second_reason_aggregated, compensation_amount, aftersale_type,
group_header_parallel_status, grid_parallel_status])
+- Calc(select=[CAST(((modify_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd'))
AS dt, refund_apply_id, base_sku_id, order_id, user_id, poi_id,
CAST(refund_type) AS refund_type, CAST(first_reason_code) AS
apply_refund_reason_code, first_reason AS apply_refund_reason_desc,
CAST(apply_status) AS apply_refund_review_status, CAST(((apply_status SEARCH
Sarg[0, 13]) CASE _UTF-16LE'a' CASE (apply_status = 1) CASE _UTF-16LE'b' CASE
(apply_status = 2) CASE _UTF-16LE'c' CASE (apply_status = 3) CASE _UTF-16LE'd'
CASE (apply_status = 4) CASE _UTF-16LE'e' CASE (apply_status = 5) CASE
_UTF-16LE'f' CASE (apply_status = 6) CASE _UTF-16LE'g' CASE _UTF-16LE'x')) AS
apply_refund_review_status_desc, ((progress_node_status = 30) IF extend_info IF
_UTF-16LE'') AS apply_refund_reject_reason, CAST(((progress_node =
_UTF-16LE'refund.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") CASE 1
CASE 0)) AS apply_is_refunded, pic_url AS apply_pic_url, remark, null:BIGINT AS
refund_apply_originator, CAST(second_reason_code) AS second_reason_code,
second_reason, CAST(refund_target_account) AS refund_target_account,
CAST(after_service_id) AS after_service_id, CAST(user_receipt_status) AS
receipt_status, CAST((((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647) CHARACTER
SET "UTF-16LE") AND (value = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE")) CASE 1 CASE ((name = _UTF-16LE'fwd_ext':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (value = _UTF-16LE'2':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")) CASE 2 CASE 0)) AS group_header_goods_status,
operator AS apply_operator_mis_name, create_time AS refund_apply_time,
modify_time AS update_time, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
AS base_sku_name, CAST(apply_refund_num) AS apply_refund_num, CAST(view_qty) AS
view_qty, null:BIGINT AS refund_scale_type, null:VARCHAR(2147483647) CHARACTER
SET "UTF-16LE" AS refund_scale_type_desc, null:DECIMAL(38, 18) AS refund_scale,
CAST(apply_refund_amt) AS apply_refund_amt, CAST(refund_scale_user_real_pay) AS
refund_scale_user_real_pay, CAST(refund_red_packet_price) AS
refund_red_packet_price, CAST(CAST(())) AS load_time, null:BIGINT AS
take_rate_type, null:DECIMAL(38, 18) AS platform_rate, 1 AS order_sku_type,
null:INTEGER AS second_reason_aggregated_code, null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" AS second_reason_aggregated, null:DECIMAL(38, 18) AS
compensation_amount, 1 AS aftersale_type, CAST((((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 20))
CASE 2 CASE ((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE
((progress_node = _UTF-16LE'group.header.audit.node':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (parallel = 1) AND (progress_node_status = 40))
CASE 4 CASE ((progress_node =
_UTF-16LE'group.header.audit.node':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE") AND (parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0))
AS group_header_parallel_status, CAST((((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 10)) CASE 1 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 20)) CASE 2 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 30)) CASE 3 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 40)) CASE 4 CASE ((progress_node =
_UTF-16LE'grid.audit.node':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(parallel = 1) AND (progress_node_status = 50)) CASE 5 CASE 0)) AS
grid_parallel_status])
+- Join(joinType=[LeftOuterJoin], where=[(after_service_id0 =
refund_apply_id)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value, after_service_id0, apply_status, refund_apply_id, order_id, base_sku_id,
apply_refund_num, view_qty, apply_refund_amt, refund_scale_user_real_pay,
refund_red_packet_price], leftInputSpec=[HasUniqueKey],
rightInputSpec=[HasUniqueKey])
:- Exchange(distribution=[hash[after_service_id0]])
: +- Join(joinType=[LeftOuterJoin], where=[(after_service_id =
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value, after_service_id0, apply_status],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
: :- Exchange(distribution=[hash[after_service_id]])
: : +- Calc(select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info, name,
value])
: : +- Join(joinType=[LeftOuterJoin], where=[(after_service_id =
after_service_id0)], select=[after_service_id, user_id, poi_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, user_receipt_status, refund_target_account, create_time, modify_time,
progress_node, progress_node_status, operator, parallel, extend_info,
after_service_id0, name, value], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
: : :- Exchange(distribution=[hash[after_service_id]])
: : : +- Calc(select=[after_service_id, user_id, poi_id,
refund_type, first_reason_code, first_reason, second_reason_code,
second_reason, pic_url, remark, user_receipt_status, refund_target_account,
create_time, modify_time, progress_node, progress_node_status, operator,
parallel, extend_info])
: : : +- Join(joinType=[LeftOuterJoin],
where=[(after_service_id = after_service_id0)], select=[after_service_id,
user_id, poi_id, refund_type, first_reason_code, first_reason,
second_reason_code, second_reason, pic_url, remark, user_receipt_status,
refund_target_account, create_time, modify_time, after_service_id0,
progress_node, progress_node_status, operator, parallel, extend_info],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
: : : :- Exchange(distribution=[hash[after_service_id]])
: : : : +- Calc(select=[after_service_id, user_id,
poi_id, refund_type, first_reason_code, first_reason, second_reason_code,
second_reason, pic_url, remark, user_receipt_status, refund_target_account,
create_time, modify_time])
: : : : +- DropUpdateBefore
: : : : +-
TableSourceScan(table=[[default_catalog, default_database, t_name]],
fields=[id, after_service_id, order_id, user_id, poi_id, city_id, refund_type,
first_reason_code, first_reason, second_reason_code, second_reason, pic_url,
remark, refund_price, refund_red_packet_price, refund_total_price,
refund_promotion_price, refund_coupon_price, refund_other_price,
user_receipt_status, collect_status, refund_target_account, status,
flow_instance_id, create_time, modify_time])
: : : +- Exchange(distribution=[hash[after_service_id]])
: : : +- Calc(select=[after_service_id,
progress_node, progress_node_status, operator, parallel, extend_info])
: : : +- DropUpdateBefore
: : : +-
TableSourceScan(table=[[default_catalog, default_database, t_progress_name]],
fields=[id, after_service_id, order_id, progress_node, progress_node_status,
operator, parallel, flow_element_id, extend_info, create_time, modify_time])
: : +- Exchange(distribution=[hash[after_service_id]])
: : +- Calc(select=[after_service_id, name, value])
: : +- DropUpdateBefore
: : +- TableSourceScan(table=[[default_catalog,
default_database, t_attr_name]], fields=[id, after_service_id, order_id, name,
value, create_time, modify_time])
: +- Exchange(distribution=[hash[after_service_id]])
: +- Calc(select=[after_service_id, apply_status])
: +- DropUpdateBefore
: +- TableSourceScan(table=[[default_catalog,
default_database, t_apply_status_name]], fields=[id, after_service_id,
order_id, apply_status, ai_audit_status, group_header_confirm_status,
group_header_retrieve_status, driver_retrieve_status,
group_header_parallel_status, grid_parallel_status, create_time, modify_time])
+- Exchange(distribution=[hash[refund_apply_id]])
+- Calc(select=[refund_apply_id, order_id, base_sku_id,
apply_refund_num, view_qty, refund_scale_user_real_pay AS apply_refund_amt,
refund_scale_user_real_pay, refund_red_packet_price])
+- GroupAggregate(groupBy=[refund_apply_id, order_id, base_sku_id],
select=[refund_apply_id, order_id, base_sku_id, SUM_RETRACT($f3) AS
apply_refund_num, SUM_RETRACT($f4) AS view_qty, SUM_RETRACT($f5) AS
refund_scale_user_real_pay, SUM_RETRACT($f6) AS refund_red_packet_price])
+- Exchange(distribution=[hash[refund_apply_id, order_id,
base_sku_id]])
+- Calc(select=[after_service_id AS refund_apply_id,
order_id, sku_id AS base_sku_id, (refund_quantity IS NULL IF 0 IF
refund_quantity) AS $f3, (refund_quantity IS NULL IF 0 IF
CAST(refund_quantity)) AS $f4, (refund_price IS NULL IF 0 IF refund_price) AS
$f5, (refund_red_packet_price IS NULL IF 0 IF refund_red_packet_price) AS $f6])
+- TableSourceScan(table=[[default_catalog,
default_database, t_item_name]], fields=[id, refund_fwd_item_id,
after_service_id, order_id, order_item_id, stack_id, sku_id, sku_name,
supplier_id, refund_quantity, item_sku_type, refund_scale_type, refund_scale,
accurate_refund, refund_price, refund_red_packet_price, refund_price_info,
refund_total_price, refund_promotion_price, refund_coupon_price,
refund_other_price, extend_info, create_time, modify_time]) {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)