Hey folks, I've been digging into a couple of TPCDS queries that are unexpectedly slow and uncovered what seems to be some surprising behavior in the planner concerning runtime filter assignment. Consider the following query in the TPCDS schema:
select straight_join *
from promotion a
  join item b on a.p_item_sk = b.i_item_sk
  join [shuffle] store_sales c on b.i_item_sk = c.ss_item_sk
where b.i_color = 'xxx'

This generates a plan that looks like this: http://people.apache.org/~todd/plan.png

Or in text form:

Max Per-Host Resource Reservation: Memory=85.88MB
Per-Host Resource Estimates: Memory=298.14GB

F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
|  mem-estimate=0B mem-reservation=0B
|
08:EXCHANGE [UNPARTITIONED]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0,1,2 row-size=911B cardinality=14201171
|
F03:PLAN FRAGMENT [HASH(b.i_item_sk)] hosts=1 instances=1
Per-Host Resources: mem-estimate=295.06GB mem-reservation=50.00MB runtime-filters-memory=16.00MB
04:HASH JOIN [INNER JOIN, PARTITIONED]
|  hash predicates: b.i_item_sk = c.ss_item_sk
|  fk/pk conjuncts: none
|  runtime filters: RF000[bloom] <- c.ss_item_sk
|  mem-estimate=295.04GB mem-reservation=34.00MB spill-buffer=2.00MB
|  tuple-ids=0,1,2 row-size=911B cardinality=14201171
|
|--07:EXCHANGE [HASH(c.ss_item_sk)]
|  |  mem-estimate=0B mem-reservation=0B
|  |  tuple-ids=2 row-size=100B cardinality=2879987999
|  |
|  F02:PLAN FRAGMENT [RANDOM] hosts=9 instances=9
|  Per-Host Resources: mem-estimate=1.89GB mem-reservation=0B
|  02:SCAN HDFS [tpcds_1000_parquet.store_sales c, RANDOM]
|     partitions=1824/1824 files=1824 size=189.24GB
|     stored statistics:
|       table: rows=2879987999 size=unavailable
|       partitions: 1824/1824 rows=2879987999
|       columns: all
|     extrapolated-rows=disabled
|     mem-estimate=1.89GB mem-reservation=0B
|     tuple-ids=2 row-size=100B cardinality=2879987999
|
06:EXCHANGE [HASH(b.i_item_sk)]
|  mem-estimate=0B mem-reservation=0B
|  tuple-ids=0,1 row-size=811B cardinality=1500
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Host Resources: mem-estimate=323.88MB mem-reservation=19.88MB runtime-filters-memory=17.00MB
03:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: a.p_item_sk = b.i_item_sk
|  fk/pk conjuncts: a.p_item_sk = b.i_item_sk
|  runtime filters: RF002[bloom] <- b.i_item_sk
|  mem-estimate=2.88MB mem-reservation=2.88MB spill-buffer=128.00KB
|  tuple-ids=0,1 row-size=811B cardinality=1500
|
|--05:EXCHANGE [BROADCAST]
|  |  mem-estimate=0B mem-reservation=0B
|  |  tuple-ids=1 row-size=496B cardinality=3226
|  |
|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
|  Per-Host Resources: mem-estimate=896.00MB mem-reservation=16.00MB runtime-filters-memory=16.00MB
|  01:SCAN HDFS [tpcds_1000_parquet.item b, RANDOM]
|     partitions=1/1 files=1 size=28.28MB
|     predicates: b.i_color = 'xxx'
|     runtime filters: RF000[bloom] -> b.i_item_sk
|     stored statistics:
|       table: rows=300000 size=28.28MB
|       columns: all
|     extrapolated-rows=disabled
|     parquet statistics predicates: b.i_color = 'xxx'
|     parquet dictionary predicates: b.i_color = 'xxx'
|     mem-estimate=880.00MB mem-reservation=0B
|     tuple-ids=1 row-size=496B cardinality=3226
|
00:SCAN HDFS [tpcds_1000_parquet.promotion a, RANDOM]
   partitions=1/1 files=1 size=85.50KB
   runtime filters: RF000[bloom] -> a.p_item_sk, RF002[bloom] -> a.p_item_sk
   stored statistics:
     table: rows=1500 size=85.50KB
     columns: all
   extrapolated-rows=disabled
   mem-estimate=304.00MB mem-reservation=0B
   tuple-ids=0 row-size=315B cardinality=1500

Here, because of the equi-joins, we have a slot equivalence among all of the item_sk columns (a.p_item_sk, b.i_item_sk, c.ss_item_sk). So it seems it would be valid to take the runtime filter RF002 (generated from b.i_item_sk at HASH JOIN node 03) and also apply it at scan node 02 for store_sales. Notably, doing so would be extremely beneficial in this manufactured query: the scan of 'item' takes only a second or so to determine that there are in fact no items with color 'xxx', which could completely short-circuit scan 02 of the very large store_sales table.

Obviously this query is canned, but I'm seeing cases in TPCDS (e.g. Q64) where a much more complex variant of this plan ends up being generated and fails to make use of an available runtime filter.

I spent some time digging in the code, and I think the faulty logic might be here in RuntimeFilterGenerator.java:

generateFilters(ctx, root.getChild(0));
// Finalize every runtime filter of that join. This is to ensure that we don't
// assign a filter to a scan node from the right subtree of joinNode or ancestor
// join nodes in case we don't find a destination node in the left subtree.
for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);

I'm not quite sure how to read the comment, but it seems this prevents the runtime filter generated at a hash join from being sent to any node that is in its right subtree or higher up the tree. In other words, it limits the runtime filter to _only_ being applied at nodes in its left subtree. Per my example query above, I think this is unnecessarily restrictive. Based on my understanding of Impala scheduling/parallelism, the build sides of all joins start running in parallel from the bottom up, and it's entirely possible for a build deep in the tree to complete before a build higher up the tree does, in which case sending the RF to the hash join's "cousin" is beneficial. I think the only necessary restriction is that an RF should not be sent from a hash join node to any descendant of its right child.

Keep in mind I'm very new to the Impala planner code, and particularly to the runtime filter portion thereof, so I may have missed something. But does the above sound like a plausible bug/missed optimization?
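For concreteness, here is a rough, self-contained sketch of the target-selection rule I have in mind. This is only an illustration: PlanNode/ScanNode/JoinNode and candidateTargets() below are toy stand-ins I made up, not the actual planner classes, and I haven't tried wiring anything like this into RuntimeFilterGenerator. The idea is simply that a filter built at a join may target any scan that materializes a slot in the filter source's equivalence class, as long as that scan is not in the join's build-side (right) subtree:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-ins for plan nodes; these are not Impala's real planner classes.
abstract class PlanNode {
  final List<PlanNode> children = new ArrayList<>();
  PlanNode(PlanNode... kids) { for (PlanNode k : kids) children.add(k); }
}

class ScanNode extends PlanNode {
  final String table;
  final Set<String> slots;  // slots (columns) this scan materializes
  ScanNode(String table, Set<String> slots) { this.table = table; this.slots = slots; }
}

class JoinNode extends PlanNode {
  JoinNode(PlanNode probe, PlanNode build) { super(probe, build); }
  PlanNode build() { return children.get(1); }
}

public class RuntimeFilterTargets {
  // Proposed rule: the filter built at 'join' may target any scan that
  // materializes a slot equivalent to the filter's source slot, as long as
  // the scan is not inside the join's build-side (right) subtree.
  static List<ScanNode> candidateTargets(PlanNode root, JoinNode join, Set<String> equivSlots) {
    Set<PlanNode> buildSide = collect(join.build(), new HashSet<>());
    List<ScanNode> targets = new ArrayList<>();
    for (PlanNode n : collect(root, new HashSet<>())) {
      if (!(n instanceof ScanNode) || buildSide.contains(n)) continue;
      ScanNode scan = (ScanNode) n;
      for (String slot : scan.slots) {
        if (equivSlots.contains(slot)) { targets.add(scan); break; }
      }
    }
    return targets;
  }

  // Collects 'n' and all of its descendants into 'out'.
  static Set<PlanNode> collect(PlanNode n, Set<PlanNode> out) {
    out.add(n);
    for (PlanNode c : n.children) collect(c, out);
    return out;
  }

  public static void main(String[] args) {
    ScanNode promotion = new ScanNode("promotion", Set.of("a.p_item_sk"));
    ScanNode item = new ScanNode("item", Set.of("b.i_item_sk"));
    ScanNode storeSales = new ScanNode("store_sales", Set.of("c.ss_item_sk"));
    // 03: promotion JOIN item, which builds RF002 from b.i_item_sk.
    JoinNode join03 = new JoinNode(promotion, item);
    // 04: (promotion JOIN item) JOIN store_sales, which builds RF000 from c.ss_item_sk.
    JoinNode join04 = new JoinNode(join03, storeSales);

    // Slot equivalence implied by the two equi-join conjuncts.
    Set<String> equiv = Set.of("a.p_item_sk", "b.i_item_sk", "c.ss_item_sk");

    // Prints the promotion and store_sales scans, but not the item scan,
    // which feeds the build side of join03.
    for (ScanNode s : candidateTargets(join04, join03, equiv)) {
      System.out.println("RF002 -> " + s.table);
    }
  }
}

Run against the plan above, that rule would let RF002 (built at node 03 from b.i_item_sk) reach both the promotion scan and the store_sales scan, while still excluding the item scan that feeds node 03's build side.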
-Todd

--
Todd Lipcon
Software Engineer, Cloudera