This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d9924c9b8e [Improvement](topn) add limit threshold session variable
and fuzzy for topn optimizations (#16514)
d9924c9b8e is described below
commit d9924c9b8ec04c5c44c302f3a7638d4cc47148fc
Author: Kang <[email protected]>
AuthorDate: Fri Feb 10 12:56:33 2023 +0800
[Improvement](topn) add limit threshold session variable and fuzzy for
topn optimizations (#16514)
1. add limit threshold for topn runtime pushdown and key topn optimization
2. use unified session variable topn_opt_limit_threshold for all topn
optimizations
3. add fuzzy support for topn_opt_limit_threshold
---
docs/en/docs/advanced/variables.md | 4 ++
docs/zh-CN/docs/advanced/variables.md | 3 +
.../main/java/org/apache/doris/common/Config.java | 3 -
.../java/org/apache/doris/analysis/SelectStmt.java | 4 +-
.../org/apache/doris/planner/OlapScanNode.java | 6 ++
.../org/apache/doris/planner/OriginalPlanner.java | 2 +
.../java/org/apache/doris/qe/SessionVariable.java | 10 ++-
.../java/org/apache/doris/planner/PlannerTest.java | 80 +++++++++++++++++-----
8 files changed, 88 insertions(+), 24 deletions(-)
diff --git a/docs/en/docs/advanced/variables.md
b/docs/en/docs/advanced/variables.md
index 9725672cc3..b995bc3d44 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -571,3 +571,7 @@ Translated with www.DeepL.com/Translator (free version)
* `enable_file_cache`
Set whether to use block file cache. This variable takes effect only if the
BE config enable_file_cache=true. The cache is not used when BE config
enable_file_cache=false.
+
+* `topn_opt_limit_threshold`
+
+ Set the threshold for the limit of a topn query (e.g. SELECT * FROM t ORDER BY k
LIMIT n). If n <= threshold, topn optimizations (runtime predicate pushdown, two
phase result fetch and read order by key) will be enabled automatically; otherwise
they are disabled. Default value is 1024.
diff --git a/docs/zh-CN/docs/advanced/variables.md
b/docs/zh-CN/docs/advanced/variables.md
index 4c8e8ee06e..dcbdb501e3 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -560,3 +560,6 @@ SELECT /*+ SET_VAR(query_timeout = 1,
enable_partition_cache=true) */ sleep(3);
控制是否启用block file
cache。该变量只有在be.conf中enable_file_cache=true时才有效,如果be.conf中enable_file_cache=false,则block
file cache一直处于禁用状态。
+* `topn_opt_limit_threshold`
+
+ 设置topn优化的limit阈值 (例如:SELECT * FROM t ORDER BY k LIMIT n).
如果limit的n小于等于阈值,topn相关优化(动态过滤下推、两阶段获取结果、按key的顺序读数据)会自动启用,否则会禁用。默认值是1024。
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ef325d357c..6ef6184e81 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1969,9 +1969,6 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true, mutable = true)
public static int max_error_tablet_of_broker_load = 3;
- @ConfField(mutable = false)
- public static int topn_two_phase_limit_threshold = 512;
-
/**
* Used to set session variables randomly to check more issues in github
workflow
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index a8b572d040..d0819885db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -708,8 +708,8 @@ public class SelectStmt extends QueryStmt {
// Only TOPN query at present
if (getOrderByElements() == null
|| !hasLimit()
- || getLimit() == 0
- || getLimit() >
ConnectContext.get().getSessionVariable().twoPhaseReadLimitThreshold) {
+ || getLimit() <= 0
+ || getLimit() >
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
return false;
}
// Check order by exprs are all slot refs
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index ccc1d14b0e..2d6bd2d12e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -940,6 +940,12 @@ public class OlapScanNode extends ScanNode {
* Check Parent sort node can push down to child olap scan.
*/
public boolean checkPushSort(SortNode sortNode) {
+ // Ensure limit is positive and does not exceed the threshold
+ if (sortNode.getLimit() <= 0
+ || sortNode.getLimit() >
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
+ return false;
+ }
+
// Ensure all isAscOrder is same, and length != 0.
// Can't be zorder.
if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count()
!= 1
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 5f0abd125d..00328d1ef9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -448,6 +448,8 @@ public class OriginalPlanner extends Planner {
SortNode sortNode = (SortNode) node;
PlanNode child = sortNode.getChild(0);
if (child instanceof OlapScanNode && sortNode.getLimit() > 0
+ && ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable() != null
+ && sortNode.getLimit() <=
ConnectContext.get().getSessionVariable().topnOptLimitThreshold
&&
sortNode.getSortInfo().getMaterializedOrderingExprs().size() > 0) {
Expr firstSortExpr =
sortNode.getSortInfo().getMaterializedOrderingExprs().get(0);
if (firstSortExpr instanceof SlotRef &&
!firstSortExpr.getType().isStringType()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e85a4dcb33..28c2028226 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -259,7 +259,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXTERNAL_SORT_BYTES_THRESHOLD =
"external_sort_bytes_threshold";
public static final String ENABLE_TWO_PHASE_READ_OPT =
"enable_two_phase_read_opt";
- public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD =
"two_phase_read_opt_limit_threshold";
+ public static final String TOPN_OPT_LIMIT_THRESHOLD =
"topn_opt_limit_threshold";
+
public static final String ENABLE_FILE_CACHE = "enable_file_cache";
public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST =
"group_by_and_having_use_alias_first";
@@ -681,8 +682,8 @@ public class SessionVariable implements Serializable,
Writable {
// 2. spawn fetch RPC to other nodes to get related data by sorted rowids
@VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT, fuzzy = true)
public boolean enableTwoPhaseReadOpt = true;
- @VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD)
- public long twoPhaseReadLimitThreshold = 512;
+ @VariableMgr.VarAttr(name = TOPN_OPT_LIMIT_THRESHOLD)
+ public long topnOptLimitThreshold = 1024;
// Default value is false, which means the group by and having clause
// should first use column name not alias. According to mysql.
@@ -735,6 +736,9 @@ public class SessionVariable implements Serializable,
Writable {
// this.enableFoldConstantByBe = false;
// this.enableTwoPhaseReadOpt = true;
}
+
+ // intended: random 1, 10, 100, 1000, 10000 -- NOTE(review): '^' is bitwise XOR in Java, not power, so this yields 10, 11, 8, 9 or 14
+ this.topnOptLimitThreshold = 10 ^ (random.nextInt(5));
}
public String printFuzzyVariables() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index 2b5e0c3266..a23aab5d6c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -508,17 +508,53 @@ public class PlannerTest extends TestWithFeService {
@Test
public void testPushSortToOlapScan() throws Exception {
- // Push sort successfully
- String sql1 = "explain select k1 from db1.tbl1 order by k1, k2";
+ // Push sort fail without limit
+ String sql1 = "explain select k1 from db1.tbl3 order by k1, k2";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false,
false));
+ Assertions.assertFalse(plan1.contains("SORT INFO:\n `k1`\n
`k2`"));
+ Assertions.assertFalse(plan1.contains("SORT LIMIT:"));
+ Assertions.assertFalse(plan1.contains("TOPN OPT"));
+
+ // Push sort fail limit > topnOptLimitThreshold
+ sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+ + (connectContext.getSessionVariable().topnOptLimitThreshold + 1);
+ stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+ stmtExecutor1.execute();
+ planner1 = stmtExecutor1.planner();
+ plan1 = planner1.getExplainString(new ExplainOptions(false, false));
+ Assertions.assertFalse(plan1.contains("SORT INFO:\n `k1`\n
`k2`"));
+ Assertions.assertFalse(plan1.contains("SORT LIMIT:"));
+ Assertions.assertFalse(plan1.contains("TOPN OPT"));
+
+ // Push sort success limit = topnOptLimitThreshold
+ sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+ + (connectContext.getSessionVariable().topnOptLimitThreshold);
+ stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+ stmtExecutor1.execute();
+ planner1 = stmtExecutor1.planner();
+ plan1 = planner1.getExplainString(new ExplainOptions(false, false));
Assertions.assertTrue(plan1.contains("SORT INFO:\n `k1`\n
`k2`"));
Assertions.assertTrue(plan1.contains("SORT LIMIT:"));
+ Assertions.assertTrue(plan1.contains("TOPN OPT"));
+
+ // Push sort success limit < topnOptLimitThreshold
+ if (connectContext.getSessionVariable().topnOptLimitThreshold > 1) {
+ sql1 = "explain select k1 from db1.tbl3 order by k1, k2 limit "
+ + (connectContext.getSessionVariable().topnOptLimitThreshold -
1);
+ stmtExecutor1 = new StmtExecutor(connectContext, sql1);
+ stmtExecutor1.execute();
+ planner1 = stmtExecutor1.planner();
+ plan1 = planner1.getExplainString(new ExplainOptions(false,
false));
+ Assertions.assertTrue(plan1.contains("SORT INFO:\n `k1`\n
`k2`"));
+ Assertions.assertTrue(plan1.contains("SORT LIMIT:"));
+ Assertions.assertTrue(plan1.contains("TOPN OPT"));
+ }
// Push sort failed
- String sql2 = "explain select k1, k2, k3 from db1.tbl1 order by k1,
k3, k2";
+ String sql2 = "explain select k1, k2, k3 from db1.tbl3 order by k1,
k3, k2";
StmtExecutor stmtExecutor2 = new StmtExecutor(connectContext, sql2);
stmtExecutor2.execute();
Planner planner2 = stmtExecutor2.planner();
@@ -529,9 +565,10 @@ public class PlannerTest extends TestWithFeService {
@Test
public void testEliminatingSortNode() throws Exception {
- // fail case 1
+ // success case 1
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order
by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 order
by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -542,7 +579,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 2
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3
= 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k3
= 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -553,7 +591,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 3
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
!= 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
!= 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -564,7 +603,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 4
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2
= 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 or k2
= 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -575,7 +615,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 5
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
= 2 or k3 = 3 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
= 2 or k3 = 3 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -587,7 +628,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 6
// TODO, support: in (select 1)
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 in (select
1) and k2 = 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 in (select
1) and k2 = 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -597,7 +639,8 @@ public class PlannerTest extends TestWithFeService {
// fail case 7
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1)
and k2 = 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 not in (1)
and k2 = 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -607,7 +650,8 @@ public class PlannerTest extends TestWithFeService {
// success case 1
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
= 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 = 1 and k2
= 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -618,7 +662,8 @@ public class PlannerTest extends TestWithFeService {
// success case 2
{
- String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2
= 2 and k1 = 1 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k3 = 3 and k2
= 2 and k1 = 1 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -629,7 +674,8 @@ public class PlannerTest extends TestWithFeService {
// success case 3
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and
k2 in (2) and k2 !=2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 in (1) and
k2 in (2) and k2 !=2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -640,7 +686,8 @@ public class PlannerTest extends TestWithFeService {
// success case 4
{
- String sql1 = "explain select k1 from db1.tbl1 where k1 in
(concat('1','2')) and k2 = 2 order by k1, k2";
+ String sql1 = "explain select k1 from db1.tbl1 where k1 in
(concat('1','2')) and k2 = 2 order by k1, k2 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
@@ -652,7 +699,8 @@ public class PlannerTest extends TestWithFeService {
// success case 5
{
String sql1 = "explain select tbl1.k1 from db1.tbl1 join db1.tbl2
on tbl1.k1 = tbl2.k1"
- + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3
order by tbl1.k1, tbl2.k1";
+ + " where tbl1.k1 = 1 and tbl2.k1 = 2 and tbl1.k2 = 3
order by tbl1.k1, tbl2.k1 limit "
+ +
connectContext.getSessionVariable().topnOptLimitThreshold;
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]