ACking-you commented on issue #7955:
URL: https://github.com/apache/arrow-datafusion/issues/7955#issuecomment-1784007434
> ### Is your feature request related to a problem or challenge?
> If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. #7949), we can consider other advanced OLAP techniques to improve joins (especially queries with multiple joins).
>
> ### Describe the solution you'd like
> I would like to propose that we look into pushing "join predicates" into scans (a technique I know of as "sideways information passing").
>
> As an example, consider the joins from TPCH Q17:
>
> ```sql
> select
>   sum(l_extendedprice) / 7.0 as avg_yearly
> from
>   part,
>   lineitem
> where
>   p_partkey = l_partkey
>   and p_brand = 'Brand#23'
>   and p_container = 'MED BOX'
>   and l_quantity < (
>     select 0.2 * avg(l_quantity)
>     from lineitem
>     where l_partkey = p_partkey
>   );
> ```
>
> The first join (should) look like this. The observation is that there are no predicates on the `lineitem` table (the big one), which means all the filtering happens in the join. This is bad because the scan can't do optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used:
>
> ```
>                      │
>                      │
>            2044 Rows │
>                      │
>                      ▼
>              ┌────────────────┐
>              │    HashJoin    │
>              │  p_partkey =   │
>              │  l_partkey     │
>              └──┬─────────┬───┘                 This scan decodes 60M values
>         2M Rows │         │ 60M Rows            of l_quantity and
>           ┌─────┘         └──────┐              l_extendedprice, even though
>           │                      │              all but 2044 are filtered by
>           ▼                      ▼              the join
> ┌──────────────────┐  ┌─────────────────────┐       │
> │Scan: part        │  │Scan: lineitem       │       │
> │projection:       │  │projection:          │       │
> │  p_partkey       │  │  l_quantity,        │       │
> │filters:          │  │  l_extendedprice,   │◀─ ─ ─ ┘
> │  p_brand = ..    │  │  l_partkey          │
> │  p_container = ..│  │filters:             │
> │                  │  │  NONE               │
> └──────────────────┘  └─────────────────────┘
> ```
>
> The idea is to push the join predicate into the scan by creating a filter that acts like `l_partkey IN (...)` and can be applied during the scan:
>
> ```
>                      │   1. The HashJoin completely reads the build
>                      │      side before starting the probe side.
>                      │
>                      │      Thus, all 2M known matching values of
>                      │      l_partkey are in a hash table prior to
>                      │      scanning lineitem
>            2044 Rows │                  │
>                      │
>                      ▼                  │
>              ┌────────────────┐
>              │    HashJoin    │         │
>              │  p_partkey =   │◀─ ─ ─ ─ ┘
>              │  l_partkey     │
>              └──┬─────────┬───┘         The idea is to introduce a filter
>         2M Rows │         │ 60M Rows    that is effectively "l_partkey IN
>           ┌─────┘         └────────┐    (HASH TABLE)" or something similar
>           │                        │    that is applied during the scan
>           ▼                        ▼
> ┌──────────────────┐  ┌────────────────────────┐
> │Scan: part        │  │Scan: lineitem          │┌ ─ ─   If the scan can avoid decoding
> │projection:       │  │projection:             │        the l_quantity and l_extendedprice
> │  p_partkey       │  │  l_quantity,           ││       values that do not match, there
> │filters:          │  │  l_extendedprice,      │        are significant savings
> │  p_brand = ..    │  │  l_partkey             ││
> │  p_container = ..│  │filters:                │
> │                  │  │  l_partkey IN (....) ◀─│┘
> └──────────────────┘  └────────────────────────┘
> ```
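
Here is a minimal Python sketch of the idea in the diagram above. It is a toy model: rows are plain dicts, a "scan" is just a list filter, and every function name is hypothetical rather than a DataFusion API. The build side is hashed completely first. Its key set is then handed sideways to the probe-side scan as an `IN`-style filter, so fewer rows leave the scan while the join result stays the same.

```python
from collections import defaultdict


def build_hash_table(build_rows, key):
    """Build phase: the entire build side is read before the probe side starts."""
    table = defaultdict(list)
    for row in build_rows:
        table[row[key]].append(row)
    return table


def scan(rows, key_col, key_filter=None):
    """Stand-in for a table scan that can apply a pushed-down `key_col IN (...)` filter."""
    if key_filter is None:
        return list(rows)
    return [r for r in rows if r[key_col] in key_filter]


def hash_join(build_rows, build_key, probe_rows, probe_key, push_filter):
    """Inner hash join; returns (joined_rows, rows_emitted_by_probe_scan)."""
    table = build_hash_table(build_rows, build_key)
    # Sideways information passing: the hash table is complete before probing,
    # so its key set can act as `probe_key IN (HASH TABLE)` inside the scan.
    key_filter = table.keys() if push_filter else None
    scanned = scan(probe_rows, probe_key, key_filter)
    joined = [{**b, **p} for p in scanned for b in table.get(p[probe_key], [])]
    return joined, len(scanned)


# Toy data: `part` is assumed to be already filtered by p_brand / p_container.
part = [{"p_partkey": 1}, {"p_partkey": 3}]
lineitem = [
    {"l_partkey": k, "l_quantity": q}
    for k, q in [(1, 10), (2, 20), (3, 30), (4, 40), (1, 50)]
]

plain, scanned_plain = hash_join(part, "p_partkey", lineitem, "l_partkey", push_filter=False)
sip, scanned_sip = hash_join(part, "p_partkey", lineitem, "l_partkey", push_filter=True)
print(scanned_plain, scanned_sip)  # 5 3
```

The filter here is exact (it reuses the hash table's keys), so the join's own probe still performs the same lookups. The work saved is whatever the scan would otherwise have produced for the rows it now drops.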
>
> In a query with a single selective join (one that filters out many values), the savings are likely minimal, as they depend on how much work can be saved in materialization (decoding). At the time of writing, the only scan in DataFusion that does late materialization is `ParquetExec`.
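
To make "late materialization" concrete, here is a minimal sketch. It uses hypothetical names and a toy model in which "decoding" a value just means counting it; it is not how `ParquetExec` is implemented. The eager scan decodes every projected column for every row and filters afterwards. The late scan decodes only the filter column first, builds a selection vector, and then decodes the remaining columns only at the selected positions.

```python
def decode(column, positions, stats):
    """Toy 'decoder': materializes only the requested positions and counts the work."""
    stats["decoded"] += len(positions)
    return [column[i] for i in positions]


def eager_scan(columns, filter_col, predicate, projection):
    """Decode every needed column for every row, then apply the filter."""
    stats = {"decoded": 0}
    all_rows = range(len(columns[filter_col]))
    needed = list(dict.fromkeys([filter_col, *projection]))  # dedupe, keep order
    decoded = {c: decode(columns[c], all_rows, stats) for c in needed}
    keep = [i for i, v in enumerate(decoded[filter_col]) if predicate(v)]
    return {c: [decoded[c][i] for i in keep] for c in projection}, stats["decoded"]


def late_scan(columns, filter_col, predicate, projection):
    """Decode the filter column first; decode other columns only where it matched."""
    stats = {"decoded": 0}
    keys = decode(columns[filter_col], range(len(columns[filter_col])), stats)
    selection = [i for i, v in enumerate(keys) if predicate(v)]  # selection vector
    out = {}
    for c in projection:
        if c == filter_col:
            out[c] = [keys[i] for i in selection]  # already decoded above
        else:
            out[c] = decode(columns[c], selection, stats)
    return out, stats["decoded"]


lineitem = {
    "l_partkey": [1, 2, 3, 4, 5, 1],
    "l_quantity": [10, 20, 30, 40, 50, 60],
    "l_extendedprice": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
}
wanted = {1, 3}  # e.g. the keys passed sideways from the join's hash table
projection = ["l_partkey", "l_quantity", "l_extendedprice"]

eager, eager_cost = eager_scan(lineitem, "l_partkey", lambda k: k in wanted, projection)
late, late_cost = late_scan(lineitem, "l_partkey", lambda k: k in wanted, projection)
print(eager_cost, late_cost)  # 18 12
```

Both scans return the same columns. The late scan still decodes the key column in full, so its advantage grows with the number and width of the other projected columns and with how few rows survive the filter.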
>
> However, in a query with multiple selective joins, the savings become much more pronounced, because we can avoid creating intermediate join outputs that are filtered out by joins later in the plan.
>
> For example:
>
> ```
> Pass down in multiple joins
>
> While this doesn't happen in TPCH
> Q17 (the subquery has no predicates)
> the SIPS approach can be even more
> effective with multiple selective
> joins                │
>                      │            Filters on both join keys can be
>                      │            applied at this level, which can
>                      │            be even more effective as it
>                      ▼            avoids the work to create the
>              ┌────────────────┐   intermediate output of HashJoin (2) ─ ┐
>              │  HashJoin (1)  │   which is then filtered by             │
>              │    d1.key =    │   HashJoin (1)
>              │    f.d1_key    │                                         │
>              └──┬─────────┬───┘
>                 │         │                                             │
>           ┌─────┘         └──────┐
>           │                      │                                      │
>           ▼                      ▼
> ┌──────────────────┐     ┌────────────────┐                             │
> │Scan: D1          │     │  HashJoin (2)  │
> │filters:          │     │    d2.key =    │                             │
> │  ...             │     │    f.d2_key    │
> └──────────────────┘     └───┬────────┬───┘                             │
>                              │        │
>                       ┌──────┘        └──────┐                          │
>                       │                      │
>                       ▼                      ▼                          │
>               ┌────────────────┐  ┌─────────────────────┐
>               │Scan: D2        │  │Scan: F              │               │
>               │filters:        │  │filters:             │
>               │  ...           │  │  f.d1_key IN (...)  │◀ ─ ─ ─ ─ ─ ─ ─┘
>               │                │  │  f.d2_key IN (...)  │
>               └────────────────┘  └─────────────────────┘
> ```
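
A toy model of the plan above, with hypothetical names, makes the avoided work visible by counting the rows HashJoin (2) emits. It rests on two assumptions. First, the dimension keys are unique, so each join behaves like a filter on `F`. Second, both hash tables are built before `F` is scanned, following the build-before-probe order described for HashJoin. Pushing both key filters into the scan of `F` shrinks the intermediate output without changing the final result.

```python
def star_join(d1_keys, d2_keys, fact_rows, push_filters):
    """Plan: HashJoin(1)[build=D1, probe=HashJoin(2)[build=D2, probe=F]].

    Returns (final_rows, rows_emitted_by_hash_join_2).
    """
    d1 = set(d1_keys)  # hash table of HashJoin (1), built from filtered D1
    d2 = set(d2_keys)  # hash table of HashJoin (2), built from filtered D2
    if push_filters:
        # Both hash tables exist before F is read, so the scan of F can apply
        # `f.d1_key IN (...) AND f.d2_key IN (...)`.
        scanned = [f for f in fact_rows if f["d1_key"] in d1 and f["d2_key"] in d2]
    else:
        scanned = list(fact_rows)
    intermediate = [f for f in scanned if f["d2_key"] in d2]  # HashJoin (2) output
    final = [f for f in intermediate if f["d1_key"] in d1]    # HashJoin (1) output
    return final, len(intermediate)


fact = [{"d1_key": a, "d2_key": b} for a in range(4) for b in range(4)]
plain, inter_plain = star_join([0], [0, 1], fact, push_filters=False)
pushed, inter_pushed = star_join([0], [0, 1], fact, push_filters=True)
print(inter_plain, inter_pushed)  # 8 2
```

Without pushdown, HashJoin (2) emits every fact row that matches D2, and HashJoin (1) then discards most of them. With both filters in the scan, only rows that survive both joins are ever produced.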
>
> ### Describe alternatives you've considered
> Some version of this technique is described as "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268
>
> Building a separate Bloom filter has the nice property that it can be distributed across a networked cluster; however, the overhead of creating the Bloom filter would likely be non-trivial.
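
For comparison, a Bloom filter can replace the exact `IN (HASH TABLE)` set with a compact bit array. The sketch below is a generic textbook Bloom filter; SHA-256 with double hashing is an arbitrary choice here and is not taken from SPARK-32268. It may pass some non-matching keys (false positives) but never drops a matching one, so the join's exact probe still produces the correct result.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: `num_bits` bits, `num_hashes` probe positions per key."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Double hashing: derive all positions from two 64-bit halves of one digest.
        digest = hashlib.sha256(repr(key).encode()).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


build_keys = set(range(0, 1000, 7))  # join keys from the (filtered) build side
bloom = BloomFilter(num_bits=8192, num_hashes=4)
for k in build_keys:
    bloom.add(k)

probe_keys = range(1000)
passed = [k for k in probe_keys if bloom.might_contain(k)]  # applied in the scan
joined = [k for k in passed if k in build_keys]             # exact check in the join
print(len(build_keys), len(passed) >= len(build_keys), len(joined))  # 143 True 143
```

Only the fixed-size bit array needs to be shipped to the scan, which is what makes this attractive in a cluster. The trade-offs are the extra hashing to build and probe it, plus the false positives that the join must still discard.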
>
> ### Additional context
> See a description of how DataFusion HashJoins work here: #7953
>
> Here is an industrial paper that describes experience using SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf

I'm very curious about how this kind of graph is drawn 😯
--
This is an automated message from the Apache Git Service.
