This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 932b639086b [refactor](point query) decouple PointQueryExec from the 
Coordinator (#24509)
932b639086b is described below

commit 932b639086b929745871ec1016e8c9419e3c7f20
Author: lihangyu <[email protected]>
AuthorDate: Mon Sep 18 11:25:40 2023 +0800

    [refactor](point query) decouple PointQueryExec from the Coordinator 
(#24509)
    
    In order to decouple PointQueryExec from the Coordinator, both 
PointQueryExec and Coordinator now implement the CoordInterface interface, 
and StmtExecutor drives whichever one it creates through that interface.
---
 .../java/org/apache/doris/qe/CoordInterface.java   |  31 ++++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  63 ++---------
 .../java/org/apache/doris/qe/PointQueryExec.java   | 122 ++++++++++++++++-----
 .../java/org/apache/doris/qe/StmtExecutor.java     |  34 +++---
 4 files changed, 153 insertions(+), 97 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
new file mode 100644
index 00000000000..925cd1fd15b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.proto.Types;
+
+public interface CoordInterface {
+    public void exec() throws Exception;
+
+    public RowBatch getNext() throws Exception;
+
+    public int getInstanceTotalNum();
+
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3309c828833..f601faba683 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,8 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.PrepareStmt;
-import org.apache.doris.analysis.PrepareStmt.PreparedType;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
@@ -150,7 +148,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-public class Coordinator {
+public class Coordinator implements CoordInterface {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
 
     private static final String localIP = 
FrontendOptions.getLocalHostAddress();
@@ -260,7 +258,6 @@ public class Coordinator {
     public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
     private ConnectContext context;
 
-    private boolean isPointQuery = false;
     private PointQueryExec pointExec = null;
 
     private StatsErrorEstimator statsErrorEstimator;
@@ -294,32 +291,7 @@ public class Coordinator {
         this.queryId = context.queryId();
         this.fragments = planner.getFragments();
         this.scanNodes = planner.getScanNodes();
-
-        if (this.scanNodes.size() == 1 && this.scanNodes.get(0) instanceof 
OlapScanNode) {
-            OlapScanNode olapScanNode = (OlapScanNode) (this.scanNodes.get(0));
-            isPointQuery = olapScanNode.isPointQuery();
-            if (isPointQuery) {
-                PlanFragment fragment = fragments.get(0);
-                LOG.debug("execPointGet fragment {}", fragment);
-                OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
-                Preconditions.checkNotNull(planRoot);
-                pointExec = new 
PointQueryExec(planRoot.getPointQueryEqualPredicates(),
-                                                planRoot.getDescTable(), 
fragment.getOutputExprs());
-            }
-        }
-        PrepareStmt prepareStmt = analyzer == null ? null : 
analyzer.getPrepareStmt();
-        if (prepareStmt != null && prepareStmt.getPreparedType() == 
PreparedType.FULL_PREPARED) {
-            // Used cached or better performance
-            this.descTable = prepareStmt.getDescTable();
-            if (pointExec != null) {
-                pointExec.setCacheID(prepareStmt.getID());
-                
pointExec.setSerializedDescTable(prepareStmt.getSerializedDescTable());
-                
pointExec.setSerializedOutputExpr(prepareStmt.getSerializedOutputExprs());
-                pointExec.setBinaryProtocol(prepareStmt.isBinaryProtocol());
-            }
-        } else {
-            this.descTable = planner.getDescTable().toThrift();
-        }
+        this.descTable = planner.getDescTable().toThrift();
 
         this.returnedAllResults = false;
         this.enableShareHashTableForBroadcastJoin = 
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
@@ -506,6 +478,7 @@ public class Coordinator {
         return result;
     }
 
+    @Override
     public int getInstanceTotalNum() {
         return instanceTotalNum;
     }
@@ -598,6 +571,7 @@ public class Coordinator {
     // 'Request' must contain at least a coordinator plan fragment (ie, can't
     // be for a query like 'SELECT 1').
     // A call to Exec() must precede all other member function calls.
+    @Override
     public void exec() throws Exception {
         if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
             LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: 
{}",
@@ -649,17 +623,10 @@ public class Coordinator {
             LOG.info("dispatch load job: {} to {}", 
DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
         executionProfile.markInstances(instanceIds);
-        if (!isPointQuery) {
-            if (enablePipelineEngine) {
-                sendPipelineCtx();
-            } else {
-                sendFragment();
-            }
+        if (enablePipelineEngine) {
+            sendPipelineCtx();
         } else {
-            OlapScanNode planRoot = (OlapScanNode) 
fragments.get(0).getPlanRoot();
-            Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
-            pointExec.setCandidateBackends(planRoot.getScanBackendIds());
-            pointExec.setTabletId(planRoot.getScanTabletIds().get(0));
+            sendFragment();
         }
     }
 
@@ -1187,6 +1154,7 @@ public class Coordinator {
         }
     }
 
+    @Override
     public RowBatch getNext() throws Exception {
         if (receiver == null) {
             throw new UserException("There is no receiver.");
@@ -1194,12 +1162,7 @@ public class Coordinator {
 
         RowBatch resultBatch;
         Status status = new Status();
-
-        if (!isPointQuery) {
-            resultBatch = receiver.getNext(status);
-        } else {
-            resultBatch = pointExec.getNext(status);
-        }
+        resultBatch = receiver.getNext(status);
         if (!status.ok()) {
             LOG.warn("get next fail, need cancel. query id: {}", 
DebugUtil.printId(queryId));
         }
@@ -1325,6 +1288,7 @@ public class Coordinator {
         cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
     }
 
+    @Override
     public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
         lock();
         try {
@@ -2095,13 +2059,6 @@ public class Coordinator {
     // Populates scan_range_assignment_.
     // <fragment, <server, nodeId>>
     private void computeScanRangeAssignment() throws Exception {
-        if (isPointQuery) {
-            // Fast path for evaluate Backend for point query
-            List<TScanRangeLocations> locations = ((OlapScanNode) 
scanNodes.get(0)).lazyEvaluateRangeLocations();
-            Preconditions.checkNotNull(locations);
-            return;
-        }
-
         Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         Map<TNetworkAddress, Long> replicaNumPerHost = 
getReplicaNumPerHostForOlapTable();
         Collections.shuffle(scanNodes);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index bda52b94ad2..0ffb5b989d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -17,23 +17,33 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.PrepareStmt;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.Planner;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.KeyTuple;
+import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExpr;
 import org.apache.doris.thrift.TExprList;
 import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.protobuf.ByteString;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -43,7 +53,6 @@ import org.apache.thrift.TSerializer;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -53,7 +62,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class PointQueryExec {
+public class PointQueryExec implements CoordInterface {
     private static final Logger LOG = 
LogManager.getLogger(PointQueryExec.class);
     // SlotRef sorted by column id
     private Map<SlotRef, Expr> equalPredicats;
@@ -69,22 +78,55 @@ public class PointQueryExec {
     private boolean isBinaryProtocol = false;
 
     private List<Backend> candidateBackends;
+    Planner planner;
 
     // For parepared statement cached structure,
     // there are some pre caculated structure in Backend TabletFetch service
     // using this ID to find for this prepared statement
     private UUID cacheID;
 
-    public PointQueryExec(Map<SlotRef, Expr> equalPredicats, DescriptorTable 
descTable,
-            ArrayList<Expr> outputExprs) {
-        this.equalPredicats = equalPredicats;
-        this.descriptorTable = descTable;
-        this.outputExprs = outputExprs;
+    private OlapScanNode getPlanRoot() {
+        List<PlanFragment> fragments = planner.getFragments();
+        PlanFragment fragment = fragments.get(0);
+        LOG.debug("execPointGet fragment {}", fragment);
+        OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
+        Preconditions.checkNotNull(planRoot);
+        return planRoot;
     }
 
-    void setCandidateBackends(HashSet<Long> backendsIds) {
+    public PointQueryExec(Planner planner, Analyzer analyzer) {
+        // init from planner
+        this.planner = planner;
+        List<PlanFragment> fragments = planner.getFragments();
+        PlanFragment fragment = fragments.get(0);
+        OlapScanNode planRoot = getPlanRoot();
+        this.equalPredicats = planRoot.getPointQueryEqualPredicates();
+        this.descriptorTable = planRoot.getDescTable();
+        this.outputExprs = fragment.getOutputExprs();
+
+        PrepareStmt prepareStmt = analyzer == null ? null : 
analyzer.getPrepareStmt();
+        if (prepareStmt != null && prepareStmt.getPreparedType() == 
PrepareStmt.PreparedType.FULL_PREPARED) {
+            // Used cached or better performance
+            this.cacheID = prepareStmt.getID();
+            this.serializedDescTable = prepareStmt.getSerializedDescTable();
+            this.serializedOutputExpr = prepareStmt.getSerializedOutputExprs();
+            this.isBinaryProtocol = prepareStmt.isBinaryProtocol();
+        } else {
+            // TODO
+            // planner.getDescTable().toThrift();
+        }
+    }
+
+    void setScanRangeLocations() throws Exception {
+        OlapScanNode planRoot = getPlanRoot();
+        // compute scan range
+        List<TScanRangeLocations> locations = 
planRoot.lazyEvaluateRangeLocations();
+        Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
+        this.tabletID = planRoot.getScanTabletIds().get(0);
+
+        Preconditions.checkNotNull(locations);
         candidateBackends = new ArrayList<>();
-        for (Long backendID : backendsIds) {
+        for (Long backendID : planRoot.getScanBackendIds()) {
             Backend backend = Env.getCurrentSystemInfo().getBackend(backendID);
             if (SimpleScheduler.isAvailable(backend)) {
                 candidateBackends.add(backend);
@@ -92,32 +134,13 @@ public class PointQueryExec {
         }
         // Random read replicas
         Collections.shuffle(this.candidateBackends);
-    }
-
-    public void setSerializedDescTable(ByteString serializedDescTable) {
-        this.serializedDescTable = serializedDescTable;
-    }
-
-    public void setSerializedOutputExpr(ByteString serializedOutputExpr) {
-        this.serializedOutputExpr = serializedOutputExpr;
-    }
-
-    public void setCacheID(UUID cacheID) {
-        this.cacheID = cacheID;
-    }
-
-    public void setTabletId(long tabletID) {
-        this.tabletID = tabletID;
+        LOG.debug("set scan locations, backend ids {}, tablet id {}", 
candidateBackends, tabletID);
     }
 
     public void setTimeout(long timeoutMs) {
         this.timeoutMs = timeoutMs;
     }
 
-    public void setBinaryProtocol(boolean isBinaryProtocol) {
-        this.isBinaryProtocol = isBinaryProtocol;
-    }
-
     void addKeyTuples(
             InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
         // TODO handle IN predicates
@@ -129,11 +152,26 @@ public class PointQueryExec {
         requestBuilder.addKeyTuples(kBuilder);
     }
 
-    public RowBatch getNext(Status status) throws TException {
+    @Override
+    public int getInstanceTotalNum() {
+        // TODO
+        return 1;
+    }
+
+    @Override
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+        // Do nothing
+    }
+
+
+    @Override
+    public RowBatch getNext() throws Exception {
+        setScanRangeLocations();
         Iterator<Backend> backendIter = candidateBackends.iterator();
         RowBatch rowBatch = null;
         int tryCount = 0;
         int maxTry = Math.min(Config.max_point_query_retry_time, 
candidateBackends.size());
+        Status status = new Status();
         do {
             Backend backend = backendIter.next();
             rowBatch = getNextInternal(status, backend);
@@ -146,9 +184,33 @@ public class PointQueryExec {
             }
             status.setStatus(Status.OK);
         } while (true);
+        // handle status code
+        if (!status.ok()) {
+            if (Strings.isNullOrEmpty(status.getErrorMsg())) {
+                status.rewriteErrorMsg();
+            }
+            if (status.isRpcError()) {
+                throw new RpcException(null, status.getErrorMsg());
+            } else {
+                String errMsg = status.getErrorMsg();
+                LOG.warn("query failed: {}", errMsg);
+
+                // hide host info
+                int hostIndex = errMsg.indexOf("host");
+                if (hostIndex != -1) {
+                    errMsg = errMsg.substring(0, hostIndex);
+                }
+                throw new UserException(errMsg);
+            }
+        }
         return rowBatch;
     }
 
+    @Override
+    public void exec() throws Exception {
+        // Do nothing
+    }
+
     private RowBatch getNextInternal(Status status, Backend backend) throws 
TException {
         long timeoutTs = System.currentTimeMillis() + timeoutMs;
         RowBatch rowBatch = new RowBatch();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c4ef5ad1604..b7b633a4b3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1389,19 +1389,25 @@ public class StmtExecutor {
         //
         // 2. If this is a query, send the result expr fields first, and send 
result data back to client.
         RowBatch batch;
-        coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
-        if (Config.enable_workload_group && 
context.sessionVariable.getEnablePipelineEngine()) {
-            
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
+        CoordInterface coordBase = null;
+        if (queryStmt instanceof SelectStmt && ((SelectStmt) 
parsedStmt).isPointQueryShortCircuit()) {
+            coordBase = new PointQueryExec(planner, analyzer);
         } else {
-            context.setWorkloadGroupName("");
+            coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
+            if (Config.enable_workload_group && 
context.sessionVariable.getEnablePipelineEngine()) {
+                
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
+            } else {
+                context.setWorkloadGroupName("");
+            }
+            QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+                    new QeProcessorImpl.QueryInfo(context, 
originStmt.originStmt, coord));
+            profile.addExecutionProfile(coord.getExecutionProfile());
+            coordBase = coord;
         }
-        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
-                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, 
coord));
-        profile.addExecutionProfile(coord.getExecutionProfile());
         Span queryScheduleSpan =
                 context.getTracer().spanBuilder("query 
schedule").setParent(Context.current()).startSpan();
         try (Scope scope = queryScheduleSpan.makeCurrent()) {
-            coord.exec();
+            coordBase.exec();
         } catch (Exception e) {
             queryScheduleSpan.recordException(e);
             throw e;
@@ -1410,12 +1416,12 @@ public class StmtExecutor {
         }
         profile.getSummaryProfile().setQueryScheduleFinishTime();
         updateProfile(false);
-        if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+        if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
             try {
                 LOG.debug("Start to execute fragment. user: {}, db: {}, sql: 
{}, fragment instance num: {}",
                         context.getQualifiedUser(), context.getDatabase(),
                         parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                        coord.getInstanceTotalNum());
+                        coordBase.getInstanceTotalNum());
             } catch (Exception e) {
                 LOG.warn("Fail to print fragment concurrency for Query.", e);
             }
@@ -1426,7 +1432,7 @@ public class StmtExecutor {
             while (true) {
                 // register the fetch result time.
                 profile.getSummaryProfile().setTempStartTime();
-                batch = coord.getNext();
+                batch = coordBase.getNext();
                 profile.getSummaryProfile().freshFetchResultConsumeTime();
 
                 // for outfile query, there will be only one empty batch send 
back with eos flag
@@ -1494,17 +1500,17 @@ public class StmtExecutor {
             // in some case may block all fragment handle threads
             // details see issue https://github.com/apache/doris/issues/16203
             LOG.warn("cancel fragment query_id:{} cause {}", 
DebugUtil.printId(context.queryId()), e.getMessage());
-            coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+            coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
             fetchResultSpan.recordException(e);
             throw e;
         } finally {
             fetchResultSpan.end();
-            if (coord.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
+            if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
                 try {
                     LOG.debug("Finish to execute fragment. user: {}, db: {}, 
sql: {}, fragment instance num: {}",
                             context.getQualifiedUser(), context.getDatabase(),
                             parsedStmt.getOrigStmt().originStmt.replace("\n", 
" "),
-                            coord.getInstanceTotalNum());
+                            coordBase.getInstanceTotalNum());
                 } catch (Exception e) {
                     LOG.warn("Fail to print fragment concurrency for Query.", 
e);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to