[
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Neha Aggarwal updated FLINK-32718:
----------------------------------
Labels: (was: pull-request-available)
> 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint to become unbounded
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-32718
> URL: https://issues.apache.org/jira/browse/FLINK-32718
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.3.1
> Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
> Reporter: Neha Aggarwal
> Assignee: Jayme Howard
> Priority: Blocker
> Attachments: Screenshot 2023-07-31 at 10.37.11 AM.png, Screenshot
> 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 AM.png,
> Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 10.41.26
> AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 2023-07-31 at
> 10.41.48 AM.png
>
>
> I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY'
> condition in Flink is causing the checkpoint to become unbounded. I've
> attached two Flink SQL queries where the only difference is the addition of
> one more 'UNION ALL'. Surprisingly, as soon as we add the 'UNION ALL' part to
> the query, the checkpoint size keeps increasing, which is not expected. We
> anticipated it would follow the same pattern of increasing and decreasing
> based on the throughput, as it does without the 'UNION ALL'.
> When I break down the checkpoint size by operator, the 'Interval Join'
> operator accounts for the largest share. I suspect there might be a bug
> causing this difference in checkpoint behavior once the additional
> 'GROUP BY' branch is added to the query.
> We are using Flink version 1.16.1 with RocksDB as the backend, and
> incremental checkpointing is enabled.
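> A hedged workaround sketch (my assumption, not a confirmed fix): Flink's
> idle-state retention option can cap state growth for some stateful SQL
> operators. Whether it helps the interval join here would need verification,
> since interval joins normally clean up state via time-bound timers rather
> than TTL; the value below is illustrative only.
>
> ```sql
> -- Set from the SQL client (or TableConfig) before submitting the job.
> -- '2 h' is an illustrative duration, not a recommendation.
> SET 'table.exec.state.ttl' = '2 h';
> ```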
> PS: Interestingly, when I take periodic savepoints of both jobs, their size
> patterns are similar, and the savepoint is smaller than the checkpoint in
> the job with the 'UNION ALL'. The queries are included below.
> With UNION ALL:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double)
>               / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double)
>               / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered
>           on delivered.job_id = pickedup.job_id
>           and delivered.status = 'DELIVERY_DELIVERED'
>           and delivered.type = 'INSTA'
>           and pickedup.proctime between delivered.proctime - interval '95' minute
>             and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
>
> Without UNION ALL, which shows the expected behaviour:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double)
>               / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double)
>               / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered
>           on delivered.job_id = pickedup.job_id
>           and delivered.status = 'DELIVERY_DELIVERED'
>           and delivered.type = 'INSTA'
>           and pickedup.proctime between delivered.proctime - interval '95' minute
>             and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
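>
> One way to check how the planner treats the two variants (a diagnostic
> sketch; assumes both jobs can be submitted through the Flink SQL client
> against the same catalog) is to compare their optimized plans with EXPLAIN:
>
> ```sql
> -- Prefixing either full query above with EXPLAIN prints the abstract
> -- syntax tree and the optimized execution plan, so the IntervalJoin node
> -- and the number of downstream aggregation branches can be compared
> -- without running the job.
> EXPLAIN
> select * from ( ... );  -- paste the full query body here
> ```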
>
>
> !Screenshot 2023-07-31 at 10.37.11 AM.png|width=1113,height=597!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)