I suspect the motivation might be something related to the order in which
the join builds execute. I think the mental model is that they happen "top
down", but in most cases in Impala they actually happen concurrently
because either they're in different fragments or we start up an async build
thread. The exception is if there are multiple builds in the same fragment
and mt_dop > 0, in which case they execute one-by-one top down.

But anyway, the common case is that the builds happen concurrently, so it's
racy whether a filter arrives before the scan feeding into a join build
starts. The filter wait time could be thought of as helping the producer
of the filter win the race.
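
To make the race concrete, here is a minimal sketch (not Impala code) of a
scan that waits a bounded time for a filter before starting. The class name,
method name and the use of a CompletableFuture to stand in for filter
delivery are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Impala.
public class FilterWaitSketch {
  /**
   * Blocks for at most waitMs milliseconds waiting for the filter.
   * Returns true if the filter arrived before the scan had to start.
   */
  public static boolean waitForFilter(CompletableFuture<String> filter, long waitMs) {
    try {
      filter.get(waitMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;  // The scan lost patience and starts without the filter.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      return false;  // The producer failed, so no filter will arrive.
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> filter = new CompletableFuture<>();
    // Stand-in for a join build that publishes its filter after about 50 ms.
    CompletableFuture.runAsync(() -> {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      filter.complete("RF002");
    });
    // With no wait the scan will usually start first; with a generous wait
    // the producer wins. The exact outcome depends on thread timing.
    System.out.println("no wait:   " + waitForFilter(filter, 0));
    System.out.println("with wait: " + waitForFilter(filter, 5000));
  }
}
```

The wait does not remove the race, it only biases it towards the producer.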

I can see two problems here:

   - Cycles in the filter graph could get weird. With the current wait
   mechanism both scans would just wait, then start up at the same time and
   race to send filters to each other. So we'd probably need to make sure that
   the filter graph doesn't have those cycles.
   - The logic around when join builds happen might need reworking so that
   builds that send filters "up" the tree always start before or concurrently
   with the join build whose subtree contains the destination scan node.
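
As a rough sketch of the cycle check from the first point (not planner code,
all names hypothetical): model "scan X waits on a filter whose build is fed
by scan Y" as an edge X -> Y and look for a cycle with a depth-first search.
The edges below are read off the quoted plan. The extra edge is the proposed
assignment of RF002 to scan 02.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; this is not how the Impala planner models filters.
public class FilterGraphSketch {
  /**
   * waitsOn maps a scan node id to the ids of the scans feeding the join
   * builds whose filters it waits for. Returns true if some scan
   * transitively waits on itself.
   */
  public static boolean hasCycle(Map<Integer, List<Integer>> waitsOn) {
    Set<Integer> done = new HashSet<>();
    Set<Integer> onPath = new HashSet<>();
    for (Integer scan : waitsOn.keySet()) {
      if (visit(scan, waitsOn, done, onPath)) return true;
    }
    return false;
  }

  private static boolean visit(Integer scan, Map<Integer, List<Integer>> waitsOn,
      Set<Integer> done, Set<Integer> onPath) {
    if (onPath.contains(scan)) return true;   // Back edge: found a cycle.
    if (done.contains(scan)) return false;    // Already fully explored.
    onPath.add(scan);
    for (Integer dep : waitsOn.getOrDefault(scan, Collections.<Integer>emptyList())) {
      if (visit(dep, waitsOn, done, onPath)) return true;
    }
    onPath.remove(scan);
    done.add(scan);
    return false;
  }

  public static void main(String[] args) {
    // Current plan: RF000 (build fed by scan 02) goes to scans 00 and 01,
    // RF002 (build fed by scan 01) goes to scan 00.
    Map<Integer, List<Integer>> current = new HashMap<>();
    current.put(0, Arrays.asList(1, 2));
    current.put(1, Arrays.asList(2));
    System.out.println(hasCycle(current));   // false

    // Proposed: RF002 is also applied at scan 02, so 02 waits on 01.
    Map<Integer, List<Integer>> proposed = new HashMap<>(current);
    proposed.put(2, Arrays.asList(1));
    System.out.println(hasCycle(proposed));  // true (01 -> 02 -> 01)
  }
}
```

So in the example query, sending RF002 to scan 02 while RF000 still goes to
scan 01 is exactly the kind of cycle I mean.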


On Thu, Jan 31, 2019 at 7:24 AM Philip Zeyliger <[email protected]> wrote:

> (If folks are interested in a refresher,
>
> https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html
> is useful.)
>
> I stared at that code for a bit, and I agree with you that it's plausible.
> I'm also confused by the "bottom-up" comment of generateFilters(): it seems
> like we walk the plan depth first, and the assignment happens in that
> depth-first walk, before all the runtime filters are generated.
>
> A concern is making sure that there are no outer joins impacting
> correctness.
>
> -- Philip
>
> On Wed, Jan 30, 2019 at 9:36 PM Todd Lipcon <[email protected]> wrote:
>
> > Hey folks,
> >
> > I've been digging into a couple of TPCDS queries that are unexpectedly slow
> > and uncovered what seems to be some surprising behavior in the planner
> > concerning runtime filter assignment. Consider the following query in the
> > TPCDS schema:
> >
> > select straight_join *
> >   from promotion a
> >   join item b on a.p_item_sk = b.i_item_sk
> >   join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
> >   where b.i_color = 'xxx'
> >
> > This generates a plan that looks like this:
> > http://people.apache.org/~todd/plan.png
> >
> > Or in text form:
> >
> >
> > +-----------------------------------------------------------------------------------------------------+
> > | Explain String
> > +-----------------------------------------------------------------------------------------------------+
> > | Max Per-Host Resource Reservation: Memory=85.88MB
> > | Per-Host Resource Estimates: Memory=298.14GB
> > |
> > | F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
> > | |  Per-Host Resources: mem-estimate=0B mem-reservation=0B
> > | PLAN-ROOT SINK
> > | |  mem-estimate=0B mem-reservation=0B
> > | |
> > | 08:EXCHANGE [UNPARTITIONED]
> > | |  mem-estimate=0B mem-reservation=0B
> > | |  tuple-ids=0,1,2 row-size=911B cardinality=14201171
> > | |
> > | F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1
> > | Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB
> > | 04:HASH JOIN [INNER JOIN, PARTITIONED]
> > | |  hash predicates: b.i_item_sk = c.ss_item_sk
> > | |  fk/pk conjuncts: none
> > | |  runtime filters: RF000[bloom] <- c.ss_item_sk
> > | |  mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB
> > | |  tuple-ids=0,1,2 row-size=911B cardinality=14201171
> > | |
> > | |--07:EXCHANGE [HASH(c.ss_item_sk)]
> > | |  |  mem-estimate=0B mem-reservation=0B
> > | |  |  tuple-ids=2 row-size=100B cardinality=2879987999
> > | |  |
> > | |  F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9
> > | |  Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B
> > | |  02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
> > | |     partitions=1824/1824 files=1824 size=189.24GB
> > | |     stored statistics:
> > | |       table: rows=2879987999 size=unavailable
> > | |       partitions: 1824/1824 rows=2879987999
> > | |       columns: all
> > | |     extrapolated-rows=disabled
> > | |     mem-estimate=1.89GB mem-reservation=0B
> > | |     tuple-ids=2 row-size=100B cardinality=2879987999
> > | |
> > | 06:EXCHANGE [HASH(b.i_item_sk)]
> > | |  mem-estimate=0B mem-reservation=0B
> > | |  tuple-ids=0,1 row-size=811B cardinality=1500
> > | |
> > | F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> > | Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB
> > | 03:HASH JOIN [INNER JOIN, BROADCAST]
> > | |  hash predicates: a.p_item_sk = b.i_item_sk
> > | |  fk/pk conjuncts: a.p_item_sk = b.i_item_sk
> > | |  runtime filters: RF002[bloom] <- b.i_item_sk
> > | |  mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB
> > | |  tuple-ids=0,1 row-size=811B cardinality=1500
> > | |
> > | |--05:EXCHANGE [BROADCAST]
> > | |  |  mem-estimate=0B mem-reservation=0B
> > | |  |  tuple-ids=1 row-size=496B cardinality=3226
> > | |  |
> > | |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
> > | |  Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB
> > | |  01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM]
> > | |     partitions=1/1 files=1 size=28.28MB
> > | |     predicates: b.i_color = 'xxx'
> > | |     runtime filters: RF000[bloom] -> b.i_item_sk
> > | |     stored statistics:
> > | |       table: rows=300000 size=28.28MB
> > | |       columns: all
> > | |     extrapolated-rows=disabled
> > | |     parquet statistics predicates: b.i_color = 'xxx'
> > | |     parquet dictionary predicates: b.i_color = 'xxx'
> > | |     mem-estimate=880.00MB mem-reservation=0B
> > | |     tuple-ids=1 row-size=496B cardinality=3226
> > | |
> > | 00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM]
> > |    partitions=1/1 files=1 size=85.50KB
> > |    runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk
> > |    stored statistics:
> > |      table: rows=1500 size=85.50KB
> > |      columns: all
> > |    extrapolated-rows=disabled
> > |    mem-estimate=304.00MB mem-reservation=0B
> > |    tuple-ids=0 row-size=315B cardinality=1500
> > +-----------------------------------------------------------------------------------------------------+
> >
> >
> > Here because of the equi-joins, we have a slot equivalence for all of the
> > 'item_sk' columns. So, it seems it would be valid to take the runtime
> > filter RF002 (generated from b.i_item_sk at HASH JOIN node 03) and apply
> > it at the scan node 02 for store_sales. Notably, doing so is extremely
> > beneficial in this manufactured query, since the scan of 'item' takes only
> > a second or so to determine that in fact there are no items with color
> > 'xxx'. This could completely short circuit the scan 02 of the very large
> > store_sales table.
> >
> > Obviously this query is canned, but I'm seeing some cases in TPCDS (e.g. Q64)
> > where a much more complex variant of this plan ends up being generated and
> > failing to utilize an available runtime filter.
> >
> > I spent some time digging in the code, and I think the faulty logic might
> > be here in RuntimeFilterGenerator.java:
> >
> >       generateFilters(ctx, root.getChild(0));
> >       // Finalize every runtime filter of that join. This is to ensure that we don't
> >       // assign a filter to a scan node from the right subtree of joinNode or ancestor
> >       // join nodes in case we don't find a destination node in the left subtree.
> >       for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
> >
> > I'm not quite sure how to understand the comment, but it seems this is
> > preventing the runtime filter at the hash join from being sent to any node
> > which is either in its right subtree or higher up the tree. In other
> > words, it's limiting the runtime filter to _only_ be applied at nodes in
> > its left subtree.
> >
> > Per my above example query, I think this is unnecessarily restrictive.
> > Based on my understanding of Impala scheduling/parallelism, all of the
> > build sides of joins start running in parallel from bottom up, and it's
> > always possible for a build to complete somewhere deep in the tree before a
> > build completes higher up the tree, in which case sending the RF to the
> > hash join's "cousin" is beneficial. I think the only necessary restriction
> > is that an RF should not be sent from a hash join node to any descendant of
> > its right child.
> >
> > Keep in mind I'm very new to the Impala planner code and particularly to
> > the runtime filter portion thereof, so I may have missed something. But
> > does the above sound like a plausible bug/missed optimization?
> >
> > -Todd
> > --
> > Todd Lipcon
> > Software Engineer, Cloudera
> >
>
