stdpain opened a new issue #6075:
URL: https://github.com/apache/incubator-doris/issues/6075


   # What is Runtime Filter?
   Suppose you have the following SQL to execute:
   ```
   select * from table1 join table2 on table1.id = table2.id;
   ```
   The execution plan usually determines the sizes of the two tables and then has the big table join the small table. If nothing is optimized, the big table on the left needs to scan all of its data, send it to the join node, and then probe the rows one by one, which wastes a lot of overhead along the way, for example network IO.
   
   
![image](https://user-images.githubusercontent.com/34912776/122856207-fff0a880-d348-11eb-9b7f-d4e21fecc3d2.png)
   
   If we scan the small table first and collect some information about it, such as the upper and lower bounds of the join key, or build a bloomfilter, the big table on the left can filter out some data in advance, or even scan much less data (from the upper and lower bounds we can know which partitions don't need to be scanned, and if some columns are ordered, the index can also be used to filter).
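   The bound-based pruning described above can be sketched as follows. This is a minimal illustration, not Doris code: `MinMaxFilter` and its methods are hypothetical names, and the partition check simply tests interval overlap between the build side's key range and a partition's key range.

   ```cpp
   #include <algorithm>
   #include <cassert>
   #include <initializer_list>
   #include <limits>

   // Hypothetical sketch: collect min/max bounds from the small (build)
   // table's join keys, then use them to decide which partitions of the
   // big table can be skipped entirely.
   struct MinMaxFilter {
       int min_v = std::numeric_limits<int>::max();
       int max_v = std::numeric_limits<int>::min();

       void insert(int v) {
           min_v = std::min(min_v, v);
           max_v = std::max(max_v, v);
       }
       // A partition whose [lo, hi] key range does not overlap
       // [min_v, max_v] cannot contain a matching row, so skip its scan.
       bool partition_may_match(int lo, int hi) const {
           return !(hi < min_v || lo > max_v);
       }
   };

   int main() {
       MinMaxFilter f;
       for (int id : {42, 57, 48}) f.insert(id);  // small table's ids
       assert(!f.partition_may_match(0, 40));     // pruned
       assert(f.partition_may_match(40, 60));     // must be scanned
       assert(!f.partition_may_match(100, 200));  // pruned
       return 0;
   }
   ```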
   
   
![image](https://user-images.githubusercontent.com/34912776/122856310-29113900-d349-11eb-9d0a-24c86c76ddf6.png)
   
   ## The current runtime filter of Doris
   Currently Doris already has runtimefilter-like logic: hash_join internally tries to build In expressions and then push them down to scan_node.
   However, pushdown for shuffle join is not supported.
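   As a rough sketch of that existing In-expression logic (hypothetical names, not the actual Doris classes): hash_join collects the distinct build-side keys, and scan_node uses the resulting set as an `IN` predicate.

   ```cpp
   #include <cassert>
   #include <unordered_set>
   #include <vector>

   // Hypothetical sketch: while building the hash table, hash_join
   // collects the distinct build-side keys into an In-set, which is then
   // pushed down to scan_node as an `id IN (...)` predicate.
   std::unordered_set<int> build_in_set(const std::vector<int>& build_keys) {
       return std::unordered_set<int>(build_keys.begin(), build_keys.end());
   }

   // scan_node applies the pushed-down predicate to each scanned row.
   bool passes_in_filter(const std::unordered_set<int>& in_set, int probe_key) {
       return in_set.count(probe_key) > 0;
   }

   int main() {
       auto in_set = build_in_set({1, 5, 9});
       assert(passes_in_filter(in_set, 5));   // key exists on build side
       assert(!passes_in_filter(in_set, 7));  // row can be skipped early
       return 0;
   }
   ```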
   
   ## How Doris implements RuntimeFilter
   Step 1: Push down a RuntimeFilter that supports broadcast join. It is better to push it down as an expression, so that later we can take advantage of our storage engine to do some optimization.
   No merge step is involved in broadcast join, so we don't need to design many classes; we can reuse the existing logic and finish the development quickly (currently bloomfilter and minmaxFilter are finished and being tested).
   Step 2: Support shuffle join.
   Doris BE doesn't have a Coordinator, so in this part we need to push all the filters built by JoinNode down to scan_node, and aggregate the filters at scan_node.
   The filters must be aggregated: each join instance only sees one partition of the build side, so applying a single unmerged filter would cause scan_node to lose data when scanning.
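   Why unmerged filters lose data can be shown with a toy bloom filter (hypothetical code, not the Doris implementation): each instance's filter only covers its own partition of the build side, and the merge is a bitwise OR of the bit arrays.

   ```cpp
   #include <bitset>
   #include <cassert>
   #include <functional>

   // Toy bloom filter: one hash function over a fixed-size bit array.
   constexpr size_t kBits = 256;
   using Bloom = std::bitset<kBits>;

   void bloom_insert(Bloom& b, int key) { b.set(std::hash<int>{}(key) % kBits); }
   bool bloom_may_contain(const Bloom& b, int key) {
       return b.test(std::hash<int>{}(key) % kBits);
   }
   // Merging is a bitwise OR: the result accepts every key accepted by
   // any per-instance filter.
   Bloom merge(const Bloom& a, const Bloom& b) { return a | b; }

   int main() {
       Bloom inst1, inst2;       // filters from two shuffle-join instances
       bloom_insert(inst1, 10);  // partition 1 holds key 10
       bloom_insert(inst2, 20);  // partition 2 holds key 20
       // inst1 alone would wrongly drop key 20, losing matching rows.
       assert(!bloom_may_contain(inst1, 20));
       Bloom merged = merge(inst1, inst2);
       assert(bloom_may_contain(merged, 10));
       assert(bloom_may_contain(merged, 20));  // merged filter keeps both
       return 0;
   }
   ```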
   
   ## Some components
   RuntimeFilterMgr: the component that manages RuntimeFilters (all filters are managed by it). Its lifecycle is strongly bound to RuntimeState. All filters in a fragment_instance need to be registered with this class, and filters are also retrieved from it.
   Why is it strongly bound to the RuntimeState lifecycle? Because when we update a filter via rpc, we need to locate this class by a fragment_instance_id and then assign the value to it.
   
   RuntimeFilterMergeControllerEntity: the naming of this class is not very friendly; it can be understood as the context of a merge node. Its lifecycle can temporarily be understood as valid for the whole query (as long as the query has not stopped, it stays alive). FE must select the last-destroyed sink node as the merge node. The controller stores query-id -> weak_this, and each fragment_state stores a shared_ptr; when all the strong reference pointers are destroyed, the entry is automatically removed from the map (by overriding close).
   
   RuntimeFilterMergeController: the controller used for filter merging, shared by all filter-producing fragments. Its lifecycle is strongly bound to FragmentMgr.
   
   ## Filter Construction
   RuntimeFilter is built in HashJoinNode.
   The specific type to build, some other properties, and the build_expr are all determined in FE.
   Since expression computation in Doris is currently a rather heavyweight operation, FE needs to provide an expr_order attribute to indicate which join condition a filter corresponds to.
   For example A.a = B.a and A.b + 1 = B.b and A.c = B.c*2+1
   ```
   Here the prob_expr is [SlotRef(A.a),BinaryPredicate(A.b,1),SlotRef(A.c)]
   Here the build_expr is [SlotRef(B.a),SlotRef(B.b),XXXPredicate]
   If we build RuntimeFilters on only two conditions and skip the equivalence condition A.b + 1 = B.b, expr_order lets us avoid one expression computation.
   ```
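   A minimal sketch of how expr_order might be consumed on the BE side (all names here are hypothetical): each filter carries the index of the join condition it was built from, so the pairing needs no re-derivation, and conditions for which FE built no filter are simply absent from the filter list.

   ```cpp
   #include <cassert>
   #include <string>
   #include <vector>

   // Hypothetical descriptors, mirroring the example in the text above.
   struct JoinCondition { std::string prob_expr, build_expr; };
   struct RuntimeFilterDesc { int expr_order; };  // index into the conditions

   int main() {
       std::vector<JoinCondition> conds = {
           {"SlotRef(A.a)", "SlotRef(B.a)"},            // expr_order 0
           {"BinaryPredicate(A.b,1)", "SlotRef(B.b)"},  // expr_order 1, skipped
           {"SlotRef(A.c)", "XXXPredicate"},            // expr_order 2
       };
       // FE chose to build filters only for conditions 0 and 2.
       std::vector<RuntimeFilterDesc> filters = {{0}, {2}};
       for (const auto& f : filters) {
           // expr_order points straight at the originating condition; the
           // skipped condition 1 never costs an expression computation here.
           assert(conds[f.expr_order].prob_expr.rfind("SlotRef", 0) == 0);
       }
       return 0;
   }
   ```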
   ## Filter merge
   The types of joins that can currently generate a RuntimeFilter are Broadcast Join, Shuffle Join and Colocation Join.
   
   | join type  | whether to merge | runtime filter role point                                     |
   | ---------- | ---------------- | ------------------------------------------------------------- |
   | Broadcast  | no               | may be remote or local (on top of the same fragment_instance) |
   | Shuffle    | yes              | remote                                                        |
   | Colocation | no               | the same fragment_instance                                    |
   
   When the role point is remote, Broadcast Join could theoretically send the filter directly to the role node, but for convenience it is sent to the merge node first, and the merge node then forwards it to the nodes that need to receive it.
   
   ## Filter apply
   The current logic is that the probe-side scan_node will try to wait for a while in the open phase, and if the filters arrive in time it will try to push them down to the storage engine. The wait here is implemented with await/notify. If the configuration is to wait at most 1s per filter, and there are 3 filters on a particular scan_node, then it will wait at most 3s.
   But not all conditions will be pushed down to the storage engine. Take the SQL below: since its equal condition has prob_expr: CastExpr(SlotExpr()) and build_expr: SlotExpr(), the filter will not be pushed down to the storage engine level, but it can still be applied in scan_node's expression filtering phase.
   ```
   select count(*) from line_order join date where line_order.date=date.date_str
   and date.year=1993
   -- type line_order.date -> int
   -- type date.date_str -> varchar
   ```
   If a scan_node doesn't wait in the open phase, it will check whether the filter is ready at the start of each data scan; if it is ready, the filter will be used, but at that point it no longer tries to push down to the storage engine and only performs the expression-based filtering.
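   The per-filter open-phase wait described above can be sketched with a standard await/notify pattern (hypothetical `FilterSlot`, not Doris's actual classes): scan_node blocks on a condition variable for at most the configured timeout, and the arriving filter signals it.

   ```cpp
   #include <cassert>
   #include <chrono>
   #include <condition_variable>
   #include <mutex>
   #include <thread>

   // Hypothetical sketch of the open-phase wait: scan_node blocks for at
   // most `timeout` per expected filter; the filter producer signals arrival.
   struct FilterSlot {
       std::mutex mu;
       std::condition_variable cv;
       bool ready = false;

       void publish() {  // called when the merged filter arrives via rpc
           { std::lock_guard<std::mutex> l(mu); ready = true; }
           cv.notify_all();
       }
       // Returns true if the filter arrived within the timeout.
       bool await(std::chrono::milliseconds timeout) {
           std::unique_lock<std::mutex> l(mu);
           return cv.wait_for(l, timeout, [this] { return ready; });
       }
   };

   int main() {
       FilterSlot slot;
       std::thread producer([&] {
           std::this_thread::sleep_for(std::chrono::milliseconds(50));
           slot.publish();
       });
       bool got = slot.await(std::chrono::seconds(1));  // 1s budget per filter
       producer.join();
       assert(got);  // filter arrived well inside the budget
       return 0;
   }
   ```

   With several filters on one scan_node, the waits add up (3 filters at 1s each give the 3s worst case mentioned above).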
   
   # reference
   https://bbs.huaweicloud.com/blogs/174769
   
https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html#runtime_filtering_file_formats
   
   
   

