[ 
https://issues.apache.org/jira/browse/FLINK-21301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345659#comment-17345659
 ] 

Andy edited comment on FLINK-21301 at 5/16/21, 9:51 AM:
--------------------------------------------------------

[~godfreyhe] [~jark]  I could fix the bug so that allow lateness is enabled only 
when an emit strategy is specified. However, I think it's better to decouple allow 
lateness from the state retention configuration, for the following reasons:
 # it is not intuitive to use the state retention configuration to control 
the allow lateness of a windowed operator 
 # state retention is a global configuration for a job, so it affects all 
stateful operators. If a job contains multiple stateful operators, e.g. 
(deduplicate / unbounded aggregate / window operator), the user may want to set 
different values for different operators. For the above-mentioned example: 1 day 
of state retention for deduplicate, 5 min of allow lateness for the window aggregate.
{code:sql}
SELECT
 DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'yyyy-MM-dd') as stat_time,
 count(1) as cnt, sum(num) as sum_num
FROM (
 SELECT 
 ROWTIME,
 user_id,
 row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
 FROM source_kafka_biz_shuidi_sdb_crm_call_record 
) cal 
where rn =1
group by tumble(ROWTIME,interval '1' DAY);{code}
 

What's your opinion about the issue?


was (Author: qingru zhang):
[~godfreyhe] [~jark]  I could fix the bug so that allow lateness is enabled only 
when an emit strategy is specified. However, I think it's better to decouple allow 
lateness from the state retention configuration, for the following reasons:
 # it is not intuitive to use the state retention configuration to control 
the allow lateness of a windowed operator 
 # state retention is a global configuration for a job, so it affects all 
stateful operators. If a job contains multiple stateful operators, e.g. 
(deduplicate / unbounded aggregate / window operator), the user may want to set 
different values for different operators. For the above-mentioned example, 
deduplicate 
SELECT DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'yyyy-MM-dd') as stat_time,
 sum(num),count(1) first_phone_num
FROM (
 SELECT ROWTIME,
 user_id,
 row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
 FROM source_kafka_biz_shuidi_sdb_crm_call_record 
) cal
where rn =1
group by tumble(ROWTIME,interval '1' DAY);

What's your opinion about the issue?

> Decouple window aggregate allow lateness with state ttl configuration
> ---------------------------------------------------------------------
>
>                 Key: FLINK-21301
>                 URL: https://issues.apache.org/jira/browse/FLINK-21301
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Andy
>            Priority: Major
>              Labels: auto-unassigned
>             Fix For: 1.14.0
>
>
> Currently, the state retention time config also affects the state cleanup 
> behavior of Window Aggregate, which is unexpected for most users.
> E.g. in the following example, a user would set `MinIdleStateRetentionTime` to 
> 1 day to clean up state in `deduplicate`. However, it also affects the cleanup 
> behavior of the window aggregate. For example, 2021-01-04 data would be cleaned 
> up at 2021-01-06 instead of 2021-01-05. 
> {code:sql}
> SELECT
>  DATE_FORMAT(tumble_end(ROWTIME ,interval '1' DAY),'yyyy-MM-dd') as stat_time,
>  count(1) first_phone_num
> FROM (
>  SELECT 
>  ROWTIME,
>  user_id,
>  row_number() over(partition by user_id, pdate order by ROWTIME ) as rn
>  FROM source_kafka_biz_shuidi_sdb_crm_call_record 
> ) cal 
> where rn =1
> group by tumble(ROWTIME,interval '1' DAY);{code}
> It's better to decouple window aggregate allow lateness from 
> `MinIdleStateRetentionTime`.
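
The date arithmetic in the description can be sketched as follows. This is a simplified model, not Flink's implementation: it assumes a window's state is dropped at window end plus allowed lateness, and the helper name `window_cleanup_time` is hypothetical. The 5 min value is the one suggested in the comment above.

```python
from datetime import datetime, timedelta


def window_cleanup_time(window_end, allowed_lateness):
    """Simplified model (assumption): state is dropped at window end + allowed lateness."""
    return window_end + allowed_lateness


# The daily tumbling window holding the 2021-01-04 data ends at 2021-01-05 00:00.
window_end = datetime(2021, 1, 5)

# Coupled (behaviour reported in this issue): the 1 day state retention
# is reused as the window's allowed lateness.
coupled = window_cleanup_time(window_end, timedelta(days=1))

# Decoupled (proposed): the window gets its own, independent 5 min lateness.
decoupled = window_cleanup_time(window_end, timedelta(minutes=5))

print(coupled)    # 2021-01-06 00:00:00
print(decoupled)  # 2021-01-05 00:05:00
```

Under this model the 1 day retention needed by `deduplicate` delays window cleanup by a full day, while a separate lateness setting would keep the two independent.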



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
