[ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
----------------------------------
    Description: 
I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY' 
condition in Flink causes the checkpoint size to grow without bound. I've 
attached two Flink SQL queries whose only difference is the addition of one 
more 'UNION ALL' branch. As soon as we add the 'UNION ALL' part to the query, 
the checkpoint size keeps increasing, which is not expected. We anticipated it 
would rise and fall with throughput, as it does without the 'UNION ALL'.
When I break down the checkpoint size by operator, the 'Interval Join' 
operator holds the largest state. I suspect there is a bug behind this 
difference in checkpoint behaviour, triggered by the additional 'GROUP BY' 
branch in the query.
We are using Flink 1.16.1 with the RocksDB state backend, and incremental 
checkpointing is enabled.
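For reference, our checkpointing setup corresponds roughly to the following flink-conf.yaml fragment (the interval shown here is illustrative, not our exact production value):

```yaml
# RocksDB state backend with incremental checkpoints (Flink 1.16 keys)
state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 5min
```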
PS: Interestingly, when I take periodic savepoints of both jobs, their size 
patterns are similar, and the savepoint is smaller than the checkpoint of the 
job with the 'UNION ALL'. The queries are attached below.

With UNION ALL:

select
  *
from
  (
    with p2d as (
      select
        delivered.job_id,
        delivered.store_id,
        substring(
          GetGeoHash(
            CAST(delivered.metadata_json.address.lat as double),
            CAST(delivered.metadata_json.address.lng as double)
          )
          from
            1 for 6
        ) AS cx_gh6,
        (
          (
            cast(
              delivered.metadata_json.lastMileDistance as double
            ) / cast(1000 as double)
          ) /(
            cast(
              delivered.updated_at - pickedup.updated_at as double
            ) / cast(60 * 60 * 1000 as double)
          )
        ) as lm_speed_kmph,
        delivered.proctime as proctime
      from
        awz_s3_OrderLogsEvent pickedup
        inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
        and delivered.status = 'DELIVERY_DELIVERED'
        and delivered.type = 'INSTA'
        and pickedup.proctime between delivered.proctime - interval '95' minute
        and delivered.proctime + interval '5' minute
      where
        pickedup.status = 'DELIVERY_PICKEDUP'
        and pickedup.type = 'INSTA'
    )
    select
      'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      store_id
    union all
    select
      'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      cx_gh6
  )
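For clarity, the lm_speed_kmph expression above converts lastMileDistance (metres) to kilometres and the updated_at delta (epoch milliseconds) to hours. The same arithmetic as a small Python sketch (argument names here are illustrative, not column names from our schema):

```python
def lm_speed_kmph(last_mile_distance_m: float,
                  delivered_ms: int,
                  pickedup_ms: int) -> float:
    """Mirror of the SQL expression: (metres / 1000) / (millis / 3_600_000)."""
    distance_km = last_mile_distance_m / 1000.0                    # metres -> km
    duration_h = (delivered_ms - pickedup_ms) / (60 * 60 * 1000)   # ms -> hours
    return distance_km / duration_h

# 5 km covered in 30 minutes -> 10.0 km/h
print(lm_speed_kmph(5000, 1_800_000, 0))
```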

 

Without UNION ALL, which shows the expected behaviour:

select
  *
from
  (
    with p2d as (
      select
        delivered.job_id,
        delivered.store_id,
        substring(
          GetGeoHash(
            CAST(delivered.metadata_json.address.lat as double),
            CAST(delivered.metadata_json.address.lng as double)
          )
          from
            1 for 6
        ) AS cx_gh6,
        (
          (
            cast(
              delivered.metadata_json.lastMileDistance as double
            ) / cast(1000 as double)
          ) /(
            cast(
              delivered.updated_at - pickedup.updated_at as double
            ) / cast(60 * 60 * 1000 as double)
          )
        ) as lm_speed_kmph,
        delivered.proctime as proctime
      from
        awz_s3_OrderLogsEvent pickedup
        inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
        and delivered.status = 'DELIVERY_DELIVERED'
        and delivered.type = 'INSTA'
        and pickedup.proctime between delivered.proctime - interval '95' minute
        and delivered.proctime + interval '5' minute
      where
        pickedup.status = 'DELIVERY_PICKEDUP'
        and pickedup.type = 'INSTA'
    )
    select
      'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      cx_gh6
  )
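As context for the window state involved: with HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE), every record is assigned to size/slide = 6 overlapping windows, so each additional GROUP BY branch multiplies that window state. A small Python sketch of epoch-aligned hop-window assignment (an illustration of the semantics, not Flink's internal implementation):

```python
def hop_windows(ts_ms: int, slide_ms: int, size_ms: int):
    """Return (start, end) of every hop window containing ts_ms.

    Window starts are epoch-aligned multiples of slide_ms, and a window
    [start, start + size_ms) contains ts_ms iff start > ts_ms - size_ms.
    """
    last_start = ts_ms - ts_ms % slide_ms             # latest start <= ts_ms
    starts = range(last_start, ts_ms - size_ms, -slide_ms)
    return [(s, s + size_ms) for s in starts]

MIN = 60 * 1000
wins = hop_windows(ts_ms=47 * MIN, slide_ms=5 * MIN, size_ms=30 * MIN)
print(len(wins))  # 30 / 5 = 6 overlapping windows per record
```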

 

 

!Screenshot 2023-07-31 at 10.37.11 AM.png!



>  'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint 
> to become unbounded
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32718
>                 URL: https://issues.apache.org/jira/browse/FLINK-32718
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>            Reporter: Neha Aggarwal
>            Assignee: Jayme Howard
>            Priority: Major
>         Attachments: Screenshot 2023-07-31 at 10.37.11 AM-1.png, Screenshot 
> 2023-07-31 at 10.37.11 AM.png, Screenshot 2023-07-31 at 10.40.30 AM.png, 
> Screenshot 2023-07-31 at 10.40.38 AM.png, Screenshot 2023-07-31 at 10.41.07 
> AM.png, Screenshot 2023-07-31 at 10.41.26 AM.png, Screenshot 2023-07-31 at 
> 10.41.35 AM.png, Screenshot 2023-07-31 at 10.41.48 AM.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
