This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 48cf47c42c6 branch-4.0: [fix](distribute) fix forward insert statement cause backend core when use local shuffle union #58726 (#58758)
48cf47c42c6 is described below
commit 48cf47c42c6ed95bb1e24eae43f3bbb36cfaa574
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 5 17:54:05 2025 +0800
branch-4.0: [fix](distribute) fix forward insert statement cause backend core when use local shuffle union #58726 (#58758)
Cherry-picked from #58726
Co-authored-by: 924060929 <[email protected]>
---
.../org/apache/doris/nereids/NereidsPlanner.java | 8 ++++----
.../glue/translator/PhysicalPlanTranslator.java | 3 ++-
.../doris/nereids/parser/LogicalPlanBuilder.java | 7 +++++--
.../mv/InitMaterializationContextHook.java | 6 ++++--
.../java/org/apache/doris/qe/ConnectContext.java | 5 ++++-
.../java/org/apache/doris/qe/SessionVariable.java | 14 ++-----------
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +++
.../org/apache/doris/nereids/util/PlanChecker.java | 13 ++++++++++--
.../org/apache/doris/qe/OlapQueryCacheTest.java | 23 ++++++++++++++++++++++
9 files changed, 58 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index c54f2c13a6f..6038c672790 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -702,10 +702,10 @@ public class NereidsPlanner extends Planner {
}
boolean notNeedBackend = false;
- // if the query can compute without backend, we can skip check cluster privileges
- if (Config.isCloudMode()
- && cascadesContext.getConnectContext().supportHandleByFe()
- && physicalPlan instanceof ComputeResultSet) {
+ // the internal query not support process Resultset, so must process by backend
+ if (cascadesContext.getConnectContext().supportHandleByFe()
+ && physicalPlan instanceof ComputeResultSet
+ && !cascadesContext.getConnectContext().getState().isInternal()) {
Optional<ResultSet> resultSet = ((ComputeResultSet) physicalPlan).computeResultInFe(
cascadesContext, Optional.empty(), physicalPlan.getOutput());
if (resultSet.isPresent()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 716e0a947c2..5bf003bf8ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -2308,7 +2308,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// and LocalShuffleUnion need `local` channels to do random local shuffle, so we need check
// `enable_local_exchange`
if (setOperation instanceof PhysicalUnion
- && context.getConnectContext().getSessionVariable().getEnableLocalExchange()) {
+ && context.getConnectContext().getSessionVariable().getEnableLocalExchange()
+ && SessionVariable.canUseNereidsDistributePlanner()) {
boolean isLocalShuffleUnion = false;
if (setOperation.getPhysicalProperties().getDistributionSpec()
instanceof DistributionSpecExecutionAny) {
Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new
IdentityHashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 2f9f2328b66..94de2b2eb73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -500,6 +500,7 @@ import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.hint.JoinSkewInfo;
import org.apache.doris.nereids.load.NereidsDataDescription;
@@ -2023,8 +2024,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
connectContext.setStatementContext(statementContext);
statementContext.setConnectContext(connectContext);
}
- logicalPlans.add(Pair.of(
- ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext));
+ Pair<LogicalPlan, StatementContext> planAndContext = Pair.of(
+ ParserUtils.withOrigin(ctx, () -> (LogicalPlan) visit(statement)), statementContext);
+ statementContext.setParsedStatement(new LogicalPlanAdapter(planAndContext.first, statementContext));
+ logicalPlans.add(planAndContext);
List<Placeholder> params = new ArrayList<>(tokenPosToParameters.values());
statementContext.setPlaceholders(params);
tokenPosToParameters.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index f53c971bc71..f1cdd4d4473 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -198,8 +198,10 @@ public class InitMaterializationContextHook implements PlannerHook {
return ImmutableList.of();
}
if (CollectionUtils.isEmpty(availableMTMVs)) {
- LOG.info("Enable materialized view rewrite but availableMTMVs is empty, query id "
- + "is {}", cascadesContext.getConnectContext().getQueryIdentifier());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Enable materialized view rewrite but availableMTMVs is empty, query id "
+ + "is {}", cascadesContext.getConnectContext().getQueryIdentifier());
+ }
return ImmutableList.of();
}
List<MaterializationContext> asyncMaterializationContext = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 8df0a2fef0b..03b94143521 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -984,8 +984,11 @@ public class ConnectContext {
public TUniqueId nextInstanceId() {
if (loadId != null) {
return new TUniqueId(loadId.hi, loadId.lo + instanceIdGenerator.incrementAndGet());
- } else {
+ } else if (queryId != null) {
return new TUniqueId(queryId.hi, queryId.lo + instanceIdGenerator.incrementAndGet());
+ } else {
+ // for test
+ return new TUniqueId(0, instanceIdGenerator.incrementAndGet());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d36199dcc49..ab221d693af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -18,7 +18,6 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.SetVar;
-import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
@@ -29,7 +28,6 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.metrics.Event;
import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.nereids.parser.Dialect;
@@ -1900,7 +1898,7 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableCommonExprPushdown = true;
@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = false, flag = VariableMgr.INVISIBLE,
- varType = VariableAnnotation.DEPRECATED)
+ varType = VariableAnnotation.DEPRECATED, needForward = true)
public boolean enableLocalExchange = true;
/**
@@ -4354,15 +4352,7 @@ public class SessionVariable implements Serializable, Writable {
if (connectContext == null) {
return true;
}
- SessionVariable sessionVariable = connectContext.getSessionVariable();
- StatementContext statementContext = connectContext.getStatementContext();
- if (statementContext != null) {
- StatementBase parsedStatement = statementContext.getParsedStatement();
- if (!(parsedStatement instanceof LogicalPlanAdapter)) {
- return false;
- }
- }
- return sessionVariable.enableNereidsDistributePlanner;
+ return connectContext.getSessionVariable().enableNereidsDistributePlanner;
}
public boolean isEnableNereidsDistributePlanner() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0c2478a5ae0..61ccbc7b07b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -862,6 +862,9 @@ public class StmtExecutor {
}
parsedStmt = statements.get(originStmt.idx);
}
+ if (parsedStmt != null && statementContext.getParsedStatement() == null) {
+ statementContext.setParsedStatement(parsedStmt);
+ }
}
public void finalizeQuery() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index c23ff24c868..8a1e3da7018 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -128,8 +128,7 @@ public class PlanChecker {
public AbstractInsertExecutor getInsertExecutor(String sql) throws Exception {
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
- UUID uuid = UUID.randomUUID();
- connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
+ setQueryId();
InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parsedPlan;
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(parsedPlan, statementContext);
return insertIntoTableCommand.initPlan(connectContext,
@@ -211,6 +210,7 @@ public class PlanChecker {
public List<PlanProcess> explainPlanProcess(String sql) {
NereidsParser parser = new NereidsParser();
LogicalPlan command = parser.parseSingle(sql);
+ setQueryId();
NereidsPlanner planner = new NereidsPlanner(
new StatementContext(connectContext, new OriginStatement(sql,
0)));
planner.planWithLock(command, PhysicalProperties.ANY,
ExplainLevel.ALL_PLAN, true);
@@ -395,6 +395,7 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
NereidsPlanner planner = new NereidsPlanner(statementContext);
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
+ setQueryId();
LogicalPlanAdapter parsedPlanAdaptor = new
LogicalPlanAdapter(parsedPlan, statementContext);
statementContext.setParsedStatement(parsedPlanAdaptor);
@@ -722,6 +723,7 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
+ setQueryId();
NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
LogicalPlanAdapter adapter = LogicalPlanAdapter.of(parsed);
adapter.setIsExplain(new ExplainOptions(ExplainLevel.ALL_PLAN, false));
@@ -749,6 +751,7 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
+ setQueryId();
NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
SessionVariable sessionVariable = connectContext.getSessionVariable();
try {
@@ -878,6 +881,12 @@ public class PlanChecker {
return this;
}
+ private void setQueryId() {
+ UUID uuid = UUID.randomUUID();
+ TUniqueId id = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ connectContext.setQueryId(id);
+ }
+
public static boolean isPlanEqualWithoutID(Plan plan1, Plan plan2) {
if (plan1.arity() != plan2.arity()
|| !plan1.getOutput().equals(plan2.getOutput()) ||
plan1.getClass() != plan2.getClass()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index 4291d753918..98fd388fea3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -68,9 +68,11 @@ import org.apache.doris.qe.cache.RowBatchBuilder;
import org.apache.doris.qe.cache.SqlCache;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import mockit.Expectations;
@@ -89,6 +91,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
public class OlapQueryCacheTest {
private static final Logger LOG =
LogManager.getLogger(OlapQueryCacheTest.class);
@@ -271,6 +274,24 @@ public class OlapQueryCacheTest {
db.registerTable(view3);
View view4 = createEventNestedView();
db.registerTable(view4);
+
+ SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
+ Backend be = new Backend(0, "127.0.0.1", 0);
+ be.setAlive(true);
+ ImmutableMap<Long, Backend> backends = ImmutableMap.of(0L, be);
+ new Expectations(clusterInfo) {
+ {
+ clusterInfo.getBackendsByCurrentCluster();
+ minTimes = 0;
+ result = backends;
+ }
+
+ {
+ clusterInfo.getAllBackendsByAllCluster();
+ minTimes = 0;
+ result = backends;
+ }
+ };
}
private OlapTable createOrderTable() {
@@ -501,6 +522,8 @@ public class OlapQueryCacheTest {
LogicalPlan plan = new NereidsParser().parseSingle(sql);
OriginStatement originStatement = new OriginStatement(sql, 0);
StatementContext statementContext = new StatementContext(ctx,
originStatement);
+ UUID uuid = UUID.randomUUID();
+ ctx.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
ctx.setStatementContext(statementContext);
NereidsPlanner nereidsPlanner = new
NereidsPlanner(statementContext);
LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan,
statementContext);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]