[ https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766622#comment-17766622 ]

Neha Aggarwal commented on FLINK-32718:
---------------------------------------

Hello Jane Chan,

I am attaching the optimized physical plans for both queries (with and
without UNION ALL). Let me know if you want the execution plan as well.

== Optimized Physical Plan ==
Union(all=[true], union=[rill_server_timestamp, key, value])
:- Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_storeId_30m#', CAST(store_id0 AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) AS key, ROUND($f1, 4) AS value])
:  +- GroupWindowAggregate(groupBy=[store_id0], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[store_id0, AVG($f2) AS $f1])
:     +- Exchange(distribution=[hash[store_id0]])
:        +- Calc(select=[proctime0, store_id AS store_id0, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
:           +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, store_id, metadata_json, proctime0])
:              :- Exchange(distribution=[hash[job_id]])
:              :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
:              :     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
:              +- Exchange(distribution=[hash[job_id]])
:                 +- Calc(select=[job_id, updated_at, store_id, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
:                    +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
+- Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_cxGh6_30m#', $f1) AS key, ROUND($f1_0, 4) AS value])
   +- GroupWindowAggregate(groupBy=[$f1], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[$f1, AVG($f2) AS $f1_0])
      +- Exchange(distribution=[hash[$f1]])
         +- Calc(select=[proctime0, SUBSTRING(GetGeoHash(metadata_json.address.lat, metadata_json.address.lng), 1, 6) AS $f1, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
            +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=3], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, metadata_json, proctime0])
               :- Exchange(distribution=[hash[job_id]])
               :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
               :     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
               +- Exchange(distribution=[hash[job_id]])
                  +- Calc(select=[job_id, updated_at, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
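
One thing the plan above makes visible: the p2d CTE is inlined into each
UNION ALL branch, so the job runs two independent IntervalJoin operators
(each with its own join state) over the same source scan. Below is a
minimal, self-contained sketch for reproducing this plan shape in the SQL
client; the datagen table and its reduced schema are stand-ins of mine,
not the original source:

-- Hypothetical stand-in for the event stream: the bundled datagen
-- connector, with the schema cut down to what the join and aggregate use.
CREATE TEMPORARY TABLE events (
  job_id BIGINT,
  status STRING,
  updated_at BIGINT,
  proctime AS PROCTIME()
) WITH ('connector' = 'datagen');

-- EXPLAIN prints the optimized physical plan; with the UNION ALL in
-- place, the WITH-clause join shows up twice, once per branch.
EXPLAIN PLAN FOR
WITH p2d AS (
  SELECT d.job_id,
         d.updated_at - p.updated_at AS diff_ms,
         d.proctime AS proctime
  FROM events p
  INNER JOIN events d ON d.job_id = p.job_id
    AND d.status = 'DELIVERY_DELIVERED'
    AND p.proctime BETWEEN d.proctime - INTERVAL '95' MINUTE
                       AND d.proctime + INTERVAL '5' MINUTE
  WHERE p.status = 'DELIVERY_PICKEDUP'
)
SELECT 'a#' || CAST(job_id AS VARCHAR) AS `key`, AVG(diff_ms) AS `value`
FROM p2d
GROUP BY HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE), job_id
UNION ALL
SELECT 'b#' || CAST(job_id AS VARCHAR) AS `key`, AVG(diff_ms) AS `value`
FROM p2d
GROUP BY HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '30' MINUTE), job_id;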

The plan for the query without UNION ALL:

== Optimized Physical Plan ==
Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_cxGh6_30m#', $f1) AS key, ROUND($f1_0, 4) AS value])
+- GroupWindowAggregate(groupBy=[$f1], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[$f1, AVG($f2) AS $f1_0])
   +- Exchange(distribution=[hash[$f1]])
      +- Calc(select=[proctime0, SUBSTRING(GetGeoHash(metadata_json.address.lat, metadata_json.address.lng), 1, 6) AS $f1, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=3], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, metadata_json, proctime0])
            :- Exchange(distribution=[hash[job_id]])
            :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
            :     +- TableSourceScan(table=[[*anonymous_datastream_source$11*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
            +- Exchange(distribution=[hash[job_id]])
               +- Calc(select=[job_id, updated_at, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
                  +- TableSourceScan(table=[[*anonymous_datastream_source$11*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
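
To cross-check the plans against the SQL in the quoted description below,
the numeric literals decode as follows (my own arithmetic, not planner
output):

-- IntervalJoin windowBounds vs. the BETWEEN clause:
--   leftLowerBound = -5700000 ms = -(95 * 60 * 1000)  <->  proctime - INTERVAL '95' MINUTE
--   leftUpperBound =   300000 ms =    5 * 60 * 1000   <->  proctime + INTERVAL '5' MINUTE
-- SlidingGroupWindow('w$, proctime0, 1800000, 300000) vs. the HOP window:
--   size  = 1800000 ms = 30 * 60 * 1000  <->  INTERVAL '30' MINUTE
--   slide =  300000 ms =  5 * 60 * 1000  <->  INTERVAL '5' MINUTE
-- lm_speed_kmph: /(/(lastMileDistance, 1000), /(updated_at0 - updated_at, 3.6E6))
--   divides (metres -> km) by (milliseconds -> hours), i.e. yields km/h.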

>  'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint 
> to become unbounded
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32718
>                 URL: https://issues.apache.org/jira/browse/FLINK-32718
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.16.1
>         Environment: Apache Flink 1.16.1
>            Reporter: Neha Aggarwal
>            Assignee: Jayme Howard
>            Priority: Major
>         Attachments: Screenshot 2023-07-31 at 10.37.11 AM.png, Screenshot 
> 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 AM.png, 
> Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 10.41.26 
> AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 2023-07-31 at 
> 10.41.48 AM.png
>
>
> I am having trouble understanding why adding a 'UNION ALL' branch with its
> own 'GROUP BY' to a Flink query causes the checkpoint to become unbounded.
> I've attached two Flink SQL queries whose only difference is the additional
> 'UNION ALL'. Surprisingly, as soon as the 'UNION ALL' part is added, the
> checkpoint size keeps growing, which is not expected. We anticipated it
> would rise and fall with throughput, as it does without the 'UNION ALL'.
> When I break down the checkpoint size, the 'Interval Join' operator
> accounts for the largest share. I suspect a bug behind this difference in
> checkpoint behavior, triggered by the additional 'UNION ALL' branch (the
> 'GROUP BY' is present in both queries).
> We are using Flink 1.16.1 with the RocksDB state backend, and incremental
> checkpointing is enabled.
> PS: Interestingly, when I take periodic savepoints of both jobs, their
> size patterns are similar, and the savepoint is smaller than the checkpoint
> for the job with the 'UNION ALL'. The queries are attached below.
> With UNION ALL:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
>  
> Without UNION ALL, which shows the expected behaviour:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
>  
>  
> !Screenshot 2023-07-31 at 10.37.11 AM.png|width=1113,height=597!
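
As a closing note on the quoted description: a back-of-envelope estimate
of the interval join state, assuming Flink's processing-time interval
join evicts rows via timers once they can no longer match (the throughput
and row-size figures below are hypothetical):

-- From the BETWEEN bounds, a pickedup row can still match delivered rows
-- arriving up to 95 minutes later, so it is retained for ~95 minutes;
-- a delivered row can only match pickedup rows up to 5 minutes later,
-- so it is retained for ~5 minutes.
-- With, say, 1,000 pickedup events/min at ~1 KB per row (hypothetical):
--   1,000 rows/min * 95 min * 1 KB ~= 95 MB of steady-state join state.
-- The UNION ALL plan contains two such joins, so roughly double that:
-- large, but still bounded, which is why steady checkpoint growth looks
-- like a bug rather than expected behaviour.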


