stdpain opened a new issue #6075: URL: https://github.com/apache/incubator-doris/issues/6075
# What is Runtime Filter?

Suppose you have the following SQL to execute:

```
select * from table1 join table2 on table1.id = table2.id;
```

The execution plan usually determines the sizes of the two tables and then joins the big table against the small table. If nothing is optimized, the big table on the left has to scan all of its data, send it to the join node, and join it row by row, which wastes a lot of overhead along the way, for example network IO.

If we scan the small table first and collect some information about it, such as its upper and lower bounds, or build a bloom filter, the big table on the left can filter some data in advance, or even scan much less data: from the upper and lower bounds we can tell which partitions do not need to be scanned, and if some columns are ordered, the index can also be used for filtering.

## The current runtime filter of Doris

Currently Doris has runtime-filter-like logic: the hash_join tries to build In expressions and then pushes them down to the scan_node. Shuffle join pushdown is not supported.

## How Doris implements RuntimeFilter

Step 1: Push down a RuntimeFilter that supports broadcast join.

It is better to push the filter down as an expression, so that later we can take advantage of our storage engine to do some optimization. The first step is to implement a RuntimeFilter that supports broadcast join. Broadcast join does not involve a coordinator (no filters need to be merged), so we don't need to design a lot of classes; we can reuse the existing logic and finish the development quickly (currently bloomfilter and minmaxFilter are finished and being tested).

Step 2: Support shuffle join.

Doris BE doesn't have a Coordinator, so in this part we need to push all the filters built by the JoinNodes to the scan_node and aggregate the filters there. The filters must be aggregated; applying a filter that has not been aggregated causes the scan_node to lose data when scanning.
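To make the two ideas above concrete, here is a minimal Python sketch (not Doris code) of a min-max filter and a bloom filter built from the small table's join keys, together with the merge step that shuffle join needs. The class names `MinMaxFilter` and `BloomFilter`, the methods `insert`, `may_contain` and `merge`, the filter sizes, and the sample keys are all assumptions made for illustration; Doris BE implements its filters in C++ and may differ in every detail.

```python
import hashlib


class MinMaxFilter:
    """Hypothetical filter: tracks the lower and upper bound of the build-side keys."""

    def __init__(self):
        self.lo = None
        self.hi = None

    def insert(self, key):
        self.lo = key if self.lo is None else min(self.lo, key)
        self.hi = key if self.hi is None else max(self.hi, key)

    def may_contain(self, key):
        return self.lo is not None and self.lo <= key <= self.hi

    def merge(self, other):
        # Union of two ranges: widen this range so it covers both.
        if other.lo is None:
            return
        self.insert(other.lo)
        self.insert(other.hi)


class BloomFilter:
    """Hypothetical bloom filter backed by a Python int used as a bitset."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def insert(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def may_contain(self, key):
        # No false negatives: every inserted key sets all of its bits.
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

    def merge(self, other):
        # Two bloom filters with the same layout are merged by OR-ing their bits.
        assert (self.num_bits, self.num_hashes) == (other.num_bits, other.num_hashes)
        self.bits |= other.bits


def build_filters(keys):
    mm, bf = MinMaxFilter(), BloomFilter()
    for k in keys:
        mm.insert(k)
        bf.insert(k)
    return mm, bf


# Shuffle join: each join instance sees only part of table2.id (made-up values).
part_a, part_b = [3, 7], [42, 100]
mm_a, bf_a = build_filters(part_a)
mm_b, bf_b = build_filters(part_b)

probe_rows = [1, 3, 7, 42, 100, 500]  # table1.id values seen by the scan

# Applying only one partial filter drops rows that match the other partition.
only_a = [r for r in probe_rows if mm_a.may_contain(r) and bf_a.may_contain(r)]

# Merging first keeps every row that can still find a join partner.
mm_a.merge(mm_b)
bf_a.merge(bf_b)
merged = [r for r in probe_rows if mm_a.may_contain(r) and bf_a.may_contain(r)]
```

In this sketch `only_a` keeps just the keys 3 and 7, so the probe rows 42 and 100 would be lost even though they have join partners, while `merged` keeps 3, 7, 42 and 100 and still discards 1 and 500. That is the reason the filters of a shuffle join have to be aggregated before the scan uses them.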
## Some components

### RuntimeFilterMgr

A component that manages RuntimeFilters (all filters are managed by it). Its lifecycle is strongly bound to RuntimeState. All filters in a fragment_instance must be registered with this class, and filters are also retrieved from it.

Why is it strongly bound to the RuntimeState lifecycle? Because when we update a filter via RPC, we need to locate this class by fragment_instance_id and then assign the value to the filter.

### RuntimeFilterMergeControllerEntity

The name of this class is not very friendly; it can be understood as the context of the merge node. For now, its lifecycle can be understood as covering the whole query (as long as the query has not stopped, it stays alive). FE must select the last sink node to be destroyed as the merge node. Each time, it stores query-id -> weak_this in the controller and a shared_ptr in each fragment_state. When all the strong references are destroyed, the entry is automatically removed from the map (override close).

### RuntimeFilterMergeController

The controller used for filter merging (all filter production segments). Its lifecycle is strongly bound to FragmentMgr.

## Filter Construction

RuntimeFilter is built in HashJoinNode. The specific type of filter to build, some other properties, and the build_expr are obtained from FE. Since expression computation in Doris is currently a rather unwieldy operation, FE needs to provide an expr_order attribute that identifies the position of the join condition (for example, that this is the first join condition).

For example, A.a = B.a and A.b + 1 = B.b and A.c = B.c*2+1

```
Here the prob_expr is [SlotRef(A.a),BinaryPredicate(A.b,1),SlotRef(A.c)]
Here the build_expr is [SlotRef(B.a),SlotRef(B.b),XXXPredicate]
If we build two RuntimeFilters on the equivalence condition A.b + 1 = B.b, we can use this expr_order to avoid one expression computation.
```

## Filter merge

The join types that can currently generate a RuntimeFilter are Broadcast Join, Shuffle Join and Colocation Join.

| join type  | whether to merge | where the runtime filter takes effect                   |
| ---------- | ---------------- | ------------------------------------------------------- |
| Broadcast  | no               | may be remote or local (on the same fragment_instance)  |
| Shuffle    | yes              | remote                                                  |
| Colocation | no               | the same fragment_instance                              |

For Broadcast Join, when the filter takes effect remotely, the filter could in theory be sent directly to the target node, but for convenience it is sent to the merge node, which then forwards it to the nodes that need to receive it.

## Filter apply

The current logic is that the target scan_node tries to wait for a while in the open phase, and if the filter becomes ready during the wait, it tries to push the filter down to the storage engine. If the configuration is to wait at most 1s per filter and there are 3 filters on a particular scan_node, then it waits at most 3s. The wait is implemented with await/notify.

But not all conditions will be pushed to the storage engine. In the following SQL, the equality condition has prob_expr: CastExpr(SlotExpr()) and build_expr: SlotExpr(), so it is not pushed down to the storage engine level, but rows can still be filtered out in the scan_node expression filtering phase:
```
select count(*)
from line_order join date
where line_order.date=date.date_str and date.year=1993
-- type line_order.date -> int
-- type date.date_str -> varchar
```

If the scan_node does not wait in the open phase, it checks whether the filter is ready at the start of each data scan phase. If the filter is ready, it is used, but this time it is not pushed down to the storage engine; it is only applied as an expression filter.

# reference

https://bbs.huaweicloud.com/blogs/174769

https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html#runtime_filtering_file_formats
