alamb opened a new issue, #7955:
URL: https://github.com/apache/arrow-datafusion/issues/7955

   ### Is your feature request related to a problem or challenge?
   
   If we want to make DataFusion the engine of choice for fast OLAP processing, eventually we will need to make joins faster. In addition to making sure the join order is not disastrous (e.g. https://github.com/apache/arrow-datafusion/issues/7949), we can consider other advanced OLAP techniques to improve joins (especially queries with multiple joins).
   
   ### Describe the solution you'd like
   
   I would like to propose that we look into pushing "join predicates" into scans (a technique I know as "sideways information passing", or SIPS).
   
   As an example, consider the joins from TPCH Q17:
   
   ```sql
   select
       sum(l_extendedprice) / 7.0 as avg_yearly
   from
       part, lineitem
   where
       p_partkey = l_partkey
       and p_brand = 'Brand#23'
       and p_container = 'MED BOX'
       and l_quantity < (
           select 0.2 * avg(l_quantity)
           from lineitem
           where l_partkey = p_partkey
       );
   ```
   
   The first join should look something like this. The key observation is that there are no predicates on the `lineitem` table (the big one), so all the filtering happens in the join. This is bad because the scan cannot apply optimizations like "late materialization" and instead must decode all 60M values of the selected columns, even though very few (2044!) are actually used.
   
   
   ```
                                   │
                                   │
                    2044 Rows      │
                                   │
                                   ▼
                          ┌────────────────┐
                          │    HashJoin    │
                          │   p_partkey =  │
                          │   l_partkey    │
                          └──┬─────────┬───┘                    This scan decodes 60M values
            2M Rows          │         │             60M Rows        of l_quantity and
                    ┌────────┘         └─────────┐              l_extendedprice, even though
                    │                            │              all but 2044 are filtered by
                    ▼                            ▼                        the join
          ┌──────────────────┐        ┌─────────────────────┐
          │Scan: part        │        │Scan: lineitem       │                 │
          │projection:       │        │projection:          │
          │  p_partkey       │        │  l_quantity,        │                 │
          │filters:          │        │  l_extendedprice,   │◀─ ─ ─ ─ ─ ─ ─ ─ ┘
          │  p_brand = ..    │        │  l_partkey          │
          │  p_container = ..│        │filters:             │
          │                  │        │  NONE               │
          └──────────────────┘        └─────────────────────┘
   ```
   
   The idea is to push the join predicate down into the scan, by creating something that acts like `l_partkey IN (...)` and can be applied while `lineitem` is being scanned.
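A minimal sketch of such a filter, assuming integer join keys and a build side whose keys can be collected into an in-memory set. `KeyFilter`, `from_build_side`, and `evaluate` are hypothetical names for illustration, not existing DataFusion APIs. The filter returns one boolean per probe-side row, which is roughly what evaluating `l_partkey IN (...)` on a batch would produce.

```rust
use std::collections::HashSet;

/// Hypothetical filter that behaves like `probe_key IN (<all build-side keys>)`.
/// Illustrative sketch only; not a DataFusion type.
pub struct KeyFilter {
    keys: HashSet<i64>,
}

impl KeyFilter {
    /// Collects every join key produced by the build side (e.g. the `p_partkey`
    /// values that survived the `p_brand` / `p_container` filters).
    pub fn from_build_side(build_keys: &[i64]) -> Self {
        KeyFilter {
            keys: build_keys.iter().copied().collect(),
        }
    }

    /// Evaluates the filter on a batch of probe-side keys (e.g. `l_partkey`),
    /// returning `true` for rows that can possibly find a match in the join.
    pub fn evaluate(&self, probe_keys: &[i64]) -> Vec<bool> {
        probe_keys.iter().map(|k| self.keys.contains(k)).collect()
    }
}
```

Rows whose mask entry is `false` can never produce join output, so a scan that supports late materialization could skip decoding their other columns.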
   
   
   ```
                                      1. The HashJoin completely reads the build
                                      side before starting the probe side.

                                      Thus, all 2M known matching values of
                                │     l_partkey are in a hash table prior to
                                │     scanning lineitem
                    2044 Rows      │
                                │                           │
                                ▼
                          ┌────────────────┐                   │
                          │    HashJoin    │
                          │   p_partkey =  │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
                          │   l_partkey    │
                          └──┬─────────┬───┘
                             │         │             60M Rows
                    ┌────────┘         └────────────┐                  The idea is to introduce a filter
                    │                               │                  that is effectively "l_partkey IN
                    ▼                               ▼                  (HASH TABLE)" or something similar
          ┌──────────────────┐        ┌──────────────────────────┐     that is applied during the scan
          │Scan: part        │        │Scan: lineitem            │┌ ─ ─
          │projection:       │        │projection:               │     If the scan can avoid decoding
          │  p_partkey       │        │  l_quantity,             ││    the l_quantity and l_extendedprice
          │filters:          │        │  l_extendedprice,        │     values that do not match, there
          │  p_brand = ..    │        │  l_partkey               ││    are significant savings
          │  p_container = ..│        │filters:                  │
          │                  │        │  l_partkey IN (....)   ◀─│┘
          └──────────────────┘        └──────────────────────────┘
   ```
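The ordering in step 1 is what makes this safe: once the build side has been fully read, its key set is final, so filtering the probe side with it cannot drop any row that would have matched. A toy sketch of that flow, assuming plain Rust tuples in place of Arrow record batches; `build`, `scan`, `probe`, and `hash_join` are hypothetical helpers, not DataFusion's `HashJoinExec`.

```rust
use std::collections::{HashMap, HashSet};

/// Probe-side row: (l_partkey, l_extendedprice). Plain tuples keep the sketch
/// self-contained; a real engine would use Arrow record batches.
type ProbeRow = (i64, f64);

/// Phase 1 (build): read the whole build side into a hash table that maps
/// each join key to how many build rows carry it.
fn build(build_keys: &[i64]) -> HashMap<i64, usize> {
    let mut table = HashMap::new();
    for &key in build_keys {
        *table.entry(key).or_insert(0) += 1;
    }
    table
}

/// The probe-side scan: when a key filter is supplied, rows whose key is not
/// in it are dropped before they ever reach the join.
fn scan(rows: &[ProbeRow], key_filter: Option<&HashSet<i64>>) -> Vec<ProbeRow> {
    rows.iter()
        .filter(|(key, _)| key_filter.map_or(true, |keys| keys.contains(key)))
        .copied()
        .collect()
}

/// Phase 2 (probe): emit one output row per matching build row.
fn probe(table: &HashMap<i64, usize>, rows: &[ProbeRow]) -> Vec<ProbeRow> {
    let mut out = Vec::new();
    for &(key, price) in rows {
        if let Some(&matches) = table.get(&key) {
            for _ in 0..matches {
                out.push((key, price));
            }
        }
    }
    out
}

/// Runs the join with or without the pushed-down filter.
/// Returns (join output, number of rows the scan handed to the join).
fn hash_join(build_keys: &[i64], probe_rows: &[ProbeRow], push_down: bool) -> (Vec<ProbeRow>, usize) {
    let table = build(build_keys);
    // The build side is complete at this point, so its key set is final.
    let key_set: HashSet<i64> = table.keys().copied().collect();
    let scanned = scan(probe_rows, if push_down { Some(&key_set) } else { None });
    let rows_into_join = scanned.len();
    (probe(&table, &scanned), rows_into_join)
}
```

Both variants return the same join output; only the number of rows the scan hands to the join changes.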
   
   In a query with a single selective join (one that filters out many rows), the savings are likely minimal, as they depend only on how much materialization (decoding) work the scan can skip. At the time of writing, the only scan in DataFusion that does late materialization is `ParquetExec`.
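To illustrate why the savings hinge on late materialization, here is a toy comparison that counts decode operations. `decode_value` is a hypothetical stand-in for the expensive Parquet decoding step, not a `ParquetExec` API. The eager scan decodes the payload column for every row and then filters; the late-materializing scan evaluates the key predicate first and decodes payload values only for the surviving rows.

```rust
/// Hypothetical stand-in for decoding one encoded value; it just counts calls.
fn decode_value(encoded: &[f64], row: usize, decodes: &mut usize) -> f64 {
    *decodes += 1;
    encoded[row]
}

/// Eager materialization: decode the payload for every row, then filter.
/// Returns (selected payload values, number of decode operations).
fn eager_scan(keys: &[i64], payload: &[f64], keep: &dyn Fn(i64) -> bool) -> (Vec<f64>, usize) {
    let mut decodes = 0;
    let mut out = Vec::new();
    for row in 0..keys.len() {
        // Decoded even if the row is discarded below.
        let value = decode_value(payload, row, &mut decodes);
        if keep(keys[row]) {
            out.push(value);
        }
    }
    (out, decodes)
}

/// Late materialization: evaluate the predicate on the key column first,
/// then decode payload values only for the rows that survive.
fn late_scan(keys: &[i64], payload: &[f64], keep: &dyn Fn(i64) -> bool) -> (Vec<f64>, usize) {
    let mut decodes = 0;
    let selected: Vec<usize> = (0..keys.len()).filter(|&row| keep(keys[row])).collect();
    let mut out = Vec::with_capacity(selected.len());
    for row in selected {
        out.push(decode_value(payload, row, &mut decodes));
    }
    (out, decodes)
}
```

Both scans return the same values; with a highly selective predicate (such as 2044 out of 60M rows), the late scan performs far fewer decodes.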
   
   However, in a query with multiple selective joins, the savings become much more pronounced, because we can avoid the effort of creating intermediate join outputs that are filtered out by joins later in the plan.
   
   For example:
   
   ```
          Pass down in multiple joins

       While this doesn't happen in TPCH
      Q17 (the subquery has no predicates)
       the SIPS approach can be even more
       effective with multiple selective
                     joins                  │
                                            │
                                            │             Filters on both join keys can be applied
                                            │             at this level, which can be even more
                                            ▼             effective as it avoids the work to create
                                   ┌────────────────┐     the intermediate output of HashJoin(2)   ─ ┐
                                   │  HashJoin (1)  │     which is then filtered by HashJoin(1)
                                   │     d1.key =   │                                                │
                                   │    f.d1_key    │
                                   └──┬─────────┬───┘                                                │
                                      │         │
                           ┌──────────┘         └────────────┐                                       │
                           │                                 │
                           ▼                                 ▼                                       │
                 ┌──────────────────┐               ┌────────────────┐
                 │Scan: D1          │               │  HashJoin (2)  │                               │
                 │filters:          │               │     d2.key =   │
                 │  ...             │               │    f.d2_key    │                               │
                 └──────────────────┘               └───┬─────────┬──┘
                                                        │         │                                  │
                                            ┌───────────┘         └─────────────┐
                                            │                                   │                    │
                                            ▼                                   ▼
                                   ┌────────────────┐                ┌─────────────────────┐         │
                                   │Scan: D2        │                │Scan: F              │
                                   │filters:        │                │filters:             │         │
                                   │  ...           │                │  f.d1_key IN (...)  │◀─ ─ ─ ─ ┘
                                   └────────────────┘                │  f.d2_key IN (...)  │
                                                                     │                     │
                                                                     └─────────────────────┘
   ```
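A toy model of the multi-join case, assuming the dimension keys are unique and only columns of `F` are kept, so each join behaves like a semi-join filter on `F`. `join` and `run_plan` are hypothetical helpers for illustration. The point is that filtering `F` on both key sets during the scan shrinks the intermediate output of HashJoin (2) without changing the final result.

```rust
use std::collections::HashSet;

/// Hypothetical fact-table row: (d1_key, d2_key, measure).
type FactRow = (i64, i64, f64);

/// Stand-in for a hash join against a dimension table. Assuming unique
/// dimension keys and only `F` columns in the output, it acts as a semi-join.
fn join(rows: Vec<FactRow>, dim_keys: &HashSet<i64>, key: fn(&FactRow) -> i64) -> Vec<FactRow> {
    rows.into_iter().filter(|r| dim_keys.contains(&key(r))).collect()
}

/// Runs HashJoin (2) and then HashJoin (1) over `F`. With `push_down`, the scan
/// of `F` first applies `f.d1_key IN (...)` and `f.d2_key IN (...)`.
/// Returns (final output, number of rows produced by HashJoin (2)).
fn run_plan(f: &[FactRow], d1_keys: &HashSet<i64>, d2_keys: &HashSet<i64>, push_down: bool) -> (Vec<FactRow>, usize) {
    let scanned: Vec<FactRow> = if push_down {
        f.iter()
            .filter(|r| d1_keys.contains(&r.0) && d2_keys.contains(&r.1))
            .copied()
            .collect()
    } else {
        f.to_vec()
    };
    let intermediate = join(scanned, d2_keys, |r| r.1); // HashJoin (2): d2.key = f.d2_key
    let intermediate_rows = intermediate.len();
    let output = join(intermediate, d1_keys, |r| r.0); // HashJoin (1): d1.key = f.d1_key
    (output, intermediate_rows)
}
```

Without pushdown, HashJoin (2) emits every row of `F` that matches `D2`, and HashJoin (1) throws most of them away; with pushdown those rows are never produced.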
   
   
   ### Describe alternatives you've considered
   
   Some version of this technique is described as "Bloom Filter Joins" in Spark: https://issues.apache.org/jira/browse/SPARK-32268
   
   Building a separate Bloom filter has the nice property that it can be distributed across a networked cluster; however, the overhead of creating the Bloom filter would likely be non-trivial.
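For comparison with the exact hash-table filter, a minimal Bloom filter sketch. It assumes `i64` keys and derives each hash function by seeding the standard library's `DefaultHasher`; the struct and its parameters are illustrative and are not how Spark (SPARK-32268) or DataFusion implement it.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter over i64 join keys (illustrative only).
pub struct BloomFilter {
    bits: Vec<u64>,
    num_bits: usize,
    num_hashes: u32,
}

impl BloomFilter {
    /// `num_bits` (rounded up to a multiple of 64) and `num_hashes` are caller-chosen
    /// knobs; more bits lower the false-positive rate for a given number of keys.
    pub fn new(num_bits: usize, num_hashes: u32) -> Self {
        assert!(num_bits > 0 && num_hashes > 0);
        let words = (num_bits + 63) / 64;
        BloomFilter { bits: vec![0; words], num_bits: words * 64, num_hashes }
    }

    /// Bit position for the `seed`-th hash function: hash the seed and the key together.
    fn bit_index(&self, key: i64, seed: u32) -> usize {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        key.hash(&mut hasher);
        (hasher.finish() % self.num_bits as u64) as usize
    }

    /// Called for every key on the build side.
    pub fn insert(&mut self, key: i64) {
        for seed in 0..self.num_hashes {
            let i = self.bit_index(key, seed);
            self.bits[i / 64] |= 1u64 << (i % 64);
        }
    }

    /// Called on the probe side: `false` means the key is definitely absent, while
    /// `true` means "maybe present" (false positives are removed later by the join).
    pub fn might_contain(&self, key: i64) -> bool {
        (0..self.num_hashes).all(|seed| {
            let i = self.bit_index(key, seed);
            self.bits[i / 64] & (1u64 << (i % 64)) != 0
        })
    }
}
```

The bit array's size is fixed by `num_bits` rather than by the number of distinct keys, which is what makes it cheap to ship between nodes; the costs are building it and tolerating false positives.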
   
   ### Additional context
   
   See a description of how DataFusion HashJoins work here: 
https://github.com/apache/arrow-datafusion/pull/7953
   
   Here is a paper that describes industrial experience using SIPS techniques: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf
   
   

