[ https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766622#comment-17766622 ]
Neha Aggarwal commented on FLINK-32718:
---------------------------------------
Hello Jane Chan,
I am attaching the Physical Plan. Let me know if you want the execution plan as
well.
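(For anyone who wants to reproduce this: plans like the ones below can be printed with Flink SQL's EXPLAIN statement. A minimal sketch, with "SELECT ..." standing in for the full query from the issue description:

EXPLAIN PLAN FOR SELECT ... ;

and, since Flink 1.15, the JSON execution plan can be printed as well:

EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT ... ;)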
== Optimized Physical Plan ==
Union(all=[true], union=[rill_server_timestamp, key, value])
:- Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_storeId_30m#', CAST(store_id0 AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE")) AS key, ROUND($f1, 4) AS value])
:  +- GroupWindowAggregate(groupBy=[store_id0], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[store_id0, AVG($f2) AS $f1])
:     +- Exchange(distribution=[hash[store_id0]])
:        +- Calc(select=[proctime0, store_id AS store_id0, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
:           +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=4], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, store_id, metadata_json, proctime0])
:              :- Exchange(distribution=[hash[job_id]])
:              :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
:              :     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
:              +- Exchange(distribution=[hash[job_id]])
:                 +- Calc(select=[job_id, updated_at, store_id, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
:                    +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
+- Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_cxGh6_30m#', $f1) AS key, ROUND($f1_0, 4) AS value])
   +- GroupWindowAggregate(groupBy=[$f1], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[$f1, AVG($f2) AS $f1_0])
      +- Exchange(distribution=[hash[$f1]])
         +- Calc(select=[proctime0, SUBSTRING(GetGeoHash(metadata_json.address.lat, metadata_json.address.lng), 1, 6) AS $f1, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
            +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=3], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, metadata_json, proctime0])
               :- Exchange(distribution=[hash[job_id]])
               :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
               :     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
               +- Exchange(distribution=[hash[job_id]])
                  +- Calc(select=[job_id, updated_at, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$10*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
For the query without UNION ALL:
== Optimized Physical Plan ==
Calc(select=[CAST(PROCTIME_MATERIALIZE(PROCTIME()) AS TIMESTAMP(3) NOT NULL) AS rill_server_timestamp, ||(_UTF-16LE'lmSpeedKmph_avg_cxGh6_30m#', $f1) AS key, ROUND($f1_0, 4) AS value])
+- GroupWindowAggregate(groupBy=[$f1], window=[SlidingGroupWindow('w$, proctime0, 1800000, 300000)], select=[$f1, AVG($f2) AS $f1_0])
   +- Exchange(distribution=[hash[$f1]])
      +- Calc(select=[proctime0, SUBSTRING(GetGeoHash(metadata_json.address.lat, metadata_json.address.lng), 1, 6) AS $f1, /(/(CAST(metadata_json.lastMileDistance AS DOUBLE), 1000:DOUBLE), /(CAST(-(updated_at0, updated_at) AS DOUBLE), 3.6E6:DOUBLE)) AS $f2])
         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-5700000, leftUpperBound=300000, leftTimeIndex=2, rightTimeIndex=3], where=[AND(=(job_id0, job_id), >=(proctime, -(proctime0, 5700000:INTERVAL MINUTE)), <=(proctime, +(proctime0, 300000:INTERVAL MINUTE)))], select=[job_id, updated_at, proctime, job_id0, updated_at0, metadata_json, proctime0])
            :- Exchange(distribution=[hash[job_id]])
            :  +- Calc(select=[job_id, updated_at, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_PICKEDUP':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
            :     +- TableSourceScan(table=[[*anonymous_datastream_source$11*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
            +- Exchange(distribution=[hash[job_id]])
               +- Calc(select=[job_id, updated_at, metadata_json, PROCTIME() AS proctime], where=[AND(=(status, _UTF-16LE'DELIVERY_DELIVERED':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(type, _UTF-16LE'INSTA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
                  +- TableSourceScan(table=[[*anonymous_datastream_source$11*]], fields=[event_id, uuid, time_stamp, schema_version, _server_time_stamp, id, job_id, event_type, update_time, update_json, status, updated_at, type, structured, store_id, deTriggered, metadata_json, proctime])
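For reference, my reading of how the plan constants map back to the SQL (plan values are in milliseconds):
- leftLowerBound = -5700000 ms = 95 minutes, i.e. pickedup.proctime >= delivered.proctime - interval '95' minute
- leftUpperBound = 300000 ms = 5 minutes, i.e. pickedup.proctime <= delivered.proctime + interval '5' minute
- SlidingGroupWindow('w$, proctime0, 1800000, 300000) is the 30-minute window sliding every 5 minutes, i.e. HOP(proctime, interval '5' minute, interval '30' minute)
- 3.6E6 = 60 * 60 * 1000, the milliseconds-to-hours divisor in the lm_speed_kmph expression
Note also that the UNION ALL plan inlines the p2d CTE into both branches, so it contains two independent IntervalJoin operators (each with its own state), whereas the single-branch plan has one.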
> 'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint
> to become unbounded
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-32718
> URL: https://issues.apache.org/jira/browse/FLINK-32718
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Planner
> Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1
> Reporter: Neha Aggarwal
> Assignee: Jayme Howard
> Priority: Major
> Attachments: Screenshot 2023-07-31 at 10.37.11 AM.png, Screenshot
> 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 AM.png,
> Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 10.41.26
> AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 2023-07-31 at
> 10.41.48 AM.png
>
>
> I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY'
> condition in Flink causes the checkpoint size to grow without bound. I've
> attached two Flink SQL queries whose only difference is the addition of one
> more 'UNION ALL'. Surprisingly, as soon as we add the 'UNION ALL' part to the
> query, the checkpoint size keeps increasing, which is not expected. We
> anticipated it would rise and fall with throughput, as it does without the
> 'UNION ALL'.
> When I analyze the breakdown of the checkpoint size, I find that the
> 'Interval Join' operator accounts for the largest share. I suspect there
> might be a bug causing this difference in checkpoint behavior once the
> 'GROUP BY' is added to the query.
> We are using Flink 1.16.1 with RocksDB as the state backend, and incremental
> checkpointing is enabled.
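> For context, that setup corresponds to flink-conf.yaml entries along these
> lines (a sketch; the checkpoint interval shown is illustrative, not our
> actual value):
> state.backend: rocksdb
> state.backend.incremental: true
> execution.checkpointing.interval: 60s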
> PS: Interestingly, when I take periodic savepoints for both jobs, their
> patterns are similar, and the savepoint size is smaller than the checkpoint
> size for the job with the 'UNION ALL'. Attaching the queries in the comment
> below.
> With UNION ALL:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double) / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
>
> Without UNION ALL, which shows the expected behaviour:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from 1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(delivered.metadata_json.lastMileDistance as double) / cast(1000 as double)
>           ) / (
>             cast(delivered.updated_at - pickedup.updated_at as double) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(proctime, interval '5' minute, interval '30' minute),
>       cx_gh6
>   )
>
>
> !Screenshot 2023-07-31 at 10.37.11 AM.png|width=1113,height=597!