This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 397bf354db [WIP](optional) using hash set to distinct single value
(#11246)
397bf354db is described below
commit 397bf354db7af43885618c78b486cb1ac0588b92
Author: wangbo <[email protected]>
AuthorDate: Thu Aug 4 15:52:58 2022 +0800
[WIP](optional) using hash set to distinct single value (#11246)
* [WIP](optional) using hash set to distinct single value
Co-authored-by: [email protected] <[email protected]>
---
.../org/apache/doris/analysis/AggregateInfo.java | 58 +++++++++++++++-------
.../java/org/apache/doris/analysis/SelectStmt.java | 4 +-
.../apache/doris/planner/DistributedPlanner.java | 6 +--
.../java/org/apache/doris/qe/SessionVariable.java | 12 +++++
4 files changed, 57 insertions(+), 23 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 1ddab852ee..cfba7226da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -22,7 +22,9 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.DataPartition;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPartitionType;
import com.google.common.base.MoreObjects;
@@ -113,8 +115,7 @@ public final class AggregateInfo extends AggregateInfoBase {
private ArrayList<Integer> materializedAggregateSlots =
Lists.newArrayList();
// if true, this AggregateInfo is the first phase of a 2-phase DISTINCT
computation
private boolean isDistinctAgg = false;
- // If true, the sql has MultiDistinct
- private boolean isMultiDistinct;
+ private boolean isUsingSetForDistinct;
// the multi distinct's begin pos and end pos in groupby exprs
private ArrayList<Integer> firstIdx = Lists.newArrayList();
@@ -127,10 +128,10 @@ public final class AggregateInfo extends
AggregateInfoBase {
}
private AggregateInfo(ArrayList<Expr> groupingExprs,
- ArrayList<FunctionCallExpr> aggExprs, AggPhase
aggPhase, boolean isMultiDistinct) {
+ ArrayList<FunctionCallExpr> aggExprs, AggPhase
aggPhase, boolean isUsingSetForDistinct) {
super(groupingExprs, aggExprs);
this.aggPhase = aggPhase;
- this.isMultiDistinct = isMultiDistinct;
+ this.isUsingSetForDistinct = isUsingSetForDistinct;
}
/**
@@ -197,11 +198,11 @@ public final class AggregateInfo extends
AggregateInfoBase {
// 1: if aggExprs don't have distinct or have multi distinct , create
aggregate info for
// one stage aggregation.
// 2: if aggExprs have one distinct , create aggregate info for two
stage aggregation
- boolean isMultiDistinct =
estimateIfContainsMultiDistinct(distinctAggExprs);
- if (distinctAggExprs.isEmpty() || isMultiDistinct) {
+ boolean isUsingSetForDistinct =
estimateIfUsingSetForDistinct(distinctAggExprs);
+ if (distinctAggExprs.isEmpty() || isUsingSetForDistinct) {
// It is used to map new aggr expr to old expr to help create an
external
// reference to the aggregation node tuple
- result.setIsMultiDistinct(isMultiDistinct);
+ result.setIsUsingSetForDistinct(isUsingSetForDistinct);
if (tupleDesc == null) {
result.createTupleDescs(analyzer);
result.createSmaps(analyzer);
@@ -241,6 +242,26 @@ public final class AggregateInfo extends AggregateInfoBase
{
return result;
}
+
+ /**
+  * note(wb): in some cases, using a hash set for DISTINCT performs better than group-by + count.
+  * Returns true only when there is exactly one DISTINCT aggregate expression, the vectorized
+  * engine is enabled, and the session variable enable_single_distinct_column_opt is on.
+  *
+  * @param distinctAggExprs the DISTINCT aggregate expressions to inspect
+  * @return whether the single-DISTINCT hash-set path should be used
+  */
+ public static boolean isSetUsingSetForDistinct(List<FunctionCallExpr>
distinctAggExprs) {
+ boolean isSetUsingSetForDistinct = false;
+ /*
+  * Gated on the vectorized engine AND on the session switch (off by default), so this is an
+  * opt-in path rather than something forced for every vectorized query.
+  * NOTE(review): ConnectContext.get() is dereferenced without a null check; this relies on
+  * VectorizedUtil.isVectorized() returning false when there is no ConnectContext - confirm.
+  */
+ if (distinctAggExprs.size() == 1
+ && distinctAggExprs.get(0).getFnParams().isDistinct()
+ && VectorizedUtil.isVectorized()
+ &&
ConnectContext.get().getSessionVariable().enableSingleDistinctColumnOpt()) {
+ isSetUsingSetForDistinct = true;
+ }
+ return isSetUsingSetForDistinct;
+ }
+ /**
+  * Returns whether DISTINCT aggregates are evaluated with a hash set instead of adding the
+  * DISTINCT arguments to the grouping exprs. True when the expressions contain multiple
+  * distinct aggregates (estimateIfContainsMultiDistinct) or when the single-DISTINCT
+  * optimization applies (isSetUsingSetForDistinct).
+  *
+  * @throws AnalysisException propagated from estimateIfContainsMultiDistinct
+  */
+ public static boolean estimateIfUsingSetForDistinct(List<FunctionCallExpr>
distinctAggExprs)
+ throws AnalysisException {
+ return estimateIfContainsMultiDistinct(distinctAggExprs)
+ || isSetUsingSetForDistinct(distinctAggExprs);
+ }
+
/**
* estimate if functions contains multi distinct
* @param distinctAggExprs
@@ -283,6 +304,7 @@ public final class AggregateInfo extends AggregateInfoBase {
hasMultiDistinct = true;
}
}
+
return hasMultiDistinct;
}
@@ -340,11 +362,11 @@ public final class AggregateInfo extends
AggregateInfoBase {
}
}
- this.isMultiDistinct =
estimateIfContainsMultiDistinct(distinctAggExprs);
+ this.isUsingSetForDistinct =
estimateIfUsingSetForDistinct(distinctAggExprs);
isDistinctAgg = true;
// add DISTINCT parameters to grouping exprs
- if (!isMultiDistinct) {
+ if (!isUsingSetForDistinct) {
groupingExprs.addAll(expr0Children);
}
@@ -399,12 +421,12 @@ public final class AggregateInfo extends
AggregateInfoBase {
&& !secondPhaseDistinctAggInfo.getAggregateExprs().isEmpty());
}
- public void setIsMultiDistinct(boolean value) {
- this.isMultiDistinct = value;
+ public void setIsUsingSetForDistinct(boolean value) {
+ this.isUsingSetForDistinct = value;
}
- public boolean isMultiDistinct() {
- return isMultiDistinct;
+ public boolean isUsingSetForDistinct() {
+ return isUsingSetForDistinct;
}
public AggregateInfo getSecondPhaseDistinctAggInfo() {
@@ -547,7 +569,7 @@ public final class AggregateInfo extends AggregateInfoBase {
AggPhase aggPhase =
(this.aggPhase == AggPhase.FIRST) ? AggPhase.FIRST_MERGE :
AggPhase.SECOND_MERGE;
- mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase,
isMultiDistinct);
+ mergeAggInfo = new AggregateInfo(groupingExprs, aggExprs, aggPhase,
isUsingSetForDistinct);
mergeAggInfo.intermediateTupleDesc = intermediateTupleDesc;
mergeAggInfo.outputTupleDesc = outputTupleDesc;
mergeAggInfo.intermediateTupleSmap = intermediateTupleSmap;
@@ -616,7 +638,7 @@ public final class AggregateInfo extends AggregateInfoBase {
for (FunctionCallExpr inputExpr : distinctAggExprs) {
Preconditions.checkState(inputExpr.isAggregateFunction());
FunctionCallExpr aggExpr = null;
- if (!isMultiDistinct) {
+ if (!isUsingSetForDistinct) {
if
(inputExpr.getFnName().getFunction().equalsIgnoreCase(FunctionSet.COUNT)) {
// COUNT(DISTINCT ...) ->
// COUNT(IF(IsNull(<agg slot 1>), NULL, IF(IsNull(<agg
slot 2>), NULL, ...)))
@@ -682,7 +704,7 @@ public final class AggregateInfo extends AggregateInfoBase {
ArrayList<Expr> substGroupingExprs =
Expr.substituteList(origGroupingExprs, intermediateTupleSmap,
analyzer, false);
secondPhaseDistinctAggInfo =
- new AggregateInfo(substGroupingExprs, secondPhaseAggExprs,
AggPhase.SECOND, isMultiDistinct);
+ new AggregateInfo(substGroupingExprs, secondPhaseAggExprs,
AggPhase.SECOND, isUsingSetForDistinct);
secondPhaseDistinctAggInfo.createTupleDescs(analyzer);
secondPhaseDistinctAggInfo.createSecondPhaseAggSMap(this,
distinctAggExprs);
secondPhaseDistinctAggInfo.createMergeAggInfo(analyzer);
@@ -699,7 +721,7 @@ public final class AggregateInfo extends AggregateInfoBase {
ArrayList<SlotDescriptor> slotDescs = outputTupleDesc.getSlots();
int numDistinctParams = 0;
- if (!isMultiDistinct) {
+ if (!isUsingSetForDistinct) {
numDistinctParams = distinctAggExprs.get(0).getChildren().size();
} else {
for (int i = 0; i < distinctAggExprs.size(); i++) {
@@ -849,7 +871,7 @@ public final class AggregateInfo extends AggregateInfoBase {
outputTupleDesc.getSlots().get(groupExprsSize + i);
SlotDescriptor intermediateSlotDesc =
intermediateTupleDesc.getSlots().get(groupExprsSize + i);
- if (isDistinctAgg || isMultiDistinct) {
+ if (isDistinctAgg || isUsingSetForDistinct) {
slotDesc.setIsMaterialized(true);
intermediateSlotDesc.setIsMaterialized(true);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 2440dc1956..62062243db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -1173,8 +1173,8 @@ public class SelectStmt extends QueryStmt {
}
}
final ExprSubstitutionMap result = new ExprSubstitutionMap();
- final boolean hasMultiDistinct =
AggregateInfo.estimateIfContainsMultiDistinct(distinctExprs);
- if (!hasMultiDistinct) {
+ final boolean isUsingSetForDistinct =
AggregateInfo.estimateIfUsingSetForDistinct(distinctExprs);
+ if (!isUsingSetForDistinct) {
return result;
}
for (FunctionCallExpr inputExpr : distinctExprs) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 5ea7e7524c..6c64d1b600 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -1069,7 +1069,7 @@ public class DistributedPlanner {
AggregateInfo firstPhaseAggInfo = ((AggregationNode)
node.getChild(0)).getAggInfo();
List<Expr> partitionExprs = null;
- boolean isMultiDistinct = node.getAggInfo().isMultiDistinct();
+ boolean isUsingSetForDistinct =
node.getAggInfo().isUsingSetForDistinct();
if (hasGrouping) {
// We need to do
// - child fragment:
@@ -1092,7 +1092,7 @@ public class DistributedPlanner {
// * phase 2 agg
// - merge fragment 2, unpartitioned:
// * merge agg of phase 2
- if (!isMultiDistinct) {
+ if (!isUsingSetForDistinct) {
partitionExprs =
Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
firstPhaseAggInfo.getIntermediateSmap(),
ctx.getRootAnalyzer(), false);
}
@@ -1130,7 +1130,7 @@ public class DistributedPlanner {
mergeFragment.addPlanRoot(node);
}
- if (!hasGrouping && !isMultiDistinct) {
+ if (!hasGrouping && !isUsingSetForDistinct) {
// place the merge aggregation of the 2nd phase in an
unpartitioned fragment;
// add preceding merge fragment at end
if (mergeFragment != childFragment) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f3281f56ce..bd2ef62fda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -174,6 +174,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_VECTORIZED_ENGINE =
"enable_vectorized_engine";
+ public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT =
"enable_single_distinct_column_opt";
+
public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit";
public static final String ENABLE_PARALLEL_OUTFILE =
"enable_parallel_outfile";
@@ -234,6 +236,12 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;
+ // Using a hash set instead of group-by + count for a single DISTINCT column can improve performance,
+ // but may cause RPC failures when the cluster has only a few BEs.
+ // Whether to turn this switch on therefore depends on the number of BEs. Defaults to false.
+ @VariableMgr.VarAttr(name = ENABLE_SINGLE_DISTINCT_COLUMN_OPT)
+ public boolean enableSingleDistinctColumnOpt = false;
+
// Set sqlMode to empty string
@VariableMgr.VarAttr(name = SQL_MODE, needForward = true)
public long sqlMode = 0L;
@@ -540,6 +548,10 @@ public class SessionVariable implements Serializable,
Writable {
return enableProfile;
}
+ public boolean enableSingleDistinctColumnOpt() { // value of session variable enable_single_distinct_column_opt
+ return enableSingleDistinctColumnOpt;
+ }
+
public int getWaitTimeoutS() {
return waitTimeoutS;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]