[ https://issues.apache.org/jira/browse/SPARK-33302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227274#comment-17227274 ]
Apache Spark commented on SPARK-33302:
--------------------------------------
User 'AngersZhuuuu' has created a pull request for this issue:
https://github.com/apache/spark/pull/30278
> Failed to push down filters through Expand
> ------------------------------------------
>
> Key: SPARK-33302
> URL: https://issues.apache.org/jira/browse/SPARK-33302
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.4, 3.0.1, 3.1.0
> Reporter: Yuming Wang
> Priority: Major
>
> How to reproduce this issue:
> {code:sql}
> CREATE TABLE SPARK_33302_1(pid int, uid int, sid int, dt date, suid int) USING parquet;
> CREATE TABLE SPARK_33302_2(pid int, vs int, uid int, csid int) USING parquet;
>
> SELECT years,
>        appversion,
>        SUM(uusers) AS users
> FROM   (SELECT date_trunc('year', dt) AS years,
>                CASE
>                  WHEN h.pid = 3 THEN 'iOS'
>                  WHEN h.pid = 4 THEN 'Android'
>                  ELSE 'Other'
>                END AS viewport,
>                h.vs AS appversion,
>                COUNT(DISTINCT u.uid) AS uusers,
>                COUNT(DISTINCT u.suid) AS srcusers
>         FROM   SPARK_33302_1 u
>                JOIN SPARK_33302_2 h
>                  ON h.uid = u.uid
>         GROUP  BY 1, 2, 3) AS a
> WHERE  viewport = 'iOS'
> GROUP  BY 1, 2
> {code}
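>
> The plan below was presumably produced by explaining this query; a minimal spark-shell sketch (assuming the two tables above have been created):
> {code:scala}
> // Repro driver sketch: `spark` is the SparkSession that spark-shell provides.
> val df = spark.sql("""
>   SELECT years, appversion, SUM(uusers) AS users
>   FROM (SELECT date_trunc('year', dt) AS years,
>                CASE WHEN h.pid = 3 THEN 'iOS'
>                     WHEN h.pid = 4 THEN 'Android'
>                     ELSE 'Other' END AS viewport,
>                h.vs AS appversion,
>                COUNT(DISTINCT u.uid) AS uusers,
>                COUNT(DISTINCT u.suid) AS srcusers
>         FROM SPARK_33302_1 u JOIN SPARK_33302_2 h ON h.uid = u.uid
>         GROUP BY 1, 2, 3) AS a
>   WHERE viewport = 'iOS'
>   GROUP BY 1, 2""")
>
> // explain(true) prints the parsed, analyzed and optimized logical plans
> // in addition to the physical plan shown below.
> df.explain(true)
> {code}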
> {noformat}
> == Physical Plan ==
> *(5) HashAggregate(keys=[years#30, appversion#32], functions=[sum(uusers#33L)])
> +- Exchange hashpartitioning(years#30, appversion#32, 200), true, [id=#251]
>    +- *(4) HashAggregate(keys=[years#30, appversion#32], functions=[partial_sum(uusers#33L)])
>       +- *(4) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[count(if ((gid#44 = 1)) u.`uid`#47 else null)])
>          +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, 200), true, [id=#246]
>             +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12], functions=[partial_count(if ((gid#44 = 1)) u.`uid`#47 else null)])
>                +- *(3) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[])
>                   +- Exchange hashpartitioning(date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44, 200), true, [id=#241]
>                      +- *(2) HashAggregate(keys=[date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44], functions=[])
>                         +- *(2) Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS)
>                            +- *(2) Expand [ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, uid#7, null, 1), ArrayBuffer(date_trunc(year, cast(dt#9 as timestamp), Some(Etc/GMT+7)), CASE WHEN (pid#11 = 3) THEN iOS WHEN (pid#11 = 4) THEN Android ELSE Other END, vs#12, null, suid#10, 2)], [date_trunc('year', CAST(u.`dt` AS TIMESTAMP))#45, CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46, vs#12, u.`uid`#47, u.`suid`#48, gid#44]
>                               +- *(2) Project [uid#7, dt#9, suid#10, pid#11, vs#12]
>                                  +- *(2) BroadcastHashJoin [uid#7], [uid#13], Inner, BuildRight
>                                     :- *(2) Project [uid#7, dt#9, suid#10]
>                                     :  +- *(2) Filter isnotnull(uid#7)
>                                     :     +- *(2) ColumnarToRow
>                                     :        +- FileScan parquet default.spark_33301_1[uid#7,dt#9,suid#10] Batched: true, DataFilters: [isnotnull(uid#7)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/spark_33301_1], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<uid:int,dt:date,suid:int>
>                                     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, true] as bigint))), [id=#233]
>                                        +- *(1) Project [pid#11, vs#12, uid#13]
>                                           +- *(1) Filter isnotnull(uid#13)
>                                              +- *(1) ColumnarToRow
>                                                 +- FileScan parquet default.spark_33301_2[pid#11,vs#12,uid#13] Batched: true, DataFilters: [isnotnull(uid#13)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-bin-hadoop3.2/spark-warehouse/spark_33301_2], PartitionFilters: [], PushedFilters: [IsNotNull(uid)], ReadSchema: struct<pid:int,vs:int,uid:int>
> {noformat}
> The {{Expand}} comes from rewriting the two DISTINCT aggregates ({{COUNT(DISTINCT u.uid)}} and {{COUNT(DISTINCT u.suid)}}): each join row is duplicated once per distinct group and tagged with {{gid#44}}. Both Expand projections emit the identical CASE expression for {{END#46}}, so we can push down {{Filter (CASE WHEN (h.`pid` = 3) THEN 'iOS' WHEN (h.`pid` = 4) THEN 'Android' ELSE 'Other' END#46 = iOS)}} through the {{Expand}}, but the optimizer currently leaves it above; a sketch of such a rule follows below.
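>
> A minimal sketch of the kind of rule that could do this (hypothetical names, for illustration only; the actual change proposed in https://github.com/apache/spark/pull/30278 is not this code): a deterministic conjunct may move below {{Expand}} when every projection emits the same child-side expression for each attribute the conjunct references, since the filter then evaluates identically before and after the row duplication.
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, PredicateHelper}
> import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan}
> import org.apache.spark.sql.catalyst.rules.Rule
>
> // Illustrative rule only, not the code from the linked PR.
> object PushPredicateThroughExpand extends Rule[LogicalPlan] with PredicateHelper {
>
>   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     case filter @ Filter(condition, expand @ Expand(projections, output, child)) =>
>       // An Expand output attribute is rewritable in terms of the child only if
>       // every projection computes the identical child-side expression for it:
>       // CASE WHEN ... END#46 above is the same in both projections, while
>       // uid#7/suid#10 are nulled out in one of them and gid#44 flips 1/2.
>       val replaceable: Map[Attribute, Expression] = output.zipWithIndex.flatMap {
>         case (attr, i) =>
>           projections.map(_(i)).distinct match {
>             case Seq(e) if e.references.subsetOf(child.outputSet) => Some(attr -> e)
>             case _ => None
>           }
>       }.toMap
>
>       val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
>         cond.deterministic && cond.references.forall(replaceable.contains)
>       }
>
>       if (pushDown.nonEmpty) {
>         // Substitute the Expand outputs with the underlying expressions and
>         // filter below the Expand; any remaining conjuncts stay above it.
>         val pushed = pushDown.reduce(And).transform {
>           case a: Attribute if replaceable.contains(a) => replaceable(a)
>         }
>         val newExpand = expand.copy(child = Filter(pushed, child))
>         if (stayUp.isEmpty) newExpand else Filter(stayUp.reduce(And), newExpand)
>       } else {
>         filter
>       }
>   }
> }
> {code}
> With such a rewrite the CASE expression is evaluated right on top of the join, so the {{Expand}} (which doubles every row for the two distinct groups) only sees the 'iOS' rows.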