[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Asif updated SPARK-44662:
-------------------------
Affects Version/s: 4.0.0
> SPIP: Improving performance of BroadcastHashJoin queries with stream side
> join key on non partition columns
> -----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-44662
> URL: https://issues.apache.org/jira/browse/SPARK-44662
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.0, 4.0.0
> Reporter: Asif
> Priority: Major
> Labels: pull-request-available
> Attachments: perf results broadcast var pushdown - Partitioned TPCDS.pdf
>
>
> h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.*
> Along the lines of DPP (Dynamic Partition Pruning), which helps DataSourceV2
> relations when the join key is a partition column, the same concept can be
> extended to the case where the join key is not a partition column.
> The keys of a BroadcastHashJoin are already available before the stream-side
> iterator is actually evaluated. These keys can be pushed down to the
> DataSource as a SortedSet.
> For non-partition columns, data sources such as Iceberg keep min/max
> statistics per column at the manifest level, and file formats such as Parquet
> keep min/max statistics at various storage levels. The pushed SortedSet can
> therefore be used for range-based pruning both on the driver (manifest files)
> and on the executors (while actually reading chunks, row groups, etc. in
> Parquet).
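> A minimal sketch of the range check, written in Python purely for
> illustration (Spark itself is Scala). It assumes integer keys and one min/max
> pair per storage unit; ChunkStats, may_contain_any and prune are hypothetical
> names, not Spark, Iceberg or Parquet APIs. Because the keys are sorted, one
> binary search per chunk decides whether the chunk can be skipped.

```python
import bisect
from dataclasses import dataclass


@dataclass
class ChunkStats:
    """Min/max statistics for one storage unit (hypothetical stand-in for a
    manifest entry or a Parquet row group)."""
    name: str
    min_value: int
    max_value: int


def may_contain_any(sorted_keys: list[int], stats: ChunkStats) -> bool:
    """Return True if at least one broadcast key lies in [min_value, max_value].

    bisect_left finds the smallest key >= min_value; the chunk can be skipped
    when no such key exists or when that key is already > max_value.
    """
    i = bisect.bisect_left(sorted_keys, stats.min_value)
    return i < len(sorted_keys) and sorted_keys[i] <= stats.max_value


def prune(sorted_keys: list[int], chunks: list[ChunkStats]) -> list[str]:
    """Keep only the chunks whose range could hold a matching join key."""
    return [c.name for c in chunks if may_contain_any(sorted_keys, c)]


# Keys collected from the broadcast (build) side, deduplicated and sorted.
build_side_keys = sorted({42, 7, 1500, 7, 980})
chunks = [
    ChunkStats("rg-0", 0, 99),       # contains 7 and 42
    ChunkStats("rg-1", 100, 899),    # no key in range, skipped
    ChunkStats("rg-2", 900, 999),    # contains 980
    ChunkStats("rg-3", 1000, 1400),  # no key in range, skipped
]
print(prune(build_side_keys, chunks))  # ['rg-0', 'rg-2']
```

> The same check can be applied at every level that keeps statistics: manifest
> entries on the driver, and chunks or row groups on the executors.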
> If the data is returned as ColumnarBatches, individual rows cannot be
> filtered out at the DataSource level, even though the keys are available.
> But at the scan level (ColumnarToRowExec) it is still possible to filter out
> as many rows as possible if the query involves nested joins, thus reducing
> the number of rows to join at the higher join levels.
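> The row-level filtering described above can be sketched as follows, again in
> Python and under simplifying assumptions: a batch is modelled as a dict of
> equal-length column lists, and the broadcast keys as a plain set. The
> function name is hypothetical; in the proposal this check is applied in the
> code generated for ColumnarToRowExec.

```python
def columnar_to_rows_with_filter(
    batch: dict[str, list[int]],
    key_column: str,
    broadcast_keys: set[int],
) -> list[dict[str, int]]:
    """Convert a column-oriented batch into rows, emitting only the rows whose
    join key is present on the broadcast side.

    The data source returned the whole batch unfiltered; the membership test
    happens while rows are materialized, so non-matching rows never reach
    the join.
    """
    columns = list(batch)
    rows = []
    for i, key in enumerate(batch[key_column]):
        if key in broadcast_keys:  # per-row membership test
            rows.append({c: batch[c][i] for c in columns})
    return rows


batch = {"item_id": [1, 2, 3, 4, 5], "qty": [10, 20, 30, 40, 50]}
print(columnar_to_rows_with_filter(batch, "item_id", {2, 5}))
# [{'item_id': 2, 'qty': 20}, {'item_id': 5, 'qty': 50}]
```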
> More details will be added.
> h2. *Q2. What problem is this proposal NOT designed to solve?*
> This can only improve BroadcastHashJoin performance if the join is Inner or
> Left Semi.
> It also will not work if there are nodes such as Expand, Generator, or
> Aggregate (unless the Aggregate groups by the join columns), etc. below the
> targeted BroadcastHashJoin node.
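> A toy check of why only Inner and Left Semi joins qualify: pre-filtering the
> stream side by the broadcast keys removes exactly the rows that could never
> find a match. Inner and Left Semi joins discard such rows anyway, but a Left
> Outer join (with the stream side preserved) must still emit them padded with
> nulls. The sketch uses lists of (key, value) tuples in place of Spark rows;
> all function names are hypothetical.

```python
def _index(build):
    """Group build-side values by key (a tiny stand-in for a hash relation)."""
    index = {}
    for k, v in build:
        index.setdefault(k, []).append(v)
    return index


def inner_join(stream, build):
    index = _index(build)
    return [(k, s, b) for k, s in stream for b in index.get(k, [])]


def left_semi_join(stream, build):
    index = _index(build)
    return [(k, s) for k, s in stream if k in index]


def left_outer_join(stream, build):
    index = _index(build)
    out = []
    for k, s in stream:
        matches = index.get(k)
        if matches:
            out.extend((k, s, b) for b in matches)
        else:
            out.append((k, s, None))  # unmatched stream row kept with a null
    return out


def prefilter(stream, build):
    """Drop stream rows whose key does not appear on the build side."""
    keys = {k for k, _ in build}
    return [(k, s) for k, s in stream if k in keys]


stream = [(1, "a"), (2, "b"), (3, "c")]
build = [(1, "x"), (3, "y")]
pruned = prefilter(stream, build)

print(inner_join(pruned, build) == inner_join(stream, build))            # True
print(left_semi_join(pruned, build) == left_semi_join(stream, build))    # True
print(left_outer_join(pruned, build) == left_outer_join(stream, build))  # False
```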
> h2. *Q3. How is it done today, and what are the limits of current practice?*
> Currently, this kind of pruning at the DataSource level is done by DPP
> (Dynamic Partition Pruning), and only if one of the join key columns is a
> partitioning column (so that the cost of the DPP query is justified and far
> lower than the amount of data it filters out by skipping partitions).
> The limitation is that a DPP-type approach is not implemented (intentionally,
> I believe) when the join column is a non-partition column, because the cost
> of the "DPP-type" query would most likely be much higher than any possible
> pruning gain (especially if the column is not stored in sorted order).
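> The remark about sort order can be illustrated with a small calculation. The
> sketch assumes fixed-size row groups with one min/max pair each (a
> simplification of Parquet's statistics) and counts how many groups a
> three-key filter forces a reader to scan when the column is sorted versus
> when the same values are scattered. The function names are hypothetical.

```python
def row_group_ranges(values, group_size):
    """Split a column into fixed-size row groups and record (min, max) per group."""
    groups = [values[i:i + group_size] for i in range(0, len(values), group_size)]
    return [(min(g), max(g)) for g in groups]


def groups_to_read(ranges, keys):
    """Count row groups whose [min, max] range contains at least one key."""
    return sum(1 for lo, hi in ranges if any(lo <= k <= hi for k in keys))


n, group_size = 1000, 100
sorted_col = list(range(n))
# Deterministic scramble of the same values: 37 is coprime with 1000,
# so (i * 37) % 1000 is a permutation of 0..999.
scrambled_col = [(i * 37) % n for i in range(n)]
keys = [5, 250, 777]

print(groups_to_read(row_group_ranges(sorted_col, group_size), keys))     # 3
print(groups_to_read(row_group_ranges(scrambled_col, group_size), keys))  # 10
```

> With sorted data only 3 of the 10 groups need to be read; with scattered
> data every group's range spans almost the whole domain, so none can be
> skipped.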
> h2. *Q4. What is new in your approach and why do you think it will be successful?*
> 1) It allows pruning for joins on non-partition columns.
> 2) Because it piggybacks on the broadcast keys, there is no extra cost of a
> "DPP-type" query.
> 3) The keys can be used by the DataSource to prune on the driver (possibly)
> and also on the executors (as in the case of Parquet, which has min/max
> statistics at various structural levels).
> 4) The biggest benefit should be seen in multi-level nested join queries. In
> the current code base, if I am correct, only one join's pruning filter gets
> pushed to the scan level. Since that filter is on a partition key, that may
> be sufficient. But in a nested join query, possibly involving different
> stream-side columns for each join, every such pushed filter could prune
> significantly.
> This requires some handling under AQE, since evaluation of the stream-side
> iterator (and hence of the stage) needs to be delayed until all available
> join filters in the nested tree have been pushed to their respective target
> BatchScanExec nodes.
> h4. *Single Row Filtration*
> 5) In the case of nested broadcast joins, if the DataSource is column-vector
> oriented, Spark receives a ColumnarBatch. But because the scans hold filters
> from multiple joins, those filters can be retrieved and applied in the code
> generated at the ColumnarToRowExec level, using a new "containsKey" method on
> HashedRelation. Thus only the rows that satisfy all of the
> BroadcastHashJoins (whose keys have been pushed) are used for join
> evaluation.
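> The multi-join case can be sketched in the same toy Python model.
> ToyHashedRelation is a hypothetical stand-in for HashedRelation that models
> only the proposed "containsKey" lookup (written contains_key here), and each
> stream-side column is mapped to the relation of the join that uses it as a
> key. A row is emitted only if every pushed relation contains its key.

```python
class ToyHashedRelation:
    """Hypothetical stand-in for Spark's HashedRelation, reduced to a key set.

    Only the proposed containsKey idea is modelled; the real class also
    stores the build-side rows for each key.
    """

    def __init__(self, build_keys):
        self._keys = set(build_keys)

    def contains_key(self, key) -> bool:
        return key in self._keys


def filter_batch(batch, pushed):
    """Emit rows of a columnar batch that pass every pushed join filter.

    `pushed` maps a stream-side column name to the relation of the join that
    uses that column as its key; a row survives only if all relations
    contain its key.
    """
    n = len(next(iter(batch.values())))
    out = []
    for i in range(n):
        if all(rel.contains_key(batch[col][i]) for col, rel in pushed.items()):
            out.append({c: vals[i] for c, vals in batch.items()})
    return out


# Stream-side fact table with two foreign keys, joined to two broadcast dims.
sales = {
    "item_id": [1, 2, 3, 4],
    "store_id": [10, 10, 20, 30],
    "amount": [5, 6, 7, 8],
}
pushed = {
    "item_id": ToyHashedRelation([1, 3, 4]),  # lower-level broadcast join
    "store_id": ToyHashedRelation([10, 20]),  # higher-level broadcast join
}
print(filter_batch(sales, pushed))
# [{'item_id': 1, 'store_id': 10, 'amount': 5}, {'item_id': 3, 'store_id': 20, 'amount': 7}]
```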
> The code is already written; the PR on the Spark side is
> [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For a TPCDS
> run on non-partitioned tables, on a laptop at scale factor 4, I am seeing a
> 15% gain.
> For partitioned TPCDS tables, 4 to 5 queries improve by 10% to 37%.
> h2. *Q5. Who cares? If you are successful, what difference will it make?*
> If a use case involves multiple joins, especially on non-partition join
> columns, and performance is a criterion, this PR *might* help.
> h2. *Q6. What are the risks?*
> The changes are extensive, so review will be painful. Although the code is
> being tested continuously and more tests are being added, a change this large
> carries some risk of bugs. As of now, though, I think the code is robust. To
> get the full performance benefit, use of the pushed filters must also be
> implemented on the DataSource side. I have already done this for
> {*}Iceberg{*}. But I believe that, at least for nested BroadcastHashJoins, the
> [#singleRowFilter] approach would still yield a performance benefit, even with
> the default implementation that can be provided in *SupportsRuntimeFiltering*.
> h2. *Q7. How long will it take?*
> The code is already implemented and the PR is already open. Whatever time is
> needed is for review and discussion.
> h2. *Q8. What are the mid-term and final “exams” to check for success?*
> All tests should pass.
> The performance benefit should justify the changes.
> I am attaching an Excel sheet with performance results on TPCDS. However, as
> of now the test has only been run locally on a laptop at scale factor 4; I
> will see if I can get a full-fledged TPCDS run.
> The performance results were obtained for both partitioned and
> non-partitioned tables, using Iceberg as the DataSource V2 implementation.
> I will be opening a separate PR on Iceberg based on the new functions added
> in the SupportsRuntimeV2Filtering interface.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)