[ https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Aggarwal updated FLINK-32718:
----------------------------------
    Attachment: Screenshot 2023-07-31 at 10.37.11 AM-2.png

> 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32718
>                 URL: https://issues.apache.org/jira/browse/FLINK-32718
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>            Reporter: Neha Aggarwal
>            Assignee: Jayme Howard
>            Priority: Major
>         Attachments: Screenshot 2023-07-31 at 10.37.11 AM-1.png, Screenshot 2023-07-31 at 10.37.11 AM-2.png, Screenshot 2023-07-31 at 10.37.11 AM.png, Screenshot 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 AM.png, Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 10.41.26 AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 2023-07-31 at 10.41.48 AM.png
>
> I am having trouble understanding why adding a 'UNION ALL' branch with a 'GROUP BY' condition to a Flink SQL query causes the checkpoint to become unbounded. I've attached two Flink SQL queries whose only difference is one additional 'UNION ALL' branch. Surprisingly, as soon as we add the 'UNION ALL' part to the query, the checkpoint size keeps increasing, which is not expected. We anticipated it would follow the same pattern of increasing and decreasing with throughput, as it does without the 'UNION ALL'.
> When I break down the checkpoint size by operator, the 'Interval Join' operator accounts for the largest share. I suspect there might be a bug causing this difference in checkpoint behaviour due to the addition of the 'GROUP BY' in the query.
> We are using Flink version 1.16.1 with the RocksDB state backend, and incremental checkpointing is enabled.
> PS: Interestingly, when I take periodic savepoints for both jobs, their patterns are similar, and the savepoint size is smaller than the checkpoint in the job with the 'UNION ALL'. Attaching the queries in the comment below.
>
> With UNION ALL:
>
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double) / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
>
> Without UNION ALL, which shows the expected behaviour:
>
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double) / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
>
> !Screenshot 2023-07-31 at 10.37.11 AM.png!

-- This message was sent by Atlassian Jira (v8.20.10#820010)
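
> The problematic shape can be distilled to the sketch below (table name `orders` and columns `id`, `store_id`, `gh6`, `speed`, `proctime` are simplified placeholders, not the real schema). The CTE `p2d`, which contains the interval join, is consumed by two windowed aggregation branches; in the single-branch job the same join is consumed by only one. The sketch is a minimal reproduction of the query structure, not a statement about where the state growth originates:
>
> ```sql
> -- p2d: interval join; its operator must retain rows for the full
> -- [-95 min, +5 min] join window per key.
> with p2d as (
>   select a.store_id, a.gh6, a.speed, a.proctime
>   from orders b
>   inner join orders a on a.id = b.id
>     and b.proctime between a.proctime - interval '95' minute
>                        and a.proctime + interval '5' minute
> )
> select cast(store_id as varchar) as key, round(avg(speed), 4) as `value`
> from p2d
> group by HOP(proctime, interval '5' minute, interval '30' minute), store_id
> union all  -- the second aggregation branch over the same CTE is the only delta
> select cast(gh6 as varchar) as key, round(avg(speed), 4) as `value`
> from p2d
> group by HOP(proctime, interval '5' minute, interval '30' minute), gh6;
> ```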