This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 932b639086b [refactor](point query) decouple PointQueryExec from the
Coordinator (#24509)
932b639086b is described below
commit 932b639086b929745871ec1016e8c9419e3c7f20
Author: lihangyu <[email protected]>
AuthorDate: Mon Sep 18 11:25:40 2023 +0800
[refactor](point query) decouple PointQueryExec from the Coordinator
(#24509)
In order to decouple PointQueryExec from the Coordinator, both
PointQueryExec and Coordinator now implement the CoordInterface interface,
and StmtExecutor drives either one through that common interface.
---
.../java/org/apache/doris/qe/CoordInterface.java | 31 ++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 63 ++---------
.../java/org/apache/doris/qe/PointQueryExec.java | 122 ++++++++++++++++-----
.../java/org/apache/doris/qe/StmtExecutor.java | 34 +++---
4 files changed, 153 insertions(+), 97 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
new file mode 100644
index 00000000000..925cd1fd15b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.proto.Types;
+
+public interface CoordInterface {
+ public void exec() throws Exception;
+
+ public RowBatch getNext() throws Exception;
+
+ public int getInstanceTotalNum();
+
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3309c828833..f601faba683 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,8 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.PrepareStmt;
-import org.apache.doris.analysis.PrepareStmt.PreparedType;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
@@ -150,7 +148,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-public class Coordinator {
+public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
private static final String localIP =
FrontendOptions.getLocalHostAddress();
@@ -260,7 +258,6 @@ public class Coordinator {
public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
private ConnectContext context;
- private boolean isPointQuery = false;
private PointQueryExec pointExec = null;
private StatsErrorEstimator statsErrorEstimator;
@@ -294,32 +291,7 @@ public class Coordinator {
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
-
- if (this.scanNodes.size() == 1 && this.scanNodes.get(0) instanceof
OlapScanNode) {
- OlapScanNode olapScanNode = (OlapScanNode) (this.scanNodes.get(0));
- isPointQuery = olapScanNode.isPointQuery();
- if (isPointQuery) {
- PlanFragment fragment = fragments.get(0);
- LOG.debug("execPointGet fragment {}", fragment);
- OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
- Preconditions.checkNotNull(planRoot);
- pointExec = new
PointQueryExec(planRoot.getPointQueryEqualPredicates(),
- planRoot.getDescTable(),
fragment.getOutputExprs());
- }
- }
- PrepareStmt prepareStmt = analyzer == null ? null :
analyzer.getPrepareStmt();
- if (prepareStmt != null && prepareStmt.getPreparedType() ==
PreparedType.FULL_PREPARED) {
- // Used cached or better performance
- this.descTable = prepareStmt.getDescTable();
- if (pointExec != null) {
- pointExec.setCacheID(prepareStmt.getID());
-
pointExec.setSerializedDescTable(prepareStmt.getSerializedDescTable());
-
pointExec.setSerializedOutputExpr(prepareStmt.getSerializedOutputExprs());
- pointExec.setBinaryProtocol(prepareStmt.isBinaryProtocol());
- }
- } else {
- this.descTable = planner.getDescTable().toThrift();
- }
+ this.descTable = planner.getDescTable().toThrift();
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin =
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
@@ -506,6 +478,7 @@ public class Coordinator {
return result;
}
+ @Override
public int getInstanceTotalNum() {
return instanceTotalNum;
}
@@ -598,6 +571,7 @@ public class Coordinator {
// 'Request' must contain at least a coordinator plan fragment (ie, can't
// be for a query like 'SELECT 1').
// A call to Exec() must precede all other member function calls.
+ @Override
public void exec() throws Exception {
if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
LOG.debug("debug: in Coordinator::exec. query id: {}, planNode:
{}",
@@ -649,17 +623,10 @@ public class Coordinator {
LOG.info("dispatch load job: {} to {}",
DebugUtil.printId(queryId), addressToBackendID.keySet());
}
executionProfile.markInstances(instanceIds);
- if (!isPointQuery) {
- if (enablePipelineEngine) {
- sendPipelineCtx();
- } else {
- sendFragment();
- }
+ if (enablePipelineEngine) {
+ sendPipelineCtx();
} else {
- OlapScanNode planRoot = (OlapScanNode)
fragments.get(0).getPlanRoot();
- Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
- pointExec.setCandidateBackends(planRoot.getScanBackendIds());
- pointExec.setTabletId(planRoot.getScanTabletIds().get(0));
+ sendFragment();
}
}
@@ -1187,6 +1154,7 @@ public class Coordinator {
}
}
+ @Override
public RowBatch getNext() throws Exception {
if (receiver == null) {
throw new UserException("There is no receiver.");
@@ -1194,12 +1162,7 @@ public class Coordinator {
RowBatch resultBatch;
Status status = new Status();
-
- if (!isPointQuery) {
- resultBatch = receiver.getNext(status);
- } else {
- resultBatch = pointExec.getNext(status);
- }
+ resultBatch = receiver.getNext(status);
if (!status.ok()) {
LOG.warn("get next fail, need cancel. query id: {}",
DebugUtil.printId(queryId));
}
@@ -1325,6 +1288,7 @@ public class Coordinator {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
}
+ @Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
lock();
try {
@@ -2095,13 +2059,6 @@ public class Coordinator {
// Populates scan_range_assignment_.
// <fragment, <server, nodeId>>
private void computeScanRangeAssignment() throws Exception {
- if (isPointQuery) {
- // Fast path for evaluate Backend for point query
- List<TScanRangeLocations> locations = ((OlapScanNode)
scanNodes.get(0)).lazyEvaluateRangeLocations();
- Preconditions.checkNotNull(locations);
- return;
- }
-
Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
Map<TNetworkAddress, Long> replicaNumPerHost =
getReplicaNumPerHostForOlapTable();
Collections.shuffle(scanNodes);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index bda52b94ad2..0ffb5b989d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -17,23 +17,33 @@
package org.apache.doris.qe;
+import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.PrepareStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.Planner;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.KeyTuple;
+import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprList;
import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TStatusCode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,7 +53,6 @@ import org.apache.thrift.TSerializer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -53,7 +62,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class PointQueryExec {
+public class PointQueryExec implements CoordInterface {
private static final Logger LOG =
LogManager.getLogger(PointQueryExec.class);
// SlotRef sorted by column id
private Map<SlotRef, Expr> equalPredicats;
@@ -69,22 +78,55 @@ public class PointQueryExec {
private boolean isBinaryProtocol = false;
private List<Backend> candidateBackends;
+ Planner planner;
// For parepared statement cached structure,
// there are some pre caculated structure in Backend TabletFetch service
// using this ID to find for this prepared statement
private UUID cacheID;
- public PointQueryExec(Map<SlotRef, Expr> equalPredicats, DescriptorTable
descTable,
- ArrayList<Expr> outputExprs) {
- this.equalPredicats = equalPredicats;
- this.descriptorTable = descTable;
- this.outputExprs = outputExprs;
+ private OlapScanNode getPlanRoot() {
+ List<PlanFragment> fragments = planner.getFragments();
+ PlanFragment fragment = fragments.get(0);
+ LOG.debug("execPointGet fragment {}", fragment);
+ OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
+ Preconditions.checkNotNull(planRoot);
+ return planRoot;
}
- void setCandidateBackends(HashSet<Long> backendsIds) {
+ public PointQueryExec(Planner planner, Analyzer analyzer) {
+ // init from planner
+ this.planner = planner;
+ List<PlanFragment> fragments = planner.getFragments();
+ PlanFragment fragment = fragments.get(0);
+ OlapScanNode planRoot = getPlanRoot();
+ this.equalPredicats = planRoot.getPointQueryEqualPredicates();
+ this.descriptorTable = planRoot.getDescTable();
+ this.outputExprs = fragment.getOutputExprs();
+
+ PrepareStmt prepareStmt = analyzer == null ? null :
analyzer.getPrepareStmt();
+ if (prepareStmt != null && prepareStmt.getPreparedType() ==
PrepareStmt.PreparedType.FULL_PREPARED) {
+ // Used cached or better performance
+ this.cacheID = prepareStmt.getID();
+ this.serializedDescTable = prepareStmt.getSerializedDescTable();
+ this.serializedOutputExpr = prepareStmt.getSerializedOutputExprs();
+ this.isBinaryProtocol = prepareStmt.isBinaryProtocol();
+ } else {
+ // TODO
+ // planner.getDescTable().toThrift();
+ }
+ }
+
+ void setScanRangeLocations() throws Exception {
+ OlapScanNode planRoot = getPlanRoot();
+ // compute scan range
+ List<TScanRangeLocations> locations =
planRoot.lazyEvaluateRangeLocations();
+ Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
+ this.tabletID = planRoot.getScanTabletIds().get(0);
+
+ Preconditions.checkNotNull(locations);
candidateBackends = new ArrayList<>();
- for (Long backendID : backendsIds) {
+ for (Long backendID : planRoot.getScanBackendIds()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendID);
if (SimpleScheduler.isAvailable(backend)) {
candidateBackends.add(backend);
@@ -92,32 +134,13 @@ public class PointQueryExec {
}
// Random read replicas
Collections.shuffle(this.candidateBackends);
- }
-
- public void setSerializedDescTable(ByteString serializedDescTable) {
- this.serializedDescTable = serializedDescTable;
- }
-
- public void setSerializedOutputExpr(ByteString serializedOutputExpr) {
- this.serializedOutputExpr = serializedOutputExpr;
- }
-
- public void setCacheID(UUID cacheID) {
- this.cacheID = cacheID;
- }
-
- public void setTabletId(long tabletID) {
- this.tabletID = tabletID;
+ LOG.debug("set scan locations, backend ids {}, tablet id {}",
candidateBackends, tabletID);
}
public void setTimeout(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
- public void setBinaryProtocol(boolean isBinaryProtocol) {
- this.isBinaryProtocol = isBinaryProtocol;
- }
-
void addKeyTuples(
InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
// TODO handle IN predicates
@@ -129,11 +152,26 @@ public class PointQueryExec {
requestBuilder.addKeyTuples(kBuilder);
}
- public RowBatch getNext(Status status) throws TException {
+ @Override
+ public int getInstanceTotalNum() {
+ // TODO
+ return 1;
+ }
+
+ @Override
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+ // Do nothing
+ }
+
+
+ @Override
+ public RowBatch getNext() throws Exception {
+ setScanRangeLocations();
Iterator<Backend> backendIter = candidateBackends.iterator();
RowBatch rowBatch = null;
int tryCount = 0;
int maxTry = Math.min(Config.max_point_query_retry_time,
candidateBackends.size());
+ Status status = new Status();
do {
Backend backend = backendIter.next();
rowBatch = getNextInternal(status, backend);
@@ -146,9 +184,33 @@ public class PointQueryExec {
}
status.setStatus(Status.OK);
} while (true);
+ // handle status code
+ if (!status.ok()) {
+ if (Strings.isNullOrEmpty(status.getErrorMsg())) {
+ status.rewriteErrorMsg();
+ }
+ if (status.isRpcError()) {
+ throw new RpcException(null, status.getErrorMsg());
+ } else {
+ String errMsg = status.getErrorMsg();
+ LOG.warn("query failed: {}", errMsg);
+
+ // hide host info
+ int hostIndex = errMsg.indexOf("host");
+ if (hostIndex != -1) {
+ errMsg = errMsg.substring(0, hostIndex);
+ }
+ throw new UserException(errMsg);
+ }
+ }
return rowBatch;
}
+ @Override
+ public void exec() throws Exception {
+ // Do nothing
+ }
+
private RowBatch getNextInternal(Status status, Backend backend) throws
TException {
long timeoutTs = System.currentTimeMillis() + timeoutMs;
RowBatch rowBatch = new RowBatch();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c4ef5ad1604..b7b633a4b3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1389,19 +1389,25 @@ public class StmtExecutor {
//
// 2. If this is a query, send the result expr fields first, and send
result data back to client.
RowBatch batch;
- coord = new Coordinator(context, analyzer, planner,
context.getStatsErrorEstimator());
- if (Config.enable_workload_group &&
context.sessionVariable.getEnablePipelineEngine()) {
-
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
+ CoordInterface coordBase = null;
+ if (queryStmt instanceof SelectStmt && ((SelectStmt)
parsedStmt).isPointQueryShortCircuit()) {
+ coordBase = new PointQueryExec(planner, analyzer);
} else {
- context.setWorkloadGroupName("");
+ coord = new Coordinator(context, analyzer, planner,
context.getStatsErrorEstimator());
+ if (Config.enable_workload_group &&
context.sessionVariable.getEnablePipelineEngine()) {
+
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
+ } else {
+ context.setWorkloadGroupName("");
+ }
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+ new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
+ profile.addExecutionProfile(coord.getExecutionProfile());
+ coordBase = coord;
}
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
- new QeProcessorImpl.QueryInfo(context, originStmt.originStmt,
coord));
- profile.addExecutionProfile(coord.getExecutionProfile());
Span queryScheduleSpan =
context.getTracer().spanBuilder("query
schedule").setParent(Context.current()).startSpan();
try (Scope scope = queryScheduleSpan.makeCurrent()) {
- coord.exec();
+ coordBase.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
throw e;
@@ -1410,12 +1416,12 @@ public class StmtExecutor {
}
profile.getSummaryProfile().setQueryScheduleFinishTime();
updateProfile(false);
- if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+ if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Start to execute fragment. user: {}, db: {}, sql:
{}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
- coord.getInstanceTotalNum());
+ coordBase.getInstanceTotalNum());
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
@@ -1426,7 +1432,7 @@ public class StmtExecutor {
while (true) {
// register the fetch result time.
profile.getSummaryProfile().setTempStartTime();
- batch = coord.getNext();
+ batch = coordBase.getNext();
profile.getSummaryProfile().freshFetchResultConsumeTime();
// for outfile query, there will be only one empty batch send
back with eos flag
@@ -1494,17 +1500,17 @@ public class StmtExecutor {
// in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
LOG.warn("cancel fragment query_id:{} cause {}",
DebugUtil.printId(context.queryId()), e.getMessage());
- coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
fetchResultSpan.recordException(e);
throw e;
} finally {
fetchResultSpan.end();
- if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+ if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {},
sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n",
" "),
- coord.getInstanceTotalNum());
+ coordBase.getInstanceTotalNum());
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.",
e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]