[ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
----------------------------------
    Attachment: Screenshot 2023-07-31 at 10.37.11 AM-2.png

>  'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint 
> to become unbounded
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32718
>                 URL: https://issues.apache.org/jira/browse/FLINK-32718
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>            Reporter: Neha Aggarwal
>            Assignee: Jayme Howard
>            Priority: Major
>         Attachments: Screenshot 2023-07-31 at 10.37.11 AM-1.png, Screenshot 
> 2023-07-31 at 10.37.11 AM-2.png, Screenshot 2023-07-31 at 10.37.11 AM.png, 
> Screenshot 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 
> AM.png, Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 
> 10.41.26 AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 
> 2023-07-31 at 10.41.48 AM.png
>
>
> I am trying to understand why adding a 'UNION ALL' branch with its own 
> 'GROUP BY' to a Flink SQL query causes the checkpoint size to grow without 
> bound. Below are two Flink SQL queries whose only difference is one 
> additional 'UNION ALL' branch. As soon as we add that branch, the checkpoint 
> size keeps increasing, which is not expected. We anticipated that it would 
> rise and fall with the throughput, as it does without the 'UNION ALL'.
> When I break down the checkpoint size by operator, the 'Interval Join' 
> operator accounts for the largest share. I suspect a bug is causing this 
> difference in checkpoint behavior once the additional 'GROUP BY' branch is 
> added to the query.
> We are using Flink version 1.16.1 with RocksDB as the state backend, and 
> incremental checkpointing is enabled.
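> For reference, the setup described above corresponds roughly to the 
> following SQL client settings. This is only a sketch: the checkpoint 
> interval and the checkpoint directory are assumed placeholder values and are 
> not taken from the actual job.
>
> ```sql
> -- Stated in this report: RocksDB state backend with incremental checkpoints.
> SET 'state.backend' = 'rocksdb';
> SET 'state.backend.incremental' = 'true';
> -- Assumed: the checkpoint interval is not given in this report.
> SET 'execution.checkpointing.interval' = '1 min';
> -- Assumed: hypothetical checkpoint location, for illustration only.
> SET 'state.checkpoints.dir' = 's3://example-bucket/checkpoints';
> ```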
> PS: Interestingly, when I take periodic savepoints for both jobs, their size 
> patterns are similar, and for the job with the 'UNION ALL' the savepoint is 
> smaller than the checkpoint. The two queries follow.
> With UNION ALL:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
>  
> Without UNION ALL, which shows the expected behaviour:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
>  
>  
> !Screenshot 2023-07-31 at 10.37.11 AM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
