On Thu, Jan 31, 2019 at 8:16 AM Todd Lipcon <[email protected]> wrote:
> On Wed, Jan 30, 2019 at 10:36 PM Tim Armstrong <[email protected]> wrote:
>
> > I suspect the motivation might be something related to the order in which the join builds execute. I think the mental model is that they happen "top down", but in most cases in Impala they actually happen concurrently, because either they're in different fragments or we start up an async build thread. The exception is if there are multiple builds in the same fragment and mt_dop > 0, in which case they execute one-by-one top down.
>
> Is there a strong reason to go top-down instead of "likely to run fastest first"?

Not that I know of - you need to set up all of the join builds to run the pipeline anyway (assuming no spilling), so I think it's mainly a matter of what the most efficient way to set up the builds is. Bottom-up is actually (I think) better for memory - if a join is going to spill, you'd prefer it to be one of the upper ones so that the input row count to the spilling join is minimised. Maybe there's some insight I'm missing here though.

> > But anyway, the common case is that the builds happen concurrently, so it's racy whether a filter arrives before the scan feeding into a join build starts. The filter wait time maybe could be thought of as helping the producer of the filter win the race.
> >
> > I can see two problems here:
> >
> > - Cycles in the filter graph could get weird - with the current wait mechanism both scans would just wait, then start up at the same time and race to send filters to each other. So we'd probably need to make sure that the filter graph didn't have those cycles.
>
> Ah, that makes sense about the waiting on each other unnecessarily. However, it isn't unreasonable for the filter graph to have a cycle if we think it's plausible that one side will execute much faster?

Ah yeah, I guess that makes sense - if the filter arrives after the large scan has started, it will still be able to filter out a lot of rows. That is going to add some unpredictability, though, since the exact timing would affect the number of rows returned from the scan. Maybe that unpredictability doesn't matter too much given the gains are so big, but it seems like maybe it would be best in some of these cases if the planner picked a winner rather than letting them race.
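To make the cycle concern concrete, here is a minimal sketch (plain Java, not actual Impala planner code; the class and method names are invented for illustration) of how a plan-time check could model "scan X waits on a runtime filter whose producing join builds from scan Y" as a directed edge and flag wait graphs that contain a cycle:

  import java.util.*;

  class FilterWaitGraph {
    // Edge a -> b means: scan a waits on a runtime filter whose producing
    // join has scan b on its build side.
    private final Map<Integer, List<Integer>> edges = new HashMap<>();

    void addWaitEdge(int waitingScan, int buildSideScan) {
      edges.computeIfAbsent(waitingScan, k -> new ArrayList<>()).add(buildSideScan);
    }

    // Standard depth-first search cycle detection over the wait edges.
    boolean hasCycle() {
      Set<Integer> finished = new HashSet<>();
      Set<Integer> onStack = new HashSet<>();
      for (Integer node : new ArrayList<>(edges.keySet())) {
        if (visit(node, finished, onStack)) return true;
      }
      return false;
    }

    private boolean visit(Integer node, Set<Integer> finished, Set<Integer> onStack) {
      if (onStack.contains(node)) return true;  // back edge => cycle
      if (finished.contains(node)) return false;
      onStack.add(node);
      for (Integer next : edges.getOrDefault(node, Collections.emptyList())) {
        if (visit(next, finished, onStack)) return true;
      }
      onStack.remove(node);
      finished.add(node);
      return false;
    }

    public static void main(String[] args) {
      FilterWaitGraph g = new FilterWaitGraph();
      // Hypothetical pair of scans whose filters are built from each other.
      g.addWaitEdge(1, 2);
      g.addWaitEdge(2, 1);
      System.out.println(g.hasCycle());  // prints true
    }
  }

In the toy plan below, scan 01 waits on RF000 (whose producing join builds from scan 02) and scan 02 waits on RF002 (whose producing join builds from scan 01), so a check like this would report a cycle.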
> For example, in my toy query, after fixing the behavior to what I think makes more sense, I get this plan:
>
> +----------------------------------------------------------------+
> | Explain String |
> +----------------------------------------------------------------+
> | Max Per-Host Resource Reservation: Memory=86.88MB |
> | Per-Host Resource Estimates: Memory=298.14GB |
> | |
> | PLAN-ROOT SINK |
> | | |
> | 08:EXCHANGE [UNPARTITIONED] |
> | | |
> | 04:HASH JOIN [INNER JOIN, PARTITIONED] |
> | | hash predicates: b.i_item_sk = c.ss_item_sk |
> | | runtime filters: RF000 <- c.ss_item_sk |
> | | |
> | |--07:EXCHANGE [HASH(c.ss_item_sk)] |
> | | | |
> | | 02:SCAN HDFS [tpcds_1000_parquet.store_sales c] |
> | | partitions=1824/1824 files=1824 size=189.24GB |
> | | runtime filters: RF002 -> c.ss_item_sk |
> | | |
> | 06:EXCHANGE [HASH(b.i_item_sk)] |
> | | |
> | 03:HASH JOIN [INNER JOIN, BROADCAST] |
> | | hash predicates: a.p_item_sk = b.i_item_sk |
> | | runtime filters: RF002 <- b.i_item_sk |
> | | |
> | |--05:EXCHANGE [BROADCAST] |
> | | | |
> | | 01:SCAN HDFS [tpcds_1000_parquet.item b] |
> | | partitions=1/1 files=1 size=28.28MB |
> | | predicates: b.i_color = 'xxx' |
> | | runtime filters: RF000 -> b.i_item_sk |
> | | |
> | 00:SCAN HDFS [tpcds_1000_parquet.promotion a] |
> | partitions=1/1 files=1 size=85.50KB |
> | runtime filters: RF000 -> a.p_item_sk, RF002 -> a.p_item_sk |
> +----------------------------------------------------------------+
>
> In this plan, I think we have a sort of cycle:
>
> 00 SCAN HDFS: waits on RF000, RF002
> 01 SCAN HDFS: waits on RF000
> 03 HASH JOIN: produces RF002, blocked on build side 01 SCAN
> 02 SCAN HDFS: waits on RF002
> 04 HASH JOIN: produces RF000, blocked on build side 02 SCAN
>
> So, JOIN 04 blocks on SCAN 02, which waits on RF002, produced by JOIN 03, blocked on SCAN 01, which waits on RF000, produced by JOIN 04.
>
> Sure enough, if I execute this query with RUNTIME_FILTER_WAIT_TIME_MS=30000 it takes 30sec, whereas if I execute it with no wait time, it completes in <1sec.
>
> That said, rather than breaking cycles (which would force us to know which side is likely to win the race), maybe we should just disable waiting and break the time-wise dependency?

I think the challenge with disabling waiting is that it depends on where the scan sits in the plan. It seems ok to me to skip waiting for filters in the leftmost scan, since the unfiltered rows are just going to be filtered out further up in the pipeline. It's problematic to skip waiting if the scan feeds into a join build, because the rows that weren't filtered end up bloating the join's hash tables and polluting any runtime filters sent from that join node. That's the case where you'd really benefit from forcing the "right" behaviour rather than letting everything race.

Just brainstorming, but one way to solve this at runtime: if a scan feeding into a join build started before the runtime filter arrived, then when the filter does arrive it could be forwarded to the join build, which could apply it to its hash tables to prune out any unnecessary rows (I guess this assumes you don't project out the columns the filter applies to). Then, if rows were pruned out, that join could rebuild and resend its own filters to their destinations. It gets interesting with cascading filters, since you could potentially have multiple rounds of filters being sent.
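As a minimal sketch of that late-filter idea (illustrative Java only, not Impala code; the row and key representations are invented for the example), the two steps are: prune the buffered build rows with the late-arriving filter, and, if anything was removed, recompute the keys this join would publish in its own filter so it can be re-sent to its destinations:

  import java.util.*;
  import java.util.function.Function;
  import java.util.function.Predicate;

  class LateFilterSketch {
    // Drop build-side rows that the late-arriving filter would have excluded
    // at the scan. Assumes the filtered column hasn't been projected away.
    static <Row> List<Row> pruneBuildRows(List<Row> buildRows, Predicate<Row> lateFilter) {
      List<Row> survivors = new ArrayList<>();
      for (Row r : buildRows) {
        if (lateFilter.test(r)) survivors.add(r);
      }
      return survivors;
    }

    // If pruning removed rows, the filter this join publishes over its join
    // keys shrinks too, so it could be rebuilt and re-sent to its targets.
    static <Row, K> Set<K> rebuildFilterKeys(List<Row> survivors, Function<Row, K> joinKey) {
      Set<K> keys = new HashSet<>();
      for (Row r : survivors) keys.add(joinKey.apply(r));
      return keys;
    }

    public static void main(String[] args) {
      // Rows are (join key, payload) pairs; the late filter keeps keys 1 and 3.
      List<long[]> buildRows = Arrays.asList(new long[]{1, 10}, new long[]{2, 20}, new long[]{3, 30});
      Set<Long> lateFilterKeys = new HashSet<>(Arrays.asList(1L, 3L));
      List<long[]> survivors = pruneBuildRows(buildRows, r -> lateFilterKeys.contains(r[0]));
      System.out.println(rebuildFilterKeys(survivors, r -> r[0]));  // the surviving keys, 1 and 3
    }
  }

The cascading rounds mentioned above fall out of the same loop: each re-sent, tighter filter can trigger the same pruning and re-send at whichever joins consume it.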
> > - The logic around when join build happens might need reworking so that builds that send filters "up" the tree always start before or concurrently with the join build for the subtree containing the destination scan node.
>
> Yeah, definitely merits some further thought so we don't end up with such a wait-cycle, either due to explicit data dependencies or due to our scheduling policies.
>
> BTW, on the plus side, my example query went from taking 387sec without my optimization to <1sec with the optimization. TPCDS Q64 SF1000 on a 9-node cluster goes from 44sec to 8sec. A good number of the other plans seem to have gotten new RFs as well, so I'm re-running them to see if there are any other big speedups:
>
> Files query11.sql.explain-old and query11.sql.explain-new differ
> Files query17.sql.explain-old and query17.sql.explain-new differ
> Files query1.sql.explain-old and query1.sql.explain-new differ
> Files query25.sql.explain-old and query25.sql.explain-new differ
> Files query29.sql.explain-old and query29.sql.explain-new differ
> Files query32.sql.explain-old and query32.sql.explain-new differ
> Files query39.sql.explain-old and query39.sql.explain-new differ
> Files query58.sql.explain-old and query58.sql.explain-new differ
> Files query64.sql.explain-old and query64.sql.explain-new differ
> Files query65.sql.explain-old and query65.sql.explain-new differ
> Files query72.sql.explain-old and query72.sql.explain-new differ
> Files query74.sql.explain-old and query74.sql.explain-new differ
> Files query83.sql.explain-old and query83.sql.explain-new differ
> Files query84.sql.explain-old and query84.sql.explain-new differ
> Files query92.sql.explain-old and query92.sql.explain-new differ
> Files query95.sql.explain-old and query95.sql.explain-new differ
>
> -Todd
>
> > On Thu, Jan 31, 2019 at 7:24 AM Philip Zeyliger <[email protected]> wrote:
> > >
> > > (If folks are interested in a refresher, https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html is useful.)
> > >
> > > I stared at that code for a bit, and I agree with you that it's plausible. I'm also confused by the "bottom-up" comment of generateFilters(): it seems like we walk the plan depth-first, and the assignment happens in that depth-first walk, before all the runtime filters are generated.
> > >
> > > A concern is making sure that there are no outer joins impacting correctness.
> > >
> > > -- Philip
> > >
> > > On Wed, Jan 30, 2019 at 9:36 PM Todd Lipcon <[email protected]> wrote:
> > > >
> > > > Hey folks,
> > > >
> > > > I've been digging into a couple of TPCDS queries that are unexpectedly slow and uncovered what seems to be some surprising behavior in the planner concerning runtime filter assignment.
> > > > Consider the following query in the TPCDS schema:
> > > >
> > > > select straight_join *
> > > > from promotion a
> > > > join item b on a.p_item_sk = b.i_item_sk
> > > > join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
> > > > where b.i_color = 'xxx'
> > > >
> > > > This generates a plan that looks like this:
> > > > http://people.apache.org/~todd/plan.png
> > > >
> > > > Or in text form:
> > > >
> > > > +-----------------------------------------------------------------------------------------------------+
> > > > | Explain String |
> > > > +-----------------------------------------------------------------------------------------------------+
> > > > | Max Per-Host Resource Reservation: Memory=85.88MB |
> > > > | Per-Host Resource Estimates: Memory=298.14GB |
> > > > | |
> > > > | F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 |
> > > > | | Per-Host Resources: mem-estimate=0B mem-reservation=0B |
> > > > | PLAN-ROOT SINK |
> > > > | | mem-estimate=0B mem-reservation=0B |
> > > > | | |
> > > > | 08:EXCHANGE [UNPARTITIONED] |
> > > > | | mem-estimate=0B mem-reservation=0B |
> > > > | | tuple-ids=0,1,2 row-size=911B cardinality=14201171 |
> > > > | | |
> > > > | F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1 |
> > > > | Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB |
> > > > | 04:HASH JOIN [INNER JOIN, PARTITIONED] |
> > > > | | hash predicates: b.i_item_sk = c.ss_item_sk |
> > > > | | fk/pk conjuncts: none |
> > > > | | runtime filters: RF000[bloom] <- c.ss_item_sk |
> > > > | | mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB |
> > > > | | tuple-ids=0,1,2 row-size=911B cardinality=14201171 |
> > > > | | |
> > > > | |--07:EXCHANGE [HASH(c.ss_item_sk)] |
> > > > | | | mem-estimate=0B mem-reservation=0B |
> > > > | | | tuple-ids=2 row-size=100B cardinality=2879987999 |
> > > > | | | |
> > > > | | F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9 |
> > > > | | Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B |
> > > > | | 02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM] |
> > > > | | partitions=1824/1824 files=1824 size=189.24GB |
> > > > | | stored statistics: |
> > > > | | table: rows=2879987999 size=unavailable |
> > > > | | partitions: 1824/1824 rows=2879987999 |
> > > > | | columns: all |
> > > > | | extrapolated-rows=disabled |
> > > > | | mem-estimate=1.89GB mem-reservation=0B |
> > > > | | tuple-ids=2 row-size=100B cardinality=2879987999 |
> > > > | | |
> > > > | 06:EXCHANGE [HASH(b.i_item_sk)] |
> > > > | | mem-estimate=0B mem-reservation=0B |
> > > > | | tuple-ids=0,1 row-size=811B cardinality=1500 |
> > > > | | |
> > > > | F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 |
> > > > | Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB |
> > > > | 03:HASH JOIN [INNER JOIN, BROADCAST] |
> > > > | | hash predicates: a.p_item_sk = b.i_item_sk |
> > > > | | fk/pk conjuncts: a.p_item_sk = b.i_item_sk |
> > > > | | runtime filters: RF002[bloom] <- b.i_item_sk |
> > > > | | mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB |
> > > > | | tuple-ids=0,1 row-size=811B cardinality=1500 |
> > > > | | |
> > > > | |--05:EXCHANGE [BROADCAST] |
> > > > | | | mem-estimate=0B mem-reservation=0B |
> > > > | | | tuple-ids=1 row-size=496B cardinality=3226 |
> > > > | | | |
> > > > | | F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 |
> > > > | | Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB |
> > > > | | 01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM] |
> > > > | | partitions=1/1 files=1 size=28.28MB |
> > > > | | predicates: b.i_color = 'xxx' |
> > > > | | runtime filters: RF000[bloom] -> b.i_item_sk |
> > > > | | stored statistics: |
> > > > | | table: rows=300000 size=28.28MB |
> > > > | | columns: all |
> > > > | | extrapolated-rows=disabled |
> > > > | | parquet statistics predicates: b.i_color = 'xxx' |
> > > > | | parquet dictionary predicates: b.i_color = 'xxx' |
> > > > | | mem-estimate=880.00MB mem-reservation=0B |
> > > > | | tuple-ids=1 row-size=496B cardinality=3226 |
> > > > | | |
> > > > | 00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM] |
> > > > | partitions=1/1 files=1 size=85.50KB |
> > > > | runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk |
> > > > | stored statistics: |
> > > > | table: rows=1500 size=85.50KB |
> > > > | columns: all |
> > > > | extrapolated-rows=disabled |
> > > > | mem-estimate=304.00MB mem-reservation=0B |
> > > > | tuple-ids=0 row-size=315B cardinality=1500 |
> > > > +-----------------------------------------------------------------------------------------------------+
> > > >
> > > > Here, because of the equi-joins, we have a slot equivalence for all of the 'item_sk' columns. So it seems it would be valid to take the runtime filter RF002 (generated from b.i_item_sk at HASH JOIN node 03) and apply it at scan node 02 for store_sales. Notably, doing so is extremely beneficial in this manufactured query, since the scan of 'item' takes only a second or so to determine that in fact there are no items with color 'xxx'. This could completely short-circuit scan 02 of the very large store_sales table.
> > > >
> > > > Obviously this query is canned, but I'm seeing some cases in TPCDS (e.g. Q64) where a much more complex variant of this plan ends up being generated and fails to utilize an available runtime filter.
> > > >
> > > > I spent some time digging in the code, and I think the faulty logic might be here in RuntimeFilterGenerator.java:
> > > >
> > > >       generateFilters(ctx, root.getChild(0));
> > > >       // Finalize every runtime filter of that join. This is to ensure that we don't
> > > >       // assign a filter to a scan node from the right subtree of joinNode or ancestor
> > > >       // join nodes in case we don't find a destination node in the left subtree.
> > > >       for (RuntimeFilter runtimeFilter: filters)
> > > >         finalizeRuntimeFilter(runtimeFilter);
> > > >
> > > > I'm not quite sure how to understand the comment, but it seems this is preventing the runtime filter at the hash join from being sent to any node that is either in its right subtree or higher up the tree. In other words, it's limiting the runtime filter to _only_ be applied at nodes in its left subtree.
> > > >
> > > > Per my above example query, I think this is unnecessarily restrictive. Based on my understanding of Impala scheduling/parallelism, all of the build sides of joins start running in parallel from the bottom up, and it's always possible for a build to complete somewhere deep in the tree before a build completes higher up the tree, in which case sending the RF to the hash join's "cousin" is beneficial. I think the only necessary restriction is that an RF should not be sent from a hash join node to any descendant of its right child.
> > > >
> > > > Keep in mind I'm very new to the Impala planner code and particularly to the runtime filter portion thereof, so I may have missed something. But does the above sound like a plausible bug/missed optimization?
> > > >
> > > > -Todd
> > > >
> > > > --
> > > > Todd Lipcon
> > > > Software Engineer, Cloudera
>
> --
> Todd Lipcon
> Software Engineer, Cloudera
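For reference, a rough sketch of the difference between the restriction described above and the proposed relaxation (illustrative Java only; PlanNode and the helper names are invented, and this is not the actual RuntimeFilterGenerator logic):

  import java.util.*;

  class PlanNode {
    final int id;
    final List<PlanNode> children = new ArrayList<>();
    PlanNode(int id, PlanNode... cs) { this.id = id; children.addAll(Arrays.asList(cs)); }

    // True if 'target' is this node or one of its descendants.
    boolean subtreeContains(PlanNode target) {
      if (this == target) return true;
      for (PlanNode c : children) {
        if (c.subtreeContains(target)) return true;
      }
      return false;
    }
  }

  class FilterTargetRules {
    // Behaviour described above: a filter produced at 'join' is only ever
    // assigned to scans in the join's left subtree.
    static boolean allowedCurrent(PlanNode join, PlanNode scan) {
      return join.children.get(0).subtreeContains(scan);
    }

    // Proposed relaxation: any scan is a candidate except those under the
    // join's right (build-side) child, since those feed the filter itself.
    static boolean allowedProposed(PlanNode join, PlanNode scan) {
      return !join.children.get(1).subtreeContains(scan);
    }

    public static void main(String[] args) {
      // Shape of the toy plan: join 04 over (join 03 over (scan 00, scan 01), scan 02).
      PlanNode scan00 = new PlanNode(0), scan01 = new PlanNode(1), scan02 = new PlanNode(2);
      PlanNode join03 = new PlanNode(3, scan00, scan01);
      PlanNode join04 = new PlanNode(4, join03, scan02);
      // RF002 is produced at join 03; scan 02 is that join's "cousin".
      System.out.println(allowedCurrent(join03, scan02));   // false: not in the left subtree
      System.out.println(allowedProposed(join03, scan02));  // true: not under the right child either
    }
  }

In the toy plan, the left-subtree-only rule rejects sending RF002 to scan 02, while the relaxed rule (exclude only the producing join's right/build subtree) accepts it - which is the assignment that made the manufactured query fast.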
