This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8e2ae30b20551de2998a5e9eccd023b6c4339abf Author: Lijia Liu <[email protected]> AuthorDate: Tue Jun 6 16:44:35 2023 +0800 [Fix](WorkloadGroup)Fix query queue nereids bug #20484 --- .../src/main/java/org/apache/doris/analysis/Analyzer.java | 11 ----------- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 5 ++++- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 10 ++++------ 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 117124f6ac..c76cb0fa0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -69,7 +69,6 @@ import org.apache.doris.rewrite.mvrewrite.ExprToSlotRefRule; import org.apache.doris.rewrite.mvrewrite.HLLHashToSlotRefRule; import org.apache.doris.rewrite.mvrewrite.NDVToHll; import org.apache.doris.rewrite.mvrewrite.ToBitmapToSlotRefRule; -import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryGlobals; import com.google.common.base.Joiner; @@ -408,8 +407,6 @@ public class Analyzer { private final Map<InlineViewRef, Set<Expr>> migrateFailedConjuncts = Maps.newHashMap(); - public List<TPipelineWorkloadGroup> tWorkloadGroups; - public GlobalState(Env env, ConnectContext context) { this.env = env; this.context = context; @@ -597,14 +594,6 @@ public class Analyzer { return explicitViewAlias; } - public void setWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) { - globalState.tWorkloadGroups = tWorkloadGroups; - } - - public List<TPipelineWorkloadGroup> getWorkloadGroups() { - return globalState.tWorkloadGroups; - } - /** * Registers a local view definition with this analyzer. Throws an exception if a view * definition with the same alias has already been registered or if the number of diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a042c0a8ad..91e01e6399 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -260,6 +260,10 @@ public class Coordinator { private StatsErrorEstimator statsErrorEstimator; + public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) { + this.tWorkloadGroups = tWorkloadGroups; + } + private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList(); private final ExecutionProfile executionProfile; @@ -351,7 +355,6 @@ public class Coordinator { nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = planner.getRuntimeFilters(); - this.tWorkloadGroups = analyzer == null ? null : analyzer.getWorkloadGroups(); this.executionProfile = new ExecutionProfile(queryId, fragments.size()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index a06f6ee855..4799b21448 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -561,7 +561,7 @@ public class StmtExecutor { // queue query here if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue && context.getSessionVariable().enablePipelineEngine()) { - this.queryQueue = analyzer.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); + this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); try { this.offerRet = queryQueue.offer(); } catch (InterruptedException e) { @@ -1098,11 +1098,6 @@ public class StmtExecutor { parsedStmt.setIsExplain(explainOptions); } } - if (parsedStmt instanceof QueryStmt && Config.enable_workload_group - && context.sessionVariable.enablePipelineEngine()) { - analyzer.setWorkloadGroups(analyzer.getEnv().getWorkloadGroupMgr() - .getWorkloadGroup(context)); - } } profile.getSummaryProfile().setQueryAnalysisFinishTime(); planner = new OriginalPlanner(analyzer); @@ -1361,6 +1356,9 @@ public class StmtExecutor { // 2. If this is a query, send the result expr fields first, and send result data back to client. RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + if (Config.enable_workload_group && context.sessionVariable.enablePipelineEngine()) { + coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); + } QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); profile.addExecutionProfile(coord.getExecutionProfile()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
