This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 07207b7b515 [feature](shuffle) enable strict consistency dml by
default (#32958) (#34641)
07207b7b515 is described below
commit 07207b7b515e81e40219d449f8a243e540dbd0c1
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri May 10 14:31:50 2024 +0800
[feature](shuffle) enable strict consistency dml by default (#32958)
(#34641)
---
.../main/java/org/apache/doris/common/Config.java | 6 ++++++
.../plans/physical/PhysicalOlapTableSink.java | 7 +++++++
.../java/org/apache/doris/qe/SessionVariable.java | 10 +++++++++-
.../java/org/apache/doris/qe/StmtExecutor.java | 2 ++
.../java/org/apache/doris/planner/PlannerTest.java | 22 +++++++++++++++++++++-
5 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9973de61be6..00344e4934e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -636,6 +636,12 @@ public class Config extends ConfigBase {
varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_single_replica_load = false;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "对于 tablet 数量小于该数目的 DUPLICATE KEY 表,将不会启用 shuffle",
+ "Shuffle won't be enabled for DUPLICATE KEY tables if its tablet
num is lower than this number"},
+ varType = VariableAnnotation.EXPERIMENTAL)
+ public static int min_tablets_for_dup_table_shuffle = 64;
+
@ConfField(mutable = true, masterOnly = true, description = {
"单个数据库最大并发运行的事务数,包括 prepare 和 commit 事务。",
"Maximum concurrent running txn num including prepare, commit txns
under a single db.",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
index 04cdc347db7..fee098ce166 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
@@ -21,8 +21,10 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
+import org.apache.doris.common.Config;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
@@ -199,6 +201,11 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends
Plan> extends PhysicalTabl
if (targetTable.isPartitionDistributed()) {
DistributionInfo distributionInfo =
targetTable.getDefaultDistributionInfo();
if (distributionInfo instanceof HashDistributionInfo) {
+ // Do not enable shuffle for duplicate key tables when its
tablet num is less than threshold.
+ if (targetTable.getKeysType() == KeysType.DUP_KEYS
+ && distributionInfo.getBucketNum() <
Config.min_tablets_for_dup_table_shuffle) {
+ return PhysicalProperties.ANY;
+ }
return PhysicalProperties.TABLET_ID_SHUFFLE;
} else if (distributionInfo instanceof RandomDistributionInfo) {
return PhysicalProperties.ANY;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 207da09a430..8d345d39f44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -893,7 +893,7 @@ public class SessionVariable implements Serializable,
Writable {
public boolean enableNereidsDmlWithPipeline = true;
@VariableMgr.VarAttr(name = ENABLE_STRICT_CONSISTENCY_DML, needForward =
true)
- public boolean enableStrictConsistencyDml = false;
+ public boolean enableStrictConsistencyDml = true;
@VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE, varType =
VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableVectorizedEngine = true;
@@ -3457,6 +3457,14 @@ public class SessionVariable implements Serializable,
Writable {
this.dumpNereidsMemo = dumpNereidsMemo;
}
+ public boolean isEnableStrictConsistencyDml() {
+ return this.enableStrictConsistencyDml;
+ }
+
+ public void setEnableStrictConsistencyDml(boolean value) {
+ this.enableStrictConsistencyDml = value;
+ }
+
public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3bdfe738482..c48cb2ce2ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -3127,6 +3127,8 @@ public class StmtExecutor {
try {
if (sessionVariable.isEnableNereidsPlanner()) {
try {
+ // disable shuffle for http stream (only 1 sink)
+ sessionVariable.disableStrictConsistencyDmlOnce();
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null &&
context.getMinidump().toString(4) != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
index de9e828bacb..e0bdc744a33 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlannerTest.java
@@ -701,10 +701,12 @@ public class PlannerTest extends TestWithFeService {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}
- // 1. should not contains exchange node in new planner
+ // 2. should not contains exchange node in new planner
v = connectContext.getSessionVariable().isEnableNereidsPlanner();
+ boolean v2 =
connectContext.getSessionVariable().isEnableStrictConsistencyDml();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(true);
+
connectContext.getSessionVariable().setEnableStrictConsistencyDml(false);
String sql1 = "explain insert into db1.tbl1 select * from
db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
stmtExecutor1.execute();
@@ -713,6 +715,24 @@ public class PlannerTest extends TestWithFeService {
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
+
connectContext.getSessionVariable().setEnableStrictConsistencyDml(v2);
+ }
+
+ // 3. should contain exchange node in new planner if enable strict
consistency dml
+ v = connectContext.getSessionVariable().isEnableNereidsPlanner();
+ v2 =
connectContext.getSessionVariable().isEnableStrictConsistencyDml();
+ try {
+ connectContext.getSessionVariable().setEnableNereidsPlanner(true);
+
connectContext.getSessionVariable().setEnableStrictConsistencyDml(true);
+ String sql1 = "explain insert into db1.tbl1 select * from
db1.tbl1";
+ StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext,
sql1);
+ stmtExecutor1.execute();
+ Planner planner1 = stmtExecutor1.planner();
+ String plan1 = planner1.getExplainString(new ExplainOptions(false,
false, false));
+ Assertions.assertTrue(plan1.contains("VEXCHANGE"));
+ } finally {
+ connectContext.getSessionVariable().setEnableNereidsPlanner(v);
+
connectContext.getSessionVariable().setEnableStrictConsistencyDml(v2);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]