This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 36b6cea462 [feature-wip](nereids) Support Q-Error to measure the 
accuracy of derived statistics (#17185)
36b6cea462 is described below

commit 36b6cea462cffdb139c9062ec9babfe1c0b45821
Author: AKIRA <[email protected]>
AuthorDate: Wed Mar 8 17:26:24 2023 +0900

    [feature-wip](nereids) Support Q-Error to measure the accuracy of derived 
statistics (#17185)
    
    Collect the estimated and the exact output row counts of each plan 
node, and use them to measure the accuracy of derived statistics. The resulting 
Q-error is managed by ProfileManager, so that it can later be retrieved by 
query id through an HTTP request.
---
 .../apache/doris/common/util/ProfileManager.java   |  22 ++++
 .../apache/doris/common/util/RuntimeProfile.java   |   1 -
 .../org/apache/doris/nereids/NereidsPlanner.java   |   5 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  81 ++++++++++-----
 .../doris/nereids/stats/StatsErrorEstimator.java   | 114 +++++++++++++++++++++
 .../java/org/apache/doris/qe/ConnectContext.java   |  10 ++
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   4 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   6 +-
 .../translator/PhysicalPlanTranslatorTest.java     |   2 +-
 9 files changed, 216 insertions(+), 29 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 0d4c6d49b8..7734e76db8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -110,6 +110,8 @@ public class ProfileManager {
         public MultiProfileTreeBuilder builder = null;
         public String errMsg = "";
 
+        public double qError;
+
         // lazy load profileContent because sometimes profileContent is very 
large
         public String getProfileContent() {
             if (profileContent != null) {
@@ -119,6 +121,15 @@ public class ProfileManager {
             profileContent = profile.toString();
             return profileContent;
         }
+
+        public double getqError() {
+            return qError;
+        }
+
+        public void setqError(double qError) {
+            this.qError = qError;
+        }
+
     }
 
     // only protect queryIdDeque; queryIdToProfileMap is concurrent, no need 
to protect
@@ -252,6 +263,10 @@ public class ProfileManager {
         }
     }
 
+    public ProfileElement findProfileElementObject(String queryId) {
+        return queryIdToProfileMap.get(queryId);
+    }
+
     /**
      * Check if the query with specific query id is queried by specific user.
      *
@@ -371,4 +386,11 @@ public class ProfileManager {
     public boolean isQueryProfile(RuntimeProfile profile) {
         return "Query".equals(profile.getName());
     }
+
+    public void setQErrorToProfileElementObject(String queryId, double qError) 
{
+        ProfileElement profileElement = findProfileElementObject(queryId);
+        if (profileElement != null) {
+            profileElement.setqError(qError);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index aceb7dc1e5..981f7be77d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -137,7 +137,6 @@ public class RuntimeProfile {
     // preorder traversal, idx should be modified in the traversal process
     private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> 
idx) {
         TRuntimeProfileNode node = nodes.get(idx.getRef());
-
         // update this level's counters
         if (node.counters != null) {
             for (TCounter tcounter : node.counters) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 8456f6c2db..33025b2559 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -101,12 +101,13 @@ public class NereidsPlanner extends Planner {
             return;
         }
         PhysicalPlan physicalPlan = (PhysicalPlan) resultPlan;
-        PhysicalPlanTranslator physicalPlanTranslator = new 
PhysicalPlanTranslator();
         PlanTranslatorContext planTranslatorContext = new 
PlanTranslatorContext(cascadesContext);
+        PhysicalPlanTranslator physicalPlanTranslator = new 
PhysicalPlanTranslator(planTranslatorContext,
+                ConnectContext.get().getStatsErrorEstimator());
         if (ConnectContext.get().getSessionVariable().isEnableNereidsTrace()) {
             CounterEvent.clearCounter();
         }
-        PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan, 
planTranslatorContext);
+        PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);
 
         scanNodeList = planTranslatorContext.getScanNodes();
         descTable = planTranslatorContext.getDescTable();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 74c9517aff..5db3b71fd1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -54,6 +54,7 @@ import 
org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import 
org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
+import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.expressions.AggregateExpression;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Cast;
@@ -70,6 +71,7 @@ import org.apache.doris.nereids.trees.expressions.WindowFrame;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.AggMode;
 import org.apache.doris.nereids.trees.plans.AggPhase;
 import org.apache.doris.nereids.trees.plans.JoinType;
@@ -173,15 +175,30 @@ import java.util.stream.Stream;
  */
 public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, 
PlanTranslatorContext> {
     private static final Logger LOG = 
LogManager.getLogger(PhysicalPlanTranslator.class);
+    PlanTranslatorContext context;
+
+    StatsErrorEstimator statsErrorEstimator;
+
+    public PhysicalPlanTranslator() {
+    }
+
+    public PhysicalPlanTranslator(PlanTranslatorContext context, 
StatsErrorEstimator statsErrorEstimator) {
+        this.context = context;
+        this.statsErrorEstimator = statsErrorEstimator;
+    }
+
+    public PlanFragment translatePlan(PhysicalPlan physicalPlan, 
PlanTranslatorContext context) {
+        this.context = context;
+        return translatePlan(physicalPlan);
+    }
 
     /**
      * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
      *
      * @param physicalPlan Nereids Physical Plan tree
-     * @param context context to help translate
      * @return Stale Planner PlanFragment tree
      */
-    public PlanFragment translatePlan(PhysicalPlan physicalPlan, 
PlanTranslatorContext context) {
+    public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
         PlanFragment rootFragment = physicalPlan.accept(this, context);
         if (physicalPlan instanceof PhysicalDistribute) {
             PhysicalDistribute distribute = (PhysicalDistribute) physicalPlan;
@@ -369,7 +386,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 inputPlanFragment.getPlanRoot(), groupingInfo, 
repeatSlotIdList,
                 allSlotId, 
repeat.computeVirtualSlotValues(sortedVirtualSlots));
         
repeatNode.setNumInstances(inputPlanFragment.getPlanRoot().getNumInstances());
-        inputPlanFragment.addPlanRoot(repeatNode);
+        addPlanRoot(inputPlanFragment, repeatNode, repeat);
         inputPlanFragment.updateDataPartition(DataPartition.RANDOM);
         return inputPlanFragment;
     }
@@ -388,8 +405,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         tupleIds.add(tupleDescriptor.getId());
         EmptySetNode emptySetNode = new EmptySetNode(context.nextPlanNodeId(), 
tupleIds);
 
-        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
emptySetNode,
-                DataPartition.UNPARTITIONED);
+        PlanFragment planFragment = createPlanFragment(emptySetNode,
+                DataPartition.UNPARTITIONED, emptyRelation);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -421,7 +438,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         unionNode.addConstExprList(legacyExprs);
         unionNode.finalizeForNereids(oneRowTuple.getSlots(), new 
ArrayList<>());
 
-        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
unionNode, DataPartition.UNPARTITIONED);
+        PlanFragment planFragment = createPlanFragment(unionNode, 
DataPartition.UNPARTITIONED, oneRowRelation);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -521,7 +538,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     .map(context::findSlotRef).collect(Collectors.toList());
             dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, 
partitionExprs);
         }
-        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
olapScanNode, dataPartition);
+        PlanFragment planFragment = createPlanFragment(olapScanNode, 
dataPartition, olapScan);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -544,8 +561,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         );
         scanNode.finalizeForNereids();
         context.getScanNodes().add(scanNode);
-        PlanFragment planFragment =
-                new PlanFragment(context.nextFragmentId(), scanNode, 
DataPartition.RANDOM);
+        PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, schemaScan);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -573,7 +589,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         Utils.execWithUncheckedException(fileScanNode::finalizeForNereids);
         // Create PlanFragment
         DataPartition dataPartition = DataPartition.RANDOM;
-        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
fileScanNode, dataPartition);
+        PlanFragment planFragment = createPlanFragment(fileScanNode, 
dataPartition, fileScan);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -600,7 +616,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             context.findSlotRef(slot.getExprId()).setLabel(tableColumnName);
         }
 
-        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
scanNode, DataPartition.RANDOM);
+        PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, tvfRelation);
         context.addPlanFragment(planFragment);
         return planFragment;
     }
@@ -695,7 +711,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         if (!sort.getSortPhase().isMerge()) {
             // For localSort or Gather->Sort, we just need to add sortNode
             SortNode sortNode = translateSortNode(sort, 
inputFragment.getPlanRoot(), context);
-            currentFragment.addPlanRoot(sortNode);
+            addPlanRoot(currentFragment, sortNode, sort);
         } else {
             // For mergeSort, we need to push sortInfo to exchangeNode
             if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
@@ -800,7 +816,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         } else {
             
analyticEvalNode.setNumInstances(inputPlanFragment.getPlanRoot().getNumInstances());
         }
-        inputPlanFragment.addPlanRoot(analyticEvalNode);
+        addPlanRoot(inputPlanFragment, analyticEvalNode, physicalWindow);
         return inputPlanFragment;
     }
 
@@ -831,7 +847,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             SortNode sortNode = translateSortNode(topN, 
inputFragment.getPlanRoot(), context);
             sortNode.setOffset(topN.getOffset());
             sortNode.setLimit(topN.getLimit());
-            currentFragment.addPlanRoot(sortNode);
+            addPlanRoot(currentFragment, sortNode, topN);
         } else {
             // For mergeSort, we need to push sortInfo to exchangeNode
             if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
@@ -1163,8 +1179,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             boolean needNewRootFragment = nestedLoopJoin.child(0) instanceof 
PhysicalDistribute;
             PlanFragment joinFragment;
             if (needNewRootFragment) {
-                joinFragment = new PlanFragment(context.nextFragmentId(), 
nestedLoopJoinNode,
-                        DataPartition.UNPARTITIONED);
+                joinFragment = createPlanFragment(nestedLoopJoinNode,
+                        DataPartition.UNPARTITIONED, nestedLoopJoin);
                 context.addPlanFragment(joinFragment);
             } else {
                 joinFragment = leftFragment;
@@ -1408,7 +1424,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             // the three nodes don't support conjuncts, need create a 
SelectNode to filter data
             SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), 
planNode);
             addConjunctsToPlanNode(filter, selectNode, context);
-            inputFragment.addPlanRoot(selectNode);
+            addPlanRoot(inputFragment, selectNode, filter);
         } else {
             if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
                 addConjunctsToPlanNode(filter, planNode, context);
@@ -1423,6 +1439,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         filter.getConjuncts().stream()
                 .map(e -> ExpressionTranslator.translate(e, context))
                 .forEach(planNode::addConjunct);
+        updateLegacyPlanIdToPhysicalPlan(planNode, filter);
     }
 
     @Override
@@ -1475,7 +1492,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             
currentFragment.setPlanRoot(currentFragment.getPlanRoot().getChild(0));
             currentFragment = createParentFragment(currentFragment, 
DataPartition.UNPARTITIONED, context);
         }
-        currentFragment.addPlanRoot(assertNumRowsNode);
+        addPlanRoot(currentFragment, assertNumRowsNode, assertNumRows);
         return currentFragment;
     }
 
@@ -1555,8 +1572,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // If all child fragments are unpartitioned, return a single 
unpartitioned fragment
         // with a UnionNode that merges all child fragments.
         if (allChildFragmentsUnPartitioned(childrenFragments)) {
-            setOperationFragment = new PlanFragment(
-                    context.nextFragmentId(), setOperationNode, 
DataPartition.UNPARTITIONED);
+            setOperationFragment = createPlanFragment(setOperationNode, 
DataPartition.UNPARTITIONED, setOperation);
             // Absorb the plan trees of all childFragments into unionNode
             // and fix up the fragment tree in the process.
             for (int i = 0; i < childrenFragments.size(); ++i) {
@@ -1565,9 +1581,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         context);
             }
         } else {
-            setOperationFragment = new PlanFragment(context.nextFragmentId(), 
setOperationNode,
+            setOperationFragment = createPlanFragment(setOperationNode,
                     new DataPartition(TPartitionType.HASH_PARTITIONED,
-                            
setOperationNode.getMaterializedResultExprLists().get(0)));
+                            
setOperationNode.getMaterializedResultExprLists().get(0)), setOperation);
             for (int i = 0; i < childrenFragments.size(); ++i) {
                 PlanFragment childFragment = childrenFragments.get(i);
                 // Connect the unpartitioned child fragments to 
SetOperationNode via a random exchange.
@@ -1597,7 +1613,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .collect(Collectors.toList());
         TableFunctionNode tableFunctionNode = new 
TableFunctionNode(context.nextPlanNodeId(),
                 currentFragment.getPlanRoot(), tupleDescriptor.getId(), 
functionCalls, outputSlotIds);
-        currentFragment.addPlanRoot(tableFunctionNode);
+        addPlanRoot(currentFragment, tableFunctionNode, generate);
         return currentFragment;
     }
 
@@ -1856,7 +1872,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 Expr.cloneList(lhsJoinExprs, null));
         DataPartition rhsJoinPartition =
                 new DataPartition(TPartitionType.HASH_PARTITIONED, 
rhsJoinExprs);
-        PlanFragment joinFragment = new PlanFragment(context.nextFragmentId(), 
hashJoinNode, lhsJoinPartition);
+        PlanFragment joinFragment = createPlanFragment(hashJoinNode, 
lhsJoinPartition, physicalHashJoin);
         context.addPlanFragment(joinFragment);
 
         connectChildFragment(hashJoinNode, 0, joinFragment, leftFragment, 
context);
@@ -2058,4 +2074,21 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         && selectIndexColumns.contains(((SlotReference) 
slot).getColumn().get()))
                 .collect(ImmutableList.toImmutableList());
     }
+
+    private PlanFragment createPlanFragment(PlanNode planNode, DataPartition 
dataPartition, AbstractPlan physicalPlan) {
+        PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
planNode, dataPartition);
+        updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
+        return planFragment;
+    }
+
+    private void addPlanRoot(PlanFragment fragment, PlanNode planNode, 
AbstractPlan physicalPlan) {
+        fragment.addPlanRoot(planNode);
+        updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
+    }
+
+    private void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, 
AbstractPlan physicalPlan) {
+        if (statsErrorEstimator != null) {
+            statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, 
physicalPlan);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsErrorEstimator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsErrorEstimator.java
new file mode 100644
index 0000000000..ad61375654
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsErrorEstimator.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.ProfileManager;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.thrift.TReportExecStatusParams;
+import org.apache.doris.thrift.TRuntimeProfileNode;
+import org.apache.doris.thrift.TUniqueId;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Used to estimate the bias of stats estimation.
+ */
+public class StatsErrorEstimator {
+    private Map<Integer, Pair<Double, Double>> legacyPlanIdToPhysicalPlan;
+
+    public StatsErrorEstimator() {
+        legacyPlanIdToPhysicalPlan = new HashMap<>();
+    }
+
+    public void updateLegacyPlanIdToPhysicalPlan(PlanNode planNode, 
AbstractPlan physicalPlan) {
+        legacyPlanIdToPhysicalPlan.put(planNode.getId().asInt(), 
Pair.of(physicalPlan.getStats().getRowCount(),
+                (double) 0));
+    }
+
+    /**
+     *  Q-error:
+     *      q = \max_{i=1}^{n} \max(\frac{b_i^\prime}{b_i}, \frac{b_i}{b_i^\prime})
+     */
+    public double calculateQError() {
+        double qError = Double.NEGATIVE_INFINITY;
+        for (Entry<Integer, Pair<Double, Double>> entry : 
legacyPlanIdToPhysicalPlan.entrySet()) {
+            double exactReturnedRows = entry.getValue().second;
+            double estimateReturnedRows = entry.getValue().first;
+            qError = Math.max(qError,
+                    Math.max(exactReturnedRows / 
oneIfZero(estimateReturnedRows),
+                            estimateReturnedRows / 
oneIfZero(exactReturnedRows)));
+        }
+        return qError;
+    }
+
+    /**
+     * Update exact returned rows incrementally, since there may be many 
execution instances of a plan fragment.
+     */
+    public void updateExactReturnedRows(TReportExecStatusParams 
tReportExecStatusParams) {
+        TUniqueId tUniqueId = tReportExecStatusParams.query_id;
+        for (TRuntimeProfileNode runtimeProfileNode : 
tReportExecStatusParams.profile.nodes) {
+            String name = runtimeProfileNode.name;
+            int planId = extractPlanNodeIdFromName(name);
+            if (planId == -1) {
+                continue;
+            }
+            double rowsReturned = runtimeProfileNode.counters.stream()
+                    .filter(p -> p.name.equals("RowsReturned")).mapToDouble(p 
-> (double) p.getValue()).sum();
+            Pair<Double, Double> pair = legacyPlanIdToPhysicalPlan.get(planId);
+            pair.second = pair.second + rowsReturned;
+        }
+        double qError = calculateQError();
+        ProfileManager.getInstance()
+                .setQErrorToProfileElementObject(DebugUtil.printId(tUniqueId), 
qError);
+    }
+
+    /**
+     * TODO: The execution report from BE doesn't have any schema, so we have 
to use regex to extract the plan node id.
+     */
+    private int extractPlanNodeIdFromName(String name) {
+        Pattern p = Pattern.compile("\\b(?!dst_id=)id=(\\d+)\\b");
+        Matcher m = p.matcher(name);
+        if (!m.find()) {
+            return -1;
+        }
+        return Integer.parseInt(m.group(1));
+    }
+
+    private Double extractRowsReturned(String rowsReturnedStr) {
+        if (rowsReturnedStr == null) {
+            return 0.0;
+        }
+        Pattern p = Pattern.compile("\\((\\d+)\\)");
+        Matcher m = p.matcher(rowsReturnedStr);
+        if (!m.find()) {
+            return 0.0;
+        }
+        return Double.parseDouble(m.group(1));
+    }
+
+    private double oneIfZero(double d) {
+        return d == 0.0 ? 1.0 : d;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 1598ecbdbc..42cdb8402e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -34,6 +34,7 @@ import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlSslContext;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.thrift.TResourceInfo;
@@ -166,6 +167,8 @@ public class ConnectContext {
      */
     private int executionTimeoutS;
 
+    private StatsErrorEstimator statsErrorEstimator;
+
     public void setUserQueryTimeout(long queryTimeout) {
         this.userQueryTimeout = queryTimeout;
     }
@@ -710,5 +713,12 @@ public class ConnectContext {
         return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
     }
 
+    public StatsErrorEstimator getStatsErrorEstimator() {
+        return statsErrorEstimator;
+    }
+
+    public void setStatsErrorEstimator(StatsErrorEstimator 
statsErrorEstimator) {
+        this.statsErrorEstimator = statsErrorEstimator;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 1bf11ece97..81a6bdf4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -187,6 +187,10 @@ public final class QeProcessorImpl implements QeProcessor {
         }
         final TReportExecStatusResult result = new TReportExecStatusResult();
         final QueryInfo info = coordinatorMap.get(params.query_id);
+        if (info != null && info.connectContext != null && 
info.connectContext.getStatsErrorEstimator() != null) {
+            
info.connectContext.getStatsErrorEstimator().updateExactReturnedRows(params);
+        }
+
         if (info == null) {
             result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
             LOG.info("ReportExecStatus() runtime error, query {} does not 
exist", DebugUtil.printId(params.query_id));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 39a7d05b32..d0c865c2bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -101,6 +101,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
@@ -441,6 +442,10 @@ public class StmtExecutor implements ProfileWriter {
     // Exception:
     // IOException: talk with client failed.
     public void execute(TUniqueId queryId) throws Exception {
+        SessionVariable sessionVariable = context.getSessionVariable();
+        if (sessionVariable.enableProfile && 
sessionVariable.isEnableNereidsPlanner()) {
+            ConnectContext.get().setStatsErrorEstimator(new 
StatsErrorEstimator());
+        }
         context.setStartTime();
 
         plannerProfile.setQueryBeginTime();
@@ -638,7 +643,6 @@ public class StmtExecutor implements ProfileWriter {
         } finally {
             // revert Session Value
             try {
-                SessionVariable sessionVariable = context.getSessionVariable();
                 VariableMgr.revertSessionValue(sessionVariable);
                 // origin value init
                 sessionVariable.setIsSingleSetVar(false);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
index 2da342ff26..eb8f95a47f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -75,7 +75,7 @@ public class PhysicalPlanTranslatorTest {
         PhysicalProject<PhysicalFilter<PhysicalOlapScan>> project = new 
PhysicalProject<>(projList,
                 placeHolder, filter);
         PlanTranslatorContext planTranslatorContext = new 
PlanTranslatorContext();
-        PhysicalPlanTranslator translator = new PhysicalPlanTranslator();
+        PhysicalPlanTranslator translator = new 
PhysicalPlanTranslator(planTranslatorContext, null);
         PlanFragment fragment = translator.visitPhysicalProject(project, 
planTranslatorContext);
         PlanNode planNode = fragment.getPlanRoot();
         List<OlapScanNode> scanNodeList = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to