hf200012 commented on a change in pull request #6154:
URL: https://github.com/apache/incubator-doris/pull/6154#discussion_r663733418
##########
File path: docs/en/administrator-guide/runtime-filter.md
##########
@@ -0,0 +1,223 @@
+---
+{
+ "title": "Runtime Filter",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Runtime Filter
+Runtime Filter is a new feature officially added in Doris 0.15. It is designed
to dynamically generate filter conditions for certain Join queries at runtime
to reduce the amount of scanned data, avoid unnecessary I/O and network
transmission, and speed up the query.
+
+It's design, implementation and effects, please refer to [ISSUE
6116](https://github.com/apache/incubator-doris/issues/6116).
+
+## Noun Interpretation
+* FE: Frontend, the front-end node of Doris. Responsible for metadata
management and request access.
+* BE: Backend, the back-end node of Doris. Responsible for query execution and
data storage.
+* Left table: the table on the left during Join query. Perform Probe
operation. The order can be adjusted by Join Reorder.
+* Right table: the table on the right during Join query. Perform the Build
operation. The order can be adjusted by Join Reorder.
+* Fragment: FE will convert the execution of specific SQL statements into
corresponding fragments and send them to BE for execution. The corresponding
Fragment is executed on the BE, and the results are aggregated and returned to
the FE.
+* Join on clause: `Aa=Bb` in `A join B on Aa=Bb`, based on this to generate
join conjuncts during query planning, including expr used by join Build and
Probe, where Build expr is called in Runtime Filter src expr, Probe expr are
called target expr in Runtime Filter.
+
+## Principle
+Runtime Filter is generated during query planning, constructed in
HashJoinNode, and applied in ScanNode.
+
+For example, there is currently a Join query between the T1 table and the T2
table. Its Join mode is HashJoin. T1 is a fact table with 100,000 rows of data.
T2 is a dimension table with 100 rows of data. Doris join The actual situation
is:
+```
+| > HashJoinNode <
+| | |
+| | 100000 | 2000
+| | |
+| OlapScanNode OlapScanNode
+| ^ ^
+| | 100000 | 2000
+| T1 T2
+|
+```
+Obviously, scanning data for T2 is much faster than T1. If we take the
initiative to wait for a while and then scan T1, after T2 sends the scanned
data record to HashJoinNode, HashJoinNode calculates a filter condition based
on the data of T2, such as the maximum value of T2 data And the minimum value,
or build a Bloom Filter, and then send this filter condition to ScanNode
waiting to scan T1, the latter applies this filter condition and delivers the
filtered data to HashJoinNode, thereby reducing the number of probe hash tables
and network overhead. This filter condition is Runtime Filter, and the effect
is as follows:
+```
+| > HashJoinNode <
+| | |
+| | 6000 | 2000
+| | |
+| OlapScanNode OlapScanNode
+| ^ ^
+| | 100000 | 2000
+| T1 T2
+|
+```
+If the filter condition (Runtime Filter) can be pushed down to the storage
engine, in some cases, the index can be used to directly reduce the amount of
scanned data, thereby greatly reducing the scanning time. The effect is as
follows:
+```
+| > HashJoinNode <
+| | |
+| | 6000 | 2000
+| | |
+| OlapScanNode OlapScanNode
+| ^ ^
+| | 6000 | 2000
+| T1 T2
+|
+```
+It can be seen that, unlike predicate push-down and partition cutting, Runtime
Filter is a filter condition dynamically generated at runtime, that is, when
the query is run, the join on clause is parsed to determine the filter
expression, and the expression is broadcast to ScanNode that is reading the
left table , Thereby reducing the amount of scanned data, thereby reducing the
number of probe hash table, avoiding unnecessary I/O and network transmission.
+
+Runtime Filter is mainly used to optimize joins for large tables. If the
amount of data in the left table is too small, or the amount of data in the
right table is too large, the Runtime Filter may not achieve the expected
effect.
+
+## Usage
+
+### Set session variable
+In addition to `runtime_filter_type`, other query options are used to adjust
the Runtime Filter to achieve the best performance in a specific scenario,
usually only after the performance test, to optimize the query that is
resource-intensive, takes a long enough time to run, and has a high enough
frequency .
+
+#### 1.runtime_filter_type
+The types of Runtime Filter used include Bloom Filter, MinMax Filter, and IN
predicate. When multiple types are used, they are separated by commas. Note
that you need to add quotation marks. For example:
+```
+set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";
+```
+By default, only IN predicate is used conservatively. In some cases, the
performance is higher when Bloom Filter, MinMax Filter, and IN predicate are
used at the same time.
+
+- **Bloom Filter**: There is a certain misjudgment rate, resulting in filtered
data less than expected, but it will not lead to inaccurate final results. In
most cases, Bloom Filter can improve performance or have no significant impact
on performance , But in some cases it will cause performance degradation.
+ - Bloom Filter construction and application overhead is high, so when the
filtering rate is low, or the amount of data in the left table is small, Bloom
Filter may cause performance degradation.
+ - At present, only the Key column of the left table can be pushed down to
the storage engine if the Bloom Filter is applied, and the test results show
that the performance of the Bloom Filter is not pushed down to the storage
engine.
+ - Currently Bloom Filter only has short-circuit logic when using
expression filtering on ScanNode, that is, when the false positive rate is too
high, the Bloom Filter will not continue to be used, but there is no
short-circuit logic when the Bloom Filter is pushed down to the storage engine
, So when the filtration rate is low, it may cause performance degradation.
+- **MinMax Filter**: Contains the maximum value and the minimum value, thereby
filtering data smaller than the minimum value and greater than the maximum
value. The filtering effect of the MinMax Filter is related to the type of the
Key column in the join on clause and the data distribution of the left and
right tables.
+ - When the type of the Key column in the join on clause is
int/bigint/double, etc., in extreme cases, if the maximum and minimum values
of the left and right tables are the same, there is no effect, otherwise the
maximum value of the right table is less than the minimum value of the left
table, or the minimum of the right table The value is greater than the maximum
value in the left table, the effect is best.
+ - When the type of the Key column in the join on clause is varchar, etc.,
applying the MinMax Filter will often cause performance degradation.
+- **IN predicate**: Construct the IN predicate based on all the values of
the Key listed in the join on clause on the right table, and use the
constructed IN predicate to filter on the left table. Compared with the loom
filter, the cost of construction and application is lower. The table on the
right tends to have higher performance when the amount of data is small.
+ - By default, only the number of data rows in the right table is less than
1024 will be pushed down (can be adjusted by the `runtime_filter_max_in_num` in
the session variable).
+ - Currently IN predicate does not implement a merge method, that is, it
cannot be pushed down across Fragments, so currently when it is necessary to
push down to the ScanNode of the left table of shuffle join, if Bloom Filter is
not generated, then we will convert IN predicate to Bloom Filter for Process
pushdown across Fragments, so even if the type only selects IN predicate, Bloom
Filter may actually be applied;
+
+#### 2.runtime_filter_mode
+It is used to adjust the push-down strategy of Runtime Filter, including two
strategies, LOCAL and GLOBAL, and the default setting is GLOBAL strategy.
+
+LOCAL: Relatively conservative, the constructed Runtime Filter can only be
used in the same Fragment on the same instance (the smallest unit of query
execution), that is, the Runtime Filter producer (the HashJoinNode that
constructs the Filter) and the consumer (the ScanNode that uses the
RuntimeFilter) The same Fragment, such as the general scene of broadcast join;
+
+GLOBAL: Relatively radical. In addition to satisfying the scenario of the
LOCAL strategy, the Runtime Filter can also be combined and transmitted to
different Fragments on different instances via the network. For example,
Runtime Filter producers and consumers are in different Fragments, such as
shuffle join. The query can be optimized in a wider range of scenarios.
+
+When building and applying Runtime Filters on different Fragments, the reasons
and strategies for merging Runtime Filters can be found in [ISSUE
6116](https://github.com/apache/incubator-doris/issues/6116)
+
+#### 3.runtime_filter_wait_time_ms
+After the Runtime Filter is turned on, the ScanNode that joins the left table
will wait for a period of time for each Runtime Filter assigned to itself and
then scan the data. The default wait is 1s (1000ms, in milliseconds), that is,
if the ScanNode is assigned 3 Runtime Filters , Then it will wait at most 3s.
+
+Because it takes time to build and merge the Runtime Filter, ScanNode will try
to push down the Runtime Filter that arrives within the waiting time to the
storage engine. If the waiting time is exceeded, ScanNode will directly start
scanning data using the Runtime Filter that has arrived.
+
+If the Runtime Filter arrives after ScanNode starts scanning, ScanNode will
not push the Runtime Filter down to the storage engine. Instead, it will use
expression filtering on ScanNode based on the Runtime Filter for the data that
has been scanned from the storage engine. The scanned data will not apply the
Runtime Filter, so the intermediate data size obtained will be larger than the
optimal solution, but serious cracking can be avoided.
+
+If the cluster is busy and there are many resource-intensive or
long-time-consuming queries on the cluster, consider increasing the waiting
time to avoid missing optimization opportunities for complex queries. If the
cluster load is light, and there are many small queries on the cluster that
only take a few seconds, you can consider reducing the waiting time to avoid an
increase of 1s for each query.
+
+#### 4.runtime_filters_max_num
+The maximum number of Bloom Filters in the Runtime Filter that can be applied
to each query, the default is 10. Because Bloom Filter construction and
application costs are high, so if the generated Bloom Filter exceeds the
maximum allowed number, the Bloom Filter with large selectivity is retained.
Currently, only the number of Bloom Filters is limited, because compared to
MinMax Filter and IN predicate It is more expensive to build and apply.
+```
+Selectivity = (HashJoinNode Cardinality / HashJoinNode left child Cardinality)
+```
+Because the cardinality of FE is currently inaccurate, the selectivity of
Bloom Filter calculation here is inaccurate, so in the end it may only randomly
reserve part of Bloom Filter.
+
+#### 5. Bloom Filter related parameters
+- `runtime_bloom_filter_min_size`: the minimum length of Bloom Filter in
Runtime Filter (in bytes), the default is 1048576 (1M);
+
+- `runtime_bloom_filter_max_size`: the maximum length of Bloom Filter in
Runtime Filter (in bytes), the default is 16777216 (16M);
+
+- `runtime_bloom_filter_size`: The default length of Bloom Filter in Runtime
Filter (in bytes), the default is 2097152 (2M);
+
+Because it is necessary to ensure that the length of the Bloom Filter
constructed by each HashJoinNode is the same to be merged, the length of the
Bloom Filter is currently calculated in the FE query planning.
+
+If you can get the number of data rows (Cardinality) in the statistical
information of the join table on the right, you will try to use Cardinality as
NDV, the default false detection rate fpp is 0.05, and the Bloom that contains
NDV unique elements and the false detection rate is lower than fpp is
calculated by the formula The minimum number of bytes required by the filter,
rounded to the nearest power of 2 (log value with 2 as the base), and limits
the upper and lower limits of the length of the final Bloom Filter.
+
+If the Cardinality of the join right table is not available, the default Bloom
Filter length will be used.
+
+#### 6.runtime_filter_max_in_num
+If the number of rows in the right table of the join is greater than this
value, we will not generate an IN predicate, and the default is 1024;
+
+### View Runtime Filter generated by query
+The query plan that can be displayed by the `explain` command includes the
join on clause information used by each Fragment, as well as comments on the
generation and use of the Runtime Filter by the Fragment, so as to confirm
whether the Runtime Filter is applied to the desired join on clause.
+- The comment contained in the Fragment that generates the Runtime Filter,
such as `runtime filters: filter_id[type] <- table.column`.
+- Use the comment contained in the fragment of Runtime Filter such as `runtime
filters: filter_id[type] -> table.column`.
+
+The query in the following example uses a Runtime Filter with ID RF000.
+```
+CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2
PROPERTIES("replication_num" = "1");
+INSERT INTO test VALUES (1), (2), (3), (4);
+
+CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2
PROPERTIES("replication_num" = "1");
+INSERT INTO test2 VALUES (3), (4), (5);
+
+EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
++-------------------------------------------------------------------+
+| Explain String |
++-------------------------------------------------------------------+
+| PLAN FRAGMENT 0 |
+| OUTPUT EXPRS:`t1` |
+| |
+| 4:EXCHANGE |
+| |
+| PLAN FRAGMENT 1 |
+| OUTPUT EXPRS: |
+| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1` |
+| |
+| 2:HASH JOIN |
+| | join op: INNER JOIN (BUCKET_SHUFFLE) |
+| | equal join conjunct: `test`.`t1` = `test2`.`t2` |
+| | runtime filters: RF000[in] <- `test2`.`t2` |
+| | |
+| |----3:EXCHANGE |
+| | |
+| 0:OlapScanNode |
+| TABLE: test |
+| runtime filters: RF000[in] -> `test`.`t1` |
+| |
+| PLAN FRAGMENT 2 |
+| OUTPUT EXPRS: |
+| PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2` |
+| |
+| 1:OlapScanNode |
+| TABLE: test2 |
++-------------------------------------------------------------------+
+-- 上面`runtime filters`的行显示了`PLAN FRAGMENT 1`的`2:HASH JOIN`生成了ID为RF000的IN
predicate,
+-- 其中`test2`.`t2`的key values仅在运行时可知,
+-- 在`0:OlapScanNode`使用了该IN predicate用于在读取`test`.`t1`时过滤不必要的数据。
+
+SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+-- 返回2行结果[3, 4];
+
+-- 通过query的profile(set is_report_success=true;)可以查看查询内部工作的详细信息,
+-- 包括每个Runtime Filter是否下推、等待耗时、以及OLAP_SCAN_NODE从prepare到接收到Runtime Filter的总时长。
+RuntimeFilter:in:
+ - HasPushDownToEngine: true
+ - AWaitTimeCost: 0ns
+ - EffectTimeCost: 2.76ms
+
+-- 此外,在profile的OLAP_SCAN_NODE中还可以查看Runtime Filter下推后的过滤效果和耗时。
+ - RowsVectorPredFiltered: 9.320008M (9320008)
+ - VectorPredEvalTime: 364.39ms
+```
+
Review comment:
It should be English here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]