[ https://issues.apache.org/jira/browse/SPARK-19730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15884770#comment-15884770 ]
Herman van Hovell commented on SPARK-19730:
-------------------------------------------
At a high level, I agree that this is useful.
However, the size of the subquery's output can be a problem. For this to
work in practice, the output needs to be available in memory when
filtering, which means it has to be broadcast. This limits the cases
where the rewrite applies (your example, for instance, uses a sort-merge
semi join, meaning that the relation is probably too big to broadcast),
and it also requires some clever planning.
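A minimal sketch of the planning decision described above, assuming the subquery has already been executed so that its result size is known (that ordering is part of the "clever planning"). Every name here (`plan_in_subquery`, `In`, `LeftSemiJoin`, and the `max_in_list_size` threshold) is hypothetical and only loosely modeled on Spark's `sources.In` filter and its LeftSemi join; the threshold value is arbitrary. A small result becomes a literal IN filter that could be pushed to the data source; a large one keeps the semi join.

```python
from dataclasses import dataclass
from typing import Iterable, Union


# Hypothetical stand-ins, loosely modeled on Spark's
# org.apache.spark.sql.sources.In filter and on a LeftSemi join node.
@dataclass(frozen=True)
class In:
    attribute: str
    values: tuple


@dataclass(frozen=True)
class LeftSemiJoin:
    left_key: str
    right_key: str


def plan_in_subquery(column: str, subquery_column: str,
                     subquery_rows: Iterable[int],
                     max_in_list_size: int = 1000) -> Union[In, LeftSemiJoin]:
    """Decide how to evaluate `column IN (subquery)` in a WHERE clause.

    Assumes the subquery has already run, so its result size is known.
    A small result is kept in memory and turned into a literal IN filter
    that the outer scan can push to the data source; a large one falls
    back to a semi join, as in the plan attached to this issue.
    """
    distinct = tuple(sorted(set(subquery_rows)))  # duplicates don't change IN
    if len(distinct) <= max_in_list_size:
        return In(column, distinct)
    return LeftSemiJoin(column, subquery_column)
```

For example, `plan_in_subquery("point", "col", [3, 1, 2, 3])` yields `In(attribute='point', values=(1, 2, 3))`, while a 5,000-row result under the default threshold yields `LeftSemiJoin(left_key='point', right_key='col')`.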
> Predicate Subqueries do not push results of subqueries to data source
> ---------------------------------------------------------------------
>
> Key: SPARK-19730
> URL: https://issues.apache.org/jira/browse/SPARK-19730
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 2.1.0
> Reporter: Shawn Lavelle
>
> When a Spark SQL query contains a subquery in the WHERE clause, such as a
> predicate subquery using the IN operator, the results of that subquery are not
> pushed down as a filter to the Data Source API for the outer query.
> Example:
> Select point, time, value from data where time between now()-86400 and now()
> and point in (select point from groups where group_id=5);
> Two queries are sent to the data source: one for the subquery and
> another for the outer query. The subquery works correctly, returning the
> points in the group; however, the outer query does not push a filter for
> the point column.
> Impact:
> The "groups" table has a few hundred rows that group a few hundred thousand
> points. The data table has several billion rows keyed by point and time.
> Without the ability to push down filters on the columns of the outer
> query, the data source cannot properly conduct its pruned scan.
> The subquery results should be pushed down to the outer query's scan as an
> IN filter.
> {panel:title=Physical Plan}
> *Project [point#263, value#270]
> +- SortMergeJoin [point#263], [col#284], LeftSemi
>    :- *Sort [point#263 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(point#263, 20)
>    :     +- *Filter ((time#264L >= 1487964691000) && (time#264L <= 1487964696000))
>    :        +- *Scan @4b455128 DBNAME.data[point#263,time#264L,value#270] PushedFilters: [GreaterThanOrEqual(time,1487964691000), LessThanOrEqual(time,1487964691000)], ReadSchema: struct<point:int,time:bigint,value:double>...
>    +- *Sort [col#284 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col#284, 20)
>          +- Generate explode(points#273), false, false, [col#284]
>             +- *Project [points#273]
>                +- *Filter (group_id#272 = 1)
>                   +- *Scan @12fb3c1a <db>.groups[points#273,group_id#272] PushedFilters: [EqualTo(group_id,1)], ReadSchema: struct<points:array<int>>
> {panel}
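To make the reporter's point about pruned scans concrete, here is a toy model. Everything in it (`KeyedSource`, `compare`, the row layout) is hypothetical and is not the Spark Data Source API. The source is keyed by (point, time), always receives the pushed time bounds, and optionally receives a `point IN (...)` list. The plan above is modeled as a time-only scan followed by the semi join, simplified here to a post-scan filter.

```python
from bisect import bisect_left, bisect_right
from typing import Dict, List, Optional, Sequence, Tuple

# Hypothetical toy model of a data source keyed by (point, time), like the
# `data` table in this issue. This is NOT the Spark Data Source API.
Row = Tuple[int, int, float]  # (point, time, value)


class KeyedSource:
    def __init__(self, rows: Sequence[Row]):
        # Group rows by point, sorted by time, so a scan can seek directly
        # to the requested points and to the requested time range.
        self.groups: Dict[int, List[Row]] = {}
        for row in sorted(rows):
            self.groups.setdefault(row[0], []).append(row)
        self.times = {p: [r[1] for r in g] for p, g in self.groups.items()}
        self.rows_read = 0  # rows touched by scans, a proxy for I/O cost

    def scan(self, t_min: int, t_max: int,
             points: Optional[Sequence[int]] = None) -> List[Row]:
        """Time bounds are always pushed; the point IN list is optional."""
        keys = self.groups.keys() if points is None else points
        out: List[Row] = []
        for p in keys:
            times = self.times.get(p, [])
            lo, hi = bisect_left(times, t_min), bisect_right(times, t_max)
            self.rows_read += hi - lo
            out.extend(self.groups.get(p, [])[lo:hi])
        return out


def compare(rows: Sequence[Row], t_min: int, t_max: int,
            points: Sequence[int]) -> Tuple[int, int, bool]:
    """Rows read without vs. with the IN filter pushed to the source."""
    src = KeyedSource(rows)
    wanted = set(points)
    # Current plan: scan on time only, then keep rows whose point matches.
    joined = [r for r in src.scan(t_min, t_max) if r[0] in wanted]
    read_without = src.rows_read
    src.rows_read = 0
    # Proposed plan: subquery results pushed down as `point IN (...)`.
    pushed = src.scan(t_min, t_max, points=sorted(wanted))
    return read_without, src.rows_read, joined == pushed
```

With 1,000 points of 10 rows each (`[(p, t, float(p * t)) for p in range(1000) for t in range(10)]`), `compare(rows, 0, 4, [1, 2, 3])` returns `(5000, 15, True)`: the same result, but the pushed IN filter lets the source touch 15 rows instead of 5,000.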
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)