[
https://issues.apache.org/jira/browse/FLINK-20747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254421#comment-17254421
]
zengjinbo edited comment on FLINK-20747 at 12/24/20, 6:08 AM:
--------------------------------------------------------------
hi jark:
this is Exception code ,but I Can't provide data.
{code:java}
create table dws_fact_day_org_pro_size_ord( order_no string,
order_dtl_id string, brd_no string,
brd_dtl_no string, org_lno string, store_lno
string, store_brd string, org_new_no
string, period_sdate string, create_time
string, pro_no string, size_code string,
size_type_code string, cust_no string,
sal_biz_type string, sal_mode_cate string, sal_mode
string, sys_source int, logistics_mode
int, sal_qty int, sal_amt decimal(38,
18), sal_nos_prm_amt decimal(38, 18), sal_prm_amt
decimal(38, 18), table_source int, status
int, order_type int, tag_price decimal(38,
18), pro_prm decimal(38, 18), mem_no
string, mem_name string, mem_tel string,
discount_rate_type int, prom_no string,
discount_rate_source_id string, discount_rate decimal(38, 18),
is_online int, virtual_flag int, proctime as
proctime(), primary key(order_no,order_dtl_id)not enforced)WITH (
'connector' = 'upsert-kafka', 'topic' = 'TOPIC_DWS_FACT_DAY_ORG_PRO_SIZE_ORD',
'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA',
'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json',
'value.format' = 'json' );
create table DWS_DIM_ORG_ATTR( org_lno string , multi_type string ,
store_brd string) with ( 'connector' = 'dmp-elasticsearch','hosts' =
'http://127.0.0.1:9200','index' = 'dws_dim_org_attr');
--dim_产品维表create table DWS_DIM_PRO_ALLINFO_CATE_FLAG(pro_cate_flag int,pro_no
string) with ( 'connector' = 'dmp-elasticsearch','hosts' =
'http://127.0.0.1:9200','index' = 'dws_dim_pro_allinfo_cate_flag');
--销售订单明细-营业员 create table T04_POS_ORDER_ASSISTANT ( id
string , order_no string , order_dtl_id
string , assistant_id string , assistant_no
string , assistant_name string , settle_share_amount
decimal(38,18) , share_amount decimal(38,18) , share_qty
decimal(38,18) , counts int , order_type int
, zone_yyyymm string , create_user string ,
create_time string , update_user string ,
update_time string , sharding_flag string ,
assistant_shop_no string , assistant_shop_name string ,
etl_create_time string , etl_update_time string ,
proctime as proctime(), primary key(id)not enforced )WITH ( 'connector'
= 'upsert-kafka', 'topic' = 'TOPIC_POS_ORDER_ASSISTANT',
'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA',
'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json',
'value.format' = 'json'
);
create table topic_dws_fact_day_order ( org_lno string
, -- 机构原编码 store_lno string
, -- 店仓原编码 period_sdate string
, -- 字符串日期(YYYYMMDD) order_no string ,
-- 订单编号 upt_flag int , --
UPT标识(1:是,0:否) cr_flag int , --
CR标识(1:是,0:否) nos_num_flag int , --
小票数标识(1:是,0:否) store_brd string , -- 店铺品牌
org_new_no string , --
机构最新编码(新店店铺编码/货管单位编码) cust_sal_nos_qty decimal(38,18) , -- 客单销售量
cust_sal_nos_amt decimal(38,18) , -- 客单销售额
cust_sal_nos_prm_amt decimal(38,18) , -- 客单销售单据牌价额
sal_nos_num int , -- 客单数
create_time string , -- 制单日期 primary key
(org_lno,store_lno,period_sdate,order_no) not enforced
) with ( 'connector' = 'upsert-kafka',
'topic' = 'TOPIC_DWS_FACT_DAY_ORDER', 'properties.bootstrap.servers' =
'127.0.0.1:9092', 'key.format' = 'json', 'value.format' = 'json' );
insert into topic_dws_fact_day_orderselect
t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no, case when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd in
('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) <= 5 or sum(t1.sal_amt) <=
3000) then 1 when max(t2.multi_type) in ('单品','NBA') and
t2.store_brd in ('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) > 5 and
sum(t1.sal_amt) > 3000) then 0 when max(t2.multi_type) in
('单品','NBA') and t2.store_brd in ('NK') then 1 when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in
('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) < 10 then 1 when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in
('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) >= 10 then 0
when max(t2.multi_type)='多品' and sum(t1.sal_qty) < 15 then 1 else 0
end upt_flag, max(case t1.is_online when 1 then 0 else case
t1.sys_source when 6 then 0 else case t1.virtual_flag when 0 then 0 else 1 end
end end ) cr_flag, 1 as nos_num_flag, t2.store_brd,
t1.org_new_no, cast(sum(t1.sal_qty) as decimal(38,18)) as
cust_sal_nos_qty, cast(sum(case when pro_cate_flag = 1 then t1.sal_amt else 0
end) as decimal(38,18)) as cust_sal_nos_amt, cast(sum(case when pro_cate_flag
=1 then t1.sal_nos_prm_amt else 0 end) as decimal(38,18)) as
cust_sal_nos_prm_amt, 1 as sal_nos_num, t1.create_time from
dws_fact_day_org_pro_size_ord t1, DWS_DIM_ORG_ATTR for system_time as of
t1.proctime as t2, DWS_DIM_PRO_ALLINFO_CATE_FLAG for system_time as of
t1.proctime as t3, (select order_no,order_dtl_id from
T04_POS_ORDER_ASSISTANT group by order_no,order_dtl_id) t4 where t1.org_lno
= t2.org_lno and t1.pro_no = t3.pro_no and t1.order_no = t4.order_no
and t1.order_dtl_id = t4.order_dtl_id and t1.table_source=1 and
t1.order_type = 0 group by
t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no,t2.store_brd,t1.org_new_no,t1.create_time
having sum(sal_qty)<>0 or sum(case pro_cate_flag when 1 then
t1.sal_amt else 0 end)<>0;
{code}
was (Author: zengjinbo):
hi jark:
this is Exception code ,but I Can't provide data.
{code:java}
// create table dws_fact_day_org_pro_size_ord( order_no
string, order_dtl_id string, brd_no string,
brd_dtl_no string, org_lno string,
store_lno string, store_brd string,
org_new_no string, period_sdate string,
create_time string, pro_no string, size_code
string, size_type_code string, cust_no
string, sal_biz_type string, sal_mode_cate
string, sal_mode string, sys_source int,
logistics_mode int, sal_qty int, sal_amt
decimal(38, 18), sal_nos_prm_amt decimal(38, 18),
sal_prm_amt decimal(38, 18), table_source int,
status int, order_type int, tag_price
decimal(38, 18), pro_prm decimal(38, 18), mem_no
string, mem_name string, mem_tel
string, discount_rate_type int, prom_no
string, discount_rate_source_id string, discount_rate
decimal(38, 18), is_online int, virtual_flag
int, proctime as proctime(), primary key(order_no,order_dtl_id)not
enforced)WITH ( 'connector' = 'upsert-kafka', 'topic' =
'TOPIC_DWS_FACT_DAY_ORG_PRO_SIZE_ORD', 'properties.group.id' =
'DWS_FACT_DAY_ORDER_KAFKA', 'properties.bootstrap.servers' = '127.0.0.1:9092',
'key.format' = 'json', 'value.format' = 'json' );
create table DWS_DIM_ORG_ATTR( org_lno string , multi_type string ,
store_brd string) with ( 'connector' = 'dmp-elasticsearch','hosts' =
'http://127.0.0.1:9200','index' = 'dws_dim_org_attr');
--dim_产品维表create table DWS_DIM_PRO_ALLINFO_CATE_FLAG(pro_cate_flag int,pro_no
string) with ( 'connector' = 'dmp-elasticsearch','hosts' =
'http://127.0.0.1:9200','index' = 'dws_dim_pro_allinfo_cate_flag');
--销售订单明细-营业员 create table T04_POS_ORDER_ASSISTANT ( id
string , order_no string , order_dtl_id
string , assistant_id string , assistant_no
string , assistant_name string , settle_share_amount
decimal(38,18) , share_amount decimal(38,18) , share_qty
decimal(38,18) , counts int , order_type int
, zone_yyyymm string , create_user string ,
create_time string , update_user string ,
update_time string , sharding_flag string ,
assistant_shop_no string , assistant_shop_name string ,
etl_create_time string , etl_update_time string ,
proctime as proctime(), primary key(id)not enforced )WITH ( 'connector'
= 'upsert-kafka', 'topic' = 'TOPIC_POS_ORDER_ASSISTANT',
'properties.group.id' = 'DWS_FACT_DAY_ORDER_KAFKA',
'properties.bootstrap.servers' = '127.0.0.1:9092', 'key.format' = 'json',
'value.format' = 'json'
);
create table topic_dws_fact_day_order ( org_lno string
, -- 机构原编码 store_lno string
, -- 店仓原编码 period_sdate string
, -- 字符串日期(YYYYMMDD) order_no string ,
-- 订单编号 upt_flag int , --
UPT标识(1:是,0:否) cr_flag int , --
CR标识(1:是,0:否) nos_num_flag int , --
小票数标识(1:是,0:否) store_brd string , -- 店铺品牌
org_new_no string , --
机构最新编码(新店店铺编码/货管单位编码) cust_sal_nos_qty decimal(38,18) , -- 客单销售量
cust_sal_nos_amt decimal(38,18) , -- 客单销售额
cust_sal_nos_prm_amt decimal(38,18) , -- 客单销售单据牌价额
sal_nos_num int , -- 客单数
create_time string , -- 制单日期 primary key
(org_lno,store_lno,period_sdate,order_no) not enforced
) with ( 'connector' = 'upsert-kafka',
'topic' = 'TOPIC_DWS_FACT_DAY_ORDER', 'properties.bootstrap.servers' =
'127.0.0.1:9092', 'key.format' = 'json', 'value.format' = 'json' );
insert into topic_dws_fact_day_orderselect
t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no, case when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd in
('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) <= 5 or sum(t1.sal_amt) <=
3000) then 1 when max(t2.multi_type) in ('单品','NBA') and
t2.store_brd in ('AD','AS','AO','AK','RB') and (sum(t1.sal_qty) > 5 and
sum(t1.sal_amt) > 3000) then 0 when max(t2.multi_type) in
('单品','NBA') and t2.store_brd in ('NK') then 1 when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in
('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) < 10 then 1 when
max(t2.multi_type) in ('单品','NBA') and t2.store_brd not in
('NK','AD','AS','AO','AK','RB') and sum(t1.sal_qty) >= 10 then 0
when max(t2.multi_type)='多品' and sum(t1.sal_qty) < 15 then 1 else 0
end upt_flag, max(case t1.is_online when 1 then 0 else case
t1.sys_source when 6 then 0 else case t1.virtual_flag when 0 then 0 else 1 end
end end ) cr_flag, 1 as nos_num_flag, t2.store_brd,
t1.org_new_no, cast(sum(t1.sal_qty) as decimal(38,18)) as
cust_sal_nos_qty, cast(sum(case when pro_cate_flag = 1 then t1.sal_amt else 0
end) as decimal(38,18)) as cust_sal_nos_amt, cast(sum(case when pro_cate_flag
=1 then t1.sal_nos_prm_amt else 0 end) as decimal(38,18)) as
cust_sal_nos_prm_amt, 1 as sal_nos_num, t1.create_time from
dws_fact_day_org_pro_size_ord t1, DWS_DIM_ORG_ATTR for system_time as of
t1.proctime as t2, DWS_DIM_PRO_ALLINFO_CATE_FLAG for system_time as of
t1.proctime as t3, (select order_no,order_dtl_id from
T04_POS_ORDER_ASSISTANT group by order_no,order_dtl_id) t4 where t1.org_lno
= t2.org_lno and t1.pro_no = t3.pro_no and t1.order_no = t4.order_no
and t1.order_dtl_id = t4.order_dtl_id and t1.table_source=1 and
t1.order_type = 0 group by
t2.org_lno,t1.store_lno,t1.period_sdate,t1.order_no,t2.store_brd,t1.org_new_no,t1.create_time
having sum(sal_qty)<>0 or sum(case pro_cate_flag when 1 then
t1.sal_amt else 0 end)<>0;
{code}
> ClassCastException when using MAX aggregate function
> ----------------------------------------------------
>
> Key: FLINK-20747
> URL: https://issues.apache.org/jira/browse/FLINK-20747
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.0
> Reporter: zengjinbo
> Priority: Critical
> Fix For: 1.12.0, 1.13.0
>
> Attachments: image-2020-12-23-18-04-21-079.png
>
>
> During the process of upgrading 1.12.0, I found that Flink SQL is not
> compatible with 1.11.1
> java.lang.ClassCastException: java.lang.Integer cannot be cast to
> org.apache.flink.table.data.StringDatajava.lang.ClassCastException:
> java.lang.Integer cannot be cast to org.apache.flink.table.data.StringData at
> org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$MaxWithRetractAccumulator$Converter.toInternal(Unknown
> Source) at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92)
> at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47)
> at
> org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59)
> at GroupAggsHandler$875.getAccumulators(Unknown Source) at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:175)
> at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:45)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>
>
> !image-2020-12-23-18-04-21-079.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)