This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 226775f059f [Feature](Point Query) fully support in nereids #35823
(#36205)
226775f059f is described below
commit 226775f059f027402f8a687d08e93b2434cf482b
Author: lihangyu <[email protected]>
AuthorDate: Thu Jun 13 08:37:31 2024 +0800
[Feature](Point Query) fully support in nereids #35823 (#36205)
---
.../org/apache/doris/nereids/NereidsPlanner.java | 1 +
.../org/apache/doris/nereids/StatementContext.java | 27 +-
.../doris/nereids/jobs/executor/Rewriter.java | 3 +
.../org/apache/doris/nereids/rules/RuleType.java | 2 +
.../LogicalResultSinkToShortCircuitPointQuery.java | 108 +++++++
.../nereids/trees/expressions/Expression.java | 5 +
.../nereids/trees/expressions/literal/Literal.java | 2 +-
.../doris/nereids/trees/plans/PlaceholderId.java | 7 +-
.../trees/plans/commands/ExecuteCommand.java | 26 +-
.../org/apache/doris/planner/OlapScanNode.java | 6 +-
.../org/apache/doris/qe/PointQueryExecutor.java | 319 +++++++++++++++++++++
.../apache/doris/qe/PreparedStatementContext.java | 3 +
.../java/org/apache/doris/qe/SessionVariable.java | 4 +
.../apache/doris/qe/ShortCircuitQueryContext.java | 88 ++++++
.../java/org/apache/doris/qe/StmtExecutor.java | 17 +-
.../point_query_p0/test_point_query_partition.out | 15 +
.../test_compaction_uniq_keys_row_store.groovy | 7 +-
.../suites/point_query_p0/test_point_query.groovy | 33 ++-
.../test_point_query_partition.groovy | 41 ++-
19 files changed, 684 insertions(+), 30 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 21c950c40c6..1ae1864ad3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -107,6 +107,7 @@ public class NereidsPlanner extends Planner {
@Override
public void plan(StatementBase queryStmt,
org.apache.doris.thrift.TQueryOptions queryOptions) {
+ this.queryOptions = queryOptions;
if
(statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace())
{
NereidsTracer.init();
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index deb7f5f76cc..4d026cf9ce3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -43,6 +43,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.statistics.Statistics;
@@ -122,8 +123,8 @@ public class StatementContext implements Closeable {
// generate for next id for prepared statement's placeholders, which is
connection level
private final IdGenerator<PlaceholderId> placeHolderIdGenerator =
PlaceholderId.createGenerator();
- // relation id to placeholders for prepared statement
- private final Map<PlaceholderId, Expression> idToPlaceholderRealExpr = new
HashMap<>();
+ // placeholder id to the real expression bound to it for prepared statement, ordered by placeholder id
+ private final Map<PlaceholderId, Expression> idToPlaceholderRealExpr = new
TreeMap<>();
// collect all hash join conditions to compute node connectivity in join
graph
private final List<Expression> joinFilters = new ArrayList<>();
@@ -164,6 +165,12 @@ public class StatementContext implements Closeable {
// form this map
private final Map<RelationId, Statistics> relationIdToStatisticsMap = new
LinkedHashMap<>();
+ // Indicates the query is short-circuited in both plan and execution
phase, typically
+ // for high speed/concurrency point queries
+ private boolean isShortCircuitQuery;
+
+ private ShortCircuitQueryContext shortCircuitQueryContext;
+
public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@@ -235,6 +242,22 @@ public class StatementContext implements Closeable {
}
}
+    /**
+     * @return the short-circuit flag currently recorded on this statement context
+     */
+    public boolean isShortCircuitQuery() {
+        return this.isShortCircuitQuery;
+    }
+
+    /**
+     * Records whether this statement is handled as a short-circuit query.
+     *
+     * @param shortCircuitQuery the new value of the short-circuit flag
+     */
+    public void setShortCircuitQuery(boolean shortCircuitQuery) {
+        this.isShortCircuitQuery = shortCircuitQuery;
+    }
+
+    /**
+     * @return the short-circuit query context attached to this statement, or {@code null} if none was set
+     */
+    public ShortCircuitQueryContext getShortCircuitQueryContext() {
+        return this.shortCircuitQueryContext;
+    }
+
+    /**
+     * Attaches (or clears, when {@code null} is passed) the short-circuit query context of this statement.
+     *
+     * @param shortCircuitQueryContext the context to store; may be {@code null}
+     */
+    public void setShortCircuitQueryContext(ShortCircuitQueryContext shortCircuitQueryContext) {
+        this.shortCircuitQueryContext = shortCircuitQueryContext;
+    }
+
public Optional<SqlCacheContext> getSqlCacheContext() {
return Optional.ofNullable(sqlCacheContext);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 28307ec7b3a..a240bebd905 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -82,6 +82,7 @@ import org.apache.doris.nereids.rules.rewrite.InferPredicates;
import org.apache.doris.nereids.rules.rewrite.InferSetOperatorDistinct;
import org.apache.doris.nereids.rules.rewrite.InlineLogicalView;
import org.apache.doris.nereids.rules.rewrite.LimitSortToTopN;
+import
org.apache.doris.nereids.rules.rewrite.LogicalResultSinkToShortCircuitPointQuery;
import org.apache.doris.nereids.rules.rewrite.MergeAggregate;
import org.apache.doris.nereids.rules.rewrite.MergeFilters;
import org.apache.doris.nereids.rules.rewrite.MergeOneRowRelationIntoUnion;
@@ -398,6 +399,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
+ topic("Point query short circuit",
+ topDown(new LogicalResultSinkToShortCircuitPointQuery())),
topic("eliminate",
// SORT_PRUNING should be applied after mergeLimit
custom(RuleType.ELIMINATE_SORT, EliminateSort::new),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 864d8fd6bd3..721f9f8dff6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -340,6 +340,8 @@ public enum RuleType {
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_PROJECT_SCAN(RuleTypeClass.REWRITE),
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_FILTER_SCAN(RuleTypeClass.REWRITE),
BUILD_AGG_FOR_RANDOM_DISTRIBUTED_TABLE_AGG_SCAN(RuleTypeClass.REWRITE),
+ // short circuit rule
+ SHOR_CIRCUIT_POINT_QUERY(RuleTypeClass.REWRITE),
// exploration rules
REORDER_INTERSECT(RuleTypeClass.EXPLORATION),
TEST_EXPLORATION(RuleTypeClass.EXPLORATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java
new file mode 100644
index 00000000000..1438edb9bdd
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/LogicalResultSinkToShortCircuitPointQuery.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * short circuit query optimization
+ * pattern : select xxx from tbl where key = ?
+ */
+public class LogicalResultSinkToShortCircuitPointQuery implements
RewriteRuleFactory {
+
+ private Expression removeCast(Expression expression) {
+ if (expression instanceof Cast) {
+ return expression.child(0);
+ }
+ return expression;
+ }
+
+ private boolean
filterMatchShortCircuitCondition(LogicalFilter<LogicalOlapScan> filter) {
+ return filter.getConjuncts().stream().allMatch(
+ // all conjuncts match with pattern `key = ?`
+ expression -> (expression instanceof EqualTo)
+ &&
(removeCast(expression.child(0)).isKeyColumnFromTable()
+ || ((SlotReference)
expression.child(0)).getName().equals(Column.DELETE_SIGN))
+ && expression.child(1).isLiteral());
+ }
+
+ private boolean scanMatchShortCircuitCondition(LogicalOlapScan olapScan) {
+ if
(!ConnectContext.get().getSessionVariable().enableShortCircuitQuery) {
+ return false;
+ }
+ OlapTable olapTable = olapScan.getTable();
+ return olapTable.getEnableLightSchemaChange() &&
olapTable.getEnableUniqueKeyMergeOnWrite()
+ && olapTable.storeRowColumn();
+ }
+
+ // set short circuit flag and return the original plan
+ private Plan shortCircuit(Plan root, OlapTable olapTable,
+ Set<Expression> conjuncts, StatementContext statementContext) {
+ // All key columns in conjuncts
+ Set<String> colNames = Sets.newHashSet();
+ for (Expression expr : conjuncts) {
+ colNames.add(((SlotReference)
removeCast((expr.child(0)))).getName());
+ }
+ // set short circuit flag and modify nothing to the plan
+ if (olapTable.getBaseSchemaKeyColumns().size() <= colNames.size()) {
+ statementContext.setShortCircuitQuery(true);
+ }
+ return root;
+ }
+
+ @Override
+ public List<Rule> buildRules() {
+ return ImmutableList.of(
+ RuleType.SHOR_CIRCUIT_POINT_QUERY.build(
+
logicalResultSink(logicalProject(logicalFilter(logicalOlapScan()
+ .when(this::scanMatchShortCircuitCondition)
+ ).when(this::filterMatchShortCircuitCondition)))
+ .thenApply(ctx -> {
+ return shortCircuit(ctx.root,
ctx.root.child().child().child().getTable(),
+
+
ctx.root.child().child().getConjuncts(), ctx.statementContext);
+ })),
+ RuleType.SHOR_CIRCUIT_POINT_QUERY.build(
+ logicalResultSink(logicalFilter(logicalOlapScan()
+ .when(this::scanMatchShortCircuitCondition)
+ ).when(this::filterMatchShortCircuitCondition))
+ .thenApply(ctx -> {
+ return shortCircuit(ctx.root,
ctx.root.child().child().getTable(),
+ ctx.root.child().getConjuncts(),
ctx.statementContext);
+ }))
+ );
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
index 75cef0fc946..f6c7cbdb66a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java
@@ -390,6 +390,11 @@ public abstract class Expression extends
AbstractTreeNode<Expression> implements
return (this instanceof SlotReference) && ((SlotReference)
this).getColumn().isPresent();
}
+    /**
+     * @return true if this expression is a {@link SlotReference} bound to a table column
+     *         and that column is a key column
+     */
+    public boolean isKeyColumnFromTable() {
+        if (!(this instanceof SlotReference)) {
+            return false;
+        }
+        SlotReference slot = (SlotReference) this;
+        if (!slot.getColumn().isPresent()) {
+            return false;
+        }
+        return slot.getColumn().get().isKey();
+    }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java
index 9c1096799ba..d81206f7100 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/literal/Literal.java
@@ -539,7 +539,7 @@ public abstract class Literal extends Expression implements
LeafExpression, Comp
microsecond = data.getInt();
}
if (Config.enable_date_conversion) {
- return new DateTimeV2Literal(year, month, day, hour, minute,
second, microsecond);
+ return new DateTimeV2Literal(DateTimeV2Type.MAX, year, month,
day, hour, minute, second, microsecond);
}
return new DateTimeLiteral(DateTimeType.INSTANCE, year, month,
day, hour, minute, second, microsecond);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java
index f1d410100e1..be3cb645fe6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlaceholderId.java
@@ -23,7 +23,7 @@ import org.apache.doris.common.IdGenerator;
/**
* placeholder id for prepared statement parameters
*/
-public class PlaceholderId extends Id<PlaceholderId> {
+public class PlaceholderId extends Id<PlaceholderId> implements
Comparable<PlaceholderId> {
public PlaceholderId(int id) {
super(id);
@@ -55,4 +55,9 @@ public class PlaceholderId extends Id<PlaceholderId> {
public int hashCode() {
return super.hashCode();
}
+
+    /**
+     * Orders placeholder ids by ascending numeric id.
+     *
+     * @param o the id to compare against
+     * @return a negative, zero or positive value as this id is less than, equal to or greater than {@code o}
+     */
+    @Override
+    public int compareTo(PlaceholderId o) {
+        // Integer.compare cannot overflow, unlike subtracting one id from the other
+        return Integer.compare(this.id, o.id);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
index b098f883647..d5260a72cde 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExecuteCommand.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
+import org.apache.doris.analysis.Queriable;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -24,10 +25,13 @@ import
org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.PointQueryExecutor;
import org.apache.doris.qe.PreparedStatementContext;
+import org.apache.doris.qe.ShortCircuitQueryContext;
import org.apache.doris.qe.StmtExecutor;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -65,8 +69,26 @@ public class ExecuteCommand extends Command {
LogicalPlanAdapter planAdapter = new
LogicalPlanAdapter(prepareCommand.getLogicalPlan(), executor.getContext()
.getStatementContext());
executor.setParsedStmt(planAdapter);
- // execute real statement
- executor.execute();
+ // If it's not a short circuit query or schema version is
different(indicates schema changed),
+ // need to do reanalyze and plan
+ boolean needAnalyze =
!executor.getContext().getStatementContext().isShortCircuitQuery()
+ || (preparedStmtCtx.shortCircuitQueryContext.isPresent()
+ &&
preparedStmtCtx.shortCircuitQueryContext.get().tbl.getBaseSchemaVersion()
+ !=
preparedStmtCtx.shortCircuitQueryContext.get().schemaVersion);
+ if (needAnalyze) {
+ // execute real statement
+ preparedStmtCtx.shortCircuitQueryContext = Optional.empty();
+ statementContext.setShortCircuitQueryContext(null);
+ executor.execute();
+ if
(executor.getContext().getStatementContext().isShortCircuitQuery()) {
+ // cache short-circuit plan
+ preparedStmtCtx.shortCircuitQueryContext = Optional.of(
+ new ShortCircuitQueryContext(executor.planner(),
(Queriable) executor.getParsedStmt()));
+
statementContext.setShortCircuitQueryContext(preparedStmtCtx.shortCircuitQueryContext.get());
+ }
+ return;
+ }
+ PointQueryExecutor.directExecuteShortCircuitQuery(executor,
preparedStmtCtx, statementContext);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 4d86a89f27c..d312992bc80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1122,7 +1122,8 @@ public class OlapScanNode extends ScanNode {
public boolean isPointQuery() {
return this.pointQueryEqualPredicats != null
- || (preparedStatment != null &&
preparedStatment.isPointQueryShortCircuit());
+ || (preparedStatment != null &&
preparedStatment.isPointQueryShortCircuit())
+ ||
ConnectContext.get().getStatementContext().isShortCircuitQuery();
}
private void computeTabletInfo() throws UserException {
@@ -1250,6 +1251,7 @@ public class OlapScanNode extends ScanNode {
scanTabletIds.clear();
bucketSeq2locations.clear();
scanReplicaIds.clear();
+ sampleTabletIds.clear();
try {
createScanRangeLocations();
} catch (AnalysisException e) {
@@ -1350,7 +1352,7 @@ public class OlapScanNode extends ScanNode {
output.append(prefix).append("pushAggOp=").append(pushDownAggNoGroupingOp).append("\n");
}
if (isPointQuery()) {
- output.append(prefix).append("SHORT-CIRCUIT");
+ output.append(prefix).append("SHORT-CIRCUIT\n");
}
if (!CollectionUtils.isEmpty(rewrittenProjectList)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
new file mode 100644
index 00000000000..3f5c24f5632
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.MysqlCommand;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.KeyTuple;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.rpc.TCustomProtocolFactory;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Coordinator for short-circuit point queries.
+ *
+ * <p>Instead of scheduling plan fragments, it resolves the single tablet selected by the cached
+ * {@link OlapScanNode} and fetches the result through a tablet key lookup RPC sent to one of the
+ * available backends holding that tablet, retrying on other backends on failure.
+ */
+public class PointQueryExecutor implements CoordInterface {
+    private static final Logger LOG = LogManager.getLogger(PointQueryExecutor.class);
+    private long tabletID = 0;
+    private long timeoutMs = Config.point_query_timeout_ms; // default 10s
+
+    private boolean isCancel = false;
+    private List<Backend> candidateBackends;
+    private final int maxMsgSizeOfResultReceiver;
+
+    // used for snapshot read in cloud mode
+    private List<Long> snapshotVisibleVersions;
+
+    private final ShortCircuitQueryContext shortCircuitQueryContext;
+
+    /**
+     * @param ctx cached short-circuit context; {@code sanitize()} is invoked on it before use
+     * @param maxMessageSize max message size handed to the thrift deserializer of the result batch
+     */
+    public PointQueryExecutor(ShortCircuitQueryContext ctx, int maxMessageSize) {
+        ctx.sanitize();
+        this.shortCircuitQueryContext = ctx;
+        this.maxMsgSizeOfResultReceiver = maxMessageSize;
+    }
+
+    /**
+     * Re-evaluates the scan ranges of the cached scan node and collects the available backends
+     * of the selected tablet in random order. Leaves the candidate list empty when no tablet is selected.
+     *
+     * @throws Exception if scan range evaluation fails
+     */
+    void setScanRangeLocations() throws Exception {
+        OlapScanNode scanNode = shortCircuitQueryContext.scanNode;
+        // compute scan range
+        List<TScanRangeLocations> locations = scanNode.lazyEvaluateRangeLocations();
+        Preconditions.checkNotNull(locations);
+        // Reset first so that a previous evaluation can never leak stale backends
+        candidateBackends = new ArrayList<>();
+        if (scanNode.getScanTabletIds().isEmpty()) {
+            return;
+        }
+        Preconditions.checkState(scanNode.getScanTabletIds().size() == 1);
+        this.tabletID = scanNode.getScanTabletIds().get(0);
+
+        for (Long backendID : scanNode.getScanBackendIds()) {
+            Backend backend = Env.getCurrentSystemInfo().getBackend(backendID);
+            if (SimpleScheduler.isAvailable(backend)) {
+                candidateBackends.add(backend);
+            }
+        }
+        // Random read replicas
+        Collections.shuffle(this.candidateBackends);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID);
+        }
+    }
+
+    /**
+     * Executes a prepared short-circuit query without analyze & plan: the literals of the cached
+     * scan node conjuncts are replaced by the current placeholder values and the cached query is run.
+     *
+     * @param executor statement executor used to run the query and send the result
+     * @param preparedStmtCtx prepared statement context holding the cached short-circuit context
+     * @param statementContext statement context holding the placeholder values of this execution
+     * @throws AnalysisException if the number of placeholder values differs from the prepared statement
+     * @throws IllegalStateException if no short-circuit context has been cached
+     * @throws Exception on execution failure
+     */
+    public static void directExecuteShortCircuitQuery(StmtExecutor executor,
+            PreparedStatementContext preparedStmtCtx,
+            StatementContext statementContext) throws Exception {
+        Preconditions.checkNotNull(preparedStmtCtx.shortCircuitQueryContext);
+        // A non-null Optional may still be empty; fail with a clear message instead of NoSuchElementException
+        Preconditions.checkState(preparedStmtCtx.shortCircuitQueryContext.isPresent(),
+                "short circuit query context is not cached for the prepared statement");
+        ShortCircuitQueryContext shortCircuitQueryContext = preparedStmtCtx.shortCircuitQueryContext.get();
+        // update conjuncts
+        List<Expr> conjunctVals = statementContext.getIdToPlaceholderRealExpr().values().stream().map(
+                        expression -> (
+                                (Literal) expression).toLegacyLiteral())
+                .collect(Collectors.toList());
+        if (conjunctVals.size() != preparedStmtCtx.command.placeholderCount()) {
+            throw new AnalysisException("Mismatched conjuncts values size with prepared"
+                    + " statement parameters size, expected "
+                    + preparedStmtCtx.command.placeholderCount()
+                    + ", but meet " + conjunctVals.size());
+        }
+        updateScanNodeConjuncts(shortCircuitQueryContext.scanNode, conjunctVals);
+        // short circuit plan and execution
+        executor.executeAndSendResult(false, false,
+                shortCircuitQueryContext.analzyedQuery, executor.getContext()
+                        .getMysqlChannel(), null, null);
+    }
+
+    /**
+     * Replaces the literal side of the i-th scan node conjunct with the i-th value.
+     * NOTE(review): relies on the conjunct order matching the placeholder order - confirm with the planner.
+     */
+    private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) {
+        for (int i = 0; i < conjunctVals.size(); ++i) {
+            BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i);
+            if (binaryPredicate.getChild(0) instanceof LiteralExpr) {
+                binaryPredicate.setChild(0, conjunctVals.get(i));
+            } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) {
+                binaryPredicate.setChild(1, conjunctVals.get(i));
+            } else {
+                Preconditions.checkState(false, "Should contain literal in " + binaryPredicate.toSqlImpl());
+            }
+        }
+    }
+
+    /**
+     * @param timeoutMs timeout in milliseconds applied to each backend lookup attempt
+     */
+    public void setTimeout(long timeoutMs) {
+        this.timeoutMs = timeoutMs;
+    }
+
+    /**
+     * Adds one key tuple built from the string value of the second child of every scan node conjunct.
+     * NOTE(review): assumes the literal is always the right operand here - confirm.
+     */
+    void addKeyTuples(
+            InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
+        // TODO handle IN predicates
+        KeyTuple.Builder kBuilder = KeyTuple.newBuilder();
+        for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) {
+            BinaryPredicate predicate = (BinaryPredicate) expr;
+            kBuilder.addKeyColumnRep(predicate.getChild(1).getStringValue());
+        }
+        requestBuilder.addKeyTuples(kBuilder);
+    }
+
+    @Override
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+        // Do nothing
+    }
+
+    /**
+     * Looks the row up on the candidate backends, trying at most
+     * {@code min(Config.max_point_query_retry_time, candidateBackends.size())} backends (at least one).
+     *
+     * @return the fetched row batch, or an empty batch when no partition/tablet/backend is available
+     * @throws RpcException if the last attempt failed with an RPC error
+     * @throws UserException if the last attempt failed with any other error
+     */
+    @Override
+    public RowBatch getNext() throws Exception {
+        setScanRangeLocations();
+        // No partition/tablet found, return empty row batch
+        if (candidateBackends == null || candidateBackends.isEmpty()) {
+            return new RowBatch();
+        }
+        Iterator<Backend> backendIter = candidateBackends.iterator();
+        RowBatch rowBatch = null;
+        int tryCount = 0;
+        int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size());
+        Status status = new Status();
+        do {
+            Backend backend = backendIter.next();
+            rowBatch = getNextInternal(status, backend);
+            if (rowBatch != null) {
+                break;
+            }
+            if (++tryCount >= maxTry) {
+                break;
+            }
+        } while (true);
+        // handle status code
+        if (!status.ok()) {
+            if (Strings.isNullOrEmpty(status.getErrorMsg())) {
+                status.rewriteErrorMsg();
+            }
+            String errMsg = status.getErrorMsg();
+            LOG.warn("query failed: {}", errMsg);
+            if (status.isRpcError()) {
+                throw new RpcException(null, errMsg);
+            } else {
+                // hide host info
+                int hostIndex = errMsg.indexOf("host");
+                if (hostIndex != -1) {
+                    errMsg = errMsg.substring(0, hostIndex);
+                }
+                throw new UserException(errMsg);
+            }
+        }
+        return rowBatch;
+    }
+
+    @Override
+    public void exec() throws Exception {
+        // Point queries don't need to do anything in execution phase.
+        // only handles in getNext()
+    }
+
+    /**
+     * Sends one tablet key lookup request to the given backend and converts the response.
+     *
+     * @param status out-parameter updated with the outcome of this attempt
+     * @param backend backend to query
+     * @return the row batch on success, or {@code null} when the attempt failed (see {@code status})
+     * @throws TException if the returned row batch cannot be deserialized
+     */
+    private RowBatch getNextInternal(Status status, Backend backend) throws TException {
+        long timeoutTs = System.currentTimeMillis() + timeoutMs;
+        RowBatch rowBatch = new RowBatch();
+        InternalService.PTabletKeyLookupResponse pResult = null;
+        try {
+            Preconditions.checkNotNull(shortCircuitQueryContext.serializedDescTable);
+
+            InternalService.PTabletKeyLookupRequest.Builder requestBuilder
+                    = InternalService.PTabletKeyLookupRequest.newBuilder()
+                    .setTabletId(tabletID)
+                    .setDescTbl(shortCircuitQueryContext.serializedDescTable)
+                    .setOutputExpr(shortCircuitQueryContext.serializedOutputExpr)
+                    .setQueryOptions(shortCircuitQueryContext.serializedQueryOptions)
+                    .setIsBinaryRow(ConnectContext.get().command == MysqlCommand.COM_STMT_EXECUTE);
+            if (snapshotVisibleVersions != null && !snapshotVisibleVersions.isEmpty()) {
+                requestBuilder.setVersion(snapshotVisibleVersions.get(0));
+            }
+            if (shortCircuitQueryContext.cacheID != null) {
+                InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder();
+                uuidBuilder.setUuidHigh(shortCircuitQueryContext.cacheID.getMostSignificantBits());
+                uuidBuilder.setUuidLow(shortCircuitQueryContext.cacheID.getLeastSignificantBits());
+                requestBuilder.setUuid(uuidBuilder);
+            }
+            addKeyTuples(requestBuilder);
+
+            InternalService.PTabletKeyLookupRequest request = requestBuilder.build();
+            Future<InternalService.PTabletKeyLookupResponse> futureResponse =
+                    BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request);
+            long currentTs = System.currentTimeMillis();
+            if (currentTs >= timeoutTs) {
+                LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
+                status.updateStatus(TStatusCode.INTERNAL_ERROR, "query request timeout");
+                return null;
+            }
+            try {
+                pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // No response is available after an interrupt: restore the interrupt flag and report
+                // a failed attempt instead of falling through with a null response.
+                Thread.currentThread().interrupt();
+                LOG.warn("future get interrupted Exception");
+                if (isCancel) {
+                    status.updateStatus(TStatusCode.CANCELLED, "cancelled");
+                } else {
+                    status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch result interrupted");
+                }
+                return null;
+            } catch (TimeoutException e) {
+                futureResponse.cancel(true);
+                LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress());
+                status.updateStatus(TStatusCode.INTERNAL_ERROR, "query fetch result timeout");
+                return null;
+            }
+        } catch (RpcException e) {
+            LOG.warn("query fetch rpc exception {}, e {}", backend.getBrpcAddress(), e);
+            status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
+            SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
+            return null;
+        } catch (ExecutionException e) {
+            LOG.warn("query fetch execution exception {}, addr {}", e, backend.getBrpcAddress());
+            String execMsg = e.getMessage();
+            if (execMsg != null && execMsg.contains("time out")) {
+                // a timeout is reported as TIMEOUT and does not blacklist the backend
+                status.updateStatus(TStatusCode.TIMEOUT, execMsg);
+            } else {
+                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, execMsg);
+                SimpleScheduler.addToBlacklist(backend.getId(), execMsg);
+            }
+            return null;
+        }
+        Status resultStatus = new Status(pResult.getStatus());
+        if (resultStatus.getErrorCode() != TStatusCode.OK) {
+            status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
+            return null;
+        }
+
+        if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
+            LOG.debug("get empty rowbatch");
+            rowBatch.setEos(true);
+            status.updateStatus(TStatusCode.OK, "");
+            return rowBatch;
+        } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
+            byte[] serialResult = pResult.getRowBatch().toByteArray();
+            TResultBatch resultBatch = new TResultBatch();
+            TDeserializer deserializer = new TDeserializer(
+                    new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
+            try {
+                deserializer.deserialize(resultBatch, serialResult);
+            } catch (TException e) {
+                String thriftMsg = e.getMessage();
+                if (thriftMsg != null && thriftMsg.contains("MaxMessageSize reached")) {
+                    // keep the original exception as cause so the stack trace is not lost
+                    throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver", e);
+                }
+                throw e;
+            }
+            rowBatch.setBatch(resultBatch);
+            rowBatch.setEos(true);
+            status.updateStatus(TStatusCode.OK, "");
+            return rowBatch;
+        } else {
+            // same exception type and message as Preconditions.checkState(false, ...)
+            throw new IllegalStateException("No row batch or empty batch found");
+        }
+    }
+
+    /**
+     * Marks this executor as cancelled; an interrupted lookup then reports CANCELLED.
+     */
+    public void cancel() {
+        isCancel = true;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
index d54b2e5291c..8decad79917 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PreparedStatementContext.java
@@ -20,11 +20,14 @@ package org.apache.doris.qe;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
+import java.util.Optional;
+
public class PreparedStatementContext {
public PrepareCommand command;
public ConnectContext ctx;
StatementContext statementContext;
public String stmtString;
+ public Optional<ShortCircuitQueryContext> shortCircuitQueryContext =
Optional.empty();
// Timestamp in millisecond last command starts at
protected volatile long startTime;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 471d7cd086d..88a3a5c6019 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -129,6 +129,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String MAX_INSTANCE_NUM = "max_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
+ public static final String ENABLE_SHORT_CIRCUIT_QUERY =
"enable_short_circuit_point_query";
public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE =
"enable_exchange_node_parallel_merge";
public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT =
"enable_server_side_prepared_statement";
@@ -631,6 +632,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
+ @VariableMgr.VarAttr(name = ENABLE_SHORT_CIRCUIT_QUERY)
+ public boolean enableShortCircuitQuery = true;
+
@VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
public boolean enableExchangeNodeParallelMerge = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java
new file mode 100644
index 00000000000..727eee11752
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShortCircuitQueryContext.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.Queriable;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprList;
+import org.apache.doris.thrift.TQueryOptions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class ShortCircuitQueryContext {
+ // Cached for better CPU performance, since serializing the DescriptorTable and
+ // outputExprs is heavy work
+ public final ByteString serializedDescTable;
+ public final ByteString serializedOutputExpr;
+ public final ByteString serializedQueryOptions;
+
+ // ID of the cached structures for a prepared statement:
+ // the Backend TabletFetch service keeps some pre-calculated structures
+ // and uses this ID to look them up for this prepared statement
+ public final UUID cacheID;
+
+ public final int schemaVersion;
+ public final OlapTable tbl;
+
+ public final OlapScanNode scanNode;
+ public final Queriable analzyedQuery;
+ // Serialized mysql fields; caching them avoids serializing the mysql fields each
time sendFields is called,
+ // since serializing fields is too heavy when the table is wide
+ public Map<String, byte[]> serializedFields = Maps.newHashMap();
+
+
+ public ShortCircuitQueryContext(Planner planner, Queriable analzyedQuery)
throws TException {
+ this.serializedDescTable = ByteString.copyFrom(
+ new
TSerializer().serialize(planner.getDescTable().toThrift()));
+ TQueryOptions options = planner.getQueryOptions() != null ?
planner.getQueryOptions() : new TQueryOptions();
+ this.serializedQueryOptions = ByteString.copyFrom(
+ new TSerializer().serialize(options));
+ List<TExpr> exprs = new ArrayList<>();
+ for (Expr expr :
planner.getFragments().get(1).getPlanRoot().getProjectList()) {
+ exprs.add(expr.treeToThrift());
+ }
+ TExprList exprList = new TExprList(exprs);
+ serializedOutputExpr = ByteString.copyFrom(
+ new TSerializer().serialize(exprList));
+ this.cacheID = UUID.randomUUID();
+ this.scanNode = ((OlapScanNode) planner.getScanNodes().get(0));
+ this.tbl = this.scanNode.getOlapTable();
+ this.schemaVersion = this.tbl.getBaseSchemaVersion();
+ this.analzyedQuery = analzyedQuery;
+ }
+
+ public void sanitize() {
+ Preconditions.checkNotNull(serializedDescTable);
+ Preconditions.checkNotNull(serializedOutputExpr);
+ Preconditions.checkNotNull(cacheID);
+ Preconditions.checkNotNull(tbl);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a0ba53f1fee..5bf75b99bfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1622,7 +1622,7 @@ public class StmtExecutor {
planner.plan(newSelectStmt,
context.getSessionVariable().toThrift());
}
}
- sendResult(false, isSendFields, queryStmt, channel, cacheAnalyzer,
cacheResult);
+ executeAndSendResult(false, isSendFields, queryStmt, channel,
cacheAnalyzer, cacheResult);
}
// Process a select statement.
@@ -1704,11 +1704,12 @@ public class StmtExecutor {
}
}
- sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+ executeAndSendResult(isOutfileQuery, false, queryStmt, channel, null,
null);
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
}
- private void sendResult(boolean isOutfileQuery, boolean isSendFields,
Queriable queryStmt, MysqlChannel channel,
+ public void executeAndSendResult(boolean isOutfileQuery, boolean
isSendFields,
+ Queriable queryStmt, MysqlChannel channel,
CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult
cacheResult) throws Exception {
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1
into outfile xxx,
// We will not send real query result to client. Instead, we only
send OK to client with
@@ -1719,7 +1720,15 @@ public class StmtExecutor {
// 2. If this is a query, send the result expr fields first, and send
result data back to client.
RowBatch batch;
CoordInterface coordBase = null;
- if (queryStmt instanceof SelectStmt && ((SelectStmt)
parsedStmt).isPointQueryShortCircuit()) {
+ if (statementContext.isShortCircuitQuery()) {
+ ShortCircuitQueryContext shortCircuitQueryContext =
+ statementContext.getShortCircuitQueryContext() != null
+ ?
statementContext.getShortCircuitQueryContext()
+ : new ShortCircuitQueryContext(planner,
(Queriable) parsedStmt);
+ coordBase = new PointQueryExecutor(shortCircuitQueryContext,
+
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
+ } else if (queryStmt instanceof SelectStmt && ((SelectStmt)
parsedStmt).isPointQueryShortCircuit()) {
+ // this branch is for legacy planner, to be removed
coordBase = new PointQueryExec(planner, analyzer,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
diff --git a/regression-test/data/point_query_p0/test_point_query_partition.out
b/regression-test/data/point_query_p0/test_point_query_partition.out
index cd22e6c93ec..bef064984c8 100644
--- a/regression-test/data/point_query_p0/test_point_query_partition.out
+++ b/regression-test/data/point_query_p0/test_point_query_partition.out
@@ -31,3 +31,18 @@
-- !point_select --
+-- !point_selectxxx --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+
+-- !point_selectyyy --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+
+-- !point_selectzzz --
+686612 686612 686612 \N \N \N \N \N \N \N
\N
+
+-- !point_selectmmm --
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
+-- !point_selecteee --
+686613 686613 686613 \N \N \N \N \N \N \N
\N
+
diff --git
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy
index e493d6b0368..5c3011f3882 100644
---
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy
+++
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store.groovy
@@ -18,7 +18,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
-suite("test_compaction_uniq_keys_row_store") {
+suite("test_compaction_uniq_keys_row_store", "nonConcurrent") {
def realDb = "regression_test_serving_p0"
def tableName = realDb + ".compaction_uniq_keys_row_store_regression_test"
sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
@@ -36,6 +36,8 @@ suite("test_compaction_uniq_keys_row_store") {
stmt.setInt(8, sex)
}
+ sql "set global enable_server_side_prepared_statement = true"
+
try {
String backend_id;
def backendId_to_backendIP = [:]
@@ -76,7 +78,7 @@ suite("test_compaction_uniq_keys_row_store") {
// set server side prepared statment url
def url="jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb +
"?&useServerPrepStmts=true"
def result1 = connect(user=user, password=password, url=url) {
- def stmt = prepareStatement """ SELECT * FROM ${tableName} t
where user_id = ? and date = ? and datev2 = ? and datetimev2_1 = ? and
datetimev2_2 = ? and city = ? and age = ? and sex = ?; """
+ def stmt = prepareStatement """ SELECT /*+
SET_VAR(enable_nereids_planner=true,enable_fallback_to_original_planner=false)
*/ * FROM ${tableName} t where user_id = ? and date = ? and datev2 = ? and
datetimev2_1 = ? and datetimev2_2 = ? and city = ? and age = ? and sex = ?; """
setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01',
'2017-10-01 11:11:11.21', '2017-10-01 11:11:11.11', 'Beijing', 10, 1
qe_point_select stmt
setPrepareStmtArgs stmt, 1, '2017-10-01', '2017-10-01',
'2017-10-01 11:11:11.22', '2017-10-01 11:11:11.12', 'Beijing', 10, 1
@@ -211,4 +213,5 @@ suite("test_compaction_uniq_keys_row_store") {
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
}
+ sql "set global enable_server_side_prepared_statement = false"
}
diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy
b/regression-test/suites/point_query_p0/test_point_query.groovy
index c1a785612fe..68df19e6620 100644
--- a/regression-test/suites/point_query_p0/test_point_query.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query.groovy
@@ -30,8 +30,9 @@ suite("test_point_query", "nonConcurrent") {
try {
set_be_config.call("disable_storage_row_cache", "false")
// nereids do not support point query now
- sql """set global enable_nereids_planner=false"""
-
+ sql "set global enable_fallback_to_original_planner = false"
+ sql """set global enable_nereids_planner=true"""
+ sql "set global enable_server_side_prepared_statement = true"
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
def realDb = "regression_test_serving_p0"
@@ -138,7 +139,7 @@ suite("test_point_query", "nonConcurrent") {
sql """ INSERT INTO ${tableName} VALUES(298, 120939.11130,
"${generateString(298)}", "laooq", "2030-01-02", "2020-01-01 12:36:38", 298,
"7022-01-01 11:30:38", 1, 90696620686827832.374, [], []) """
def result1 = connect(user=user, password=password,
url=prepare_url) {
- def stmt = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = ? and
k2 = ? and k3 = ?"
+ def stmt = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = ? and k2
= ? and k3 = ?"
assertEquals(stmt.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
stmt.setInt(1, 1231)
stmt.setBigDecimal(2, new BigDecimal("119291.11"))
@@ -174,13 +175,14 @@ suite("test_point_query", "nonConcurrent") {
qe_point_select stmt
stmt.close()
- stmt = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=false) */ * from ${tableName} where k1 = 1235
and k2 = ? and k3 = ?"
+ stmt = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=true) */ * from ${tableName} where k1 = ? and k2
= ? and k3 = ?"
assertEquals(stmt.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
- stmt.setBigDecimal(1, new BigDecimal("991129292901.11138"))
- stmt.setString(2, "dd")
+ stmt.setInt(1, 1235)
+ stmt.setBigDecimal(2, new BigDecimal("991129292901.11138"))
+ stmt.setString(3, "dd")
qe_point_select stmt
- def stmt_fn = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=false) */ hex(k3), hex(k4) from ${tableName}
where k1 = ? and k2 =? and k3 = ?"
+ def stmt_fn = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=true) */ hex(k3), hex(k4) from ${tableName}
where k1 = ? and k2 =? and k3 = ?"
assertEquals(stmt_fn.class,
com.mysql.cj.jdbc.ServerPreparedStatement);
stmt_fn.setInt(1, 1231)
stmt_fn.setBigDecimal(2, new BigDecimal("119291.11"))
@@ -194,8 +196,9 @@ suite("test_point_query", "nonConcurrent") {
"""
sleep(1);
nprep_sql """ INSERT INTO ${tableName} VALUES(1235,
120939.11130, "a ddd", "laooq", "2030-01-02", "2020-01-01 12:36:38", 22.822,
"7022-01-01 11:30:38", 1, 1.1111299, [119291.19291], ["111", "222", "333"], 1)
"""
- stmt.setBigDecimal(1, new BigDecimal("120939.11130"))
- stmt.setString(2, "a ddd")
+ stmt.setBigDecimal(1, 1235)
+ stmt.setBigDecimal(2, new BigDecimal("120939.11130"))
+ stmt.setString(3, "a ddd")
qe_point_select stmt
qe_point_select stmt
// invalidate cache
@@ -222,9 +225,9 @@ suite("test_point_query", "nonConcurrent") {
}
// disable useServerPrepStmts
def result2 = connect(user=user, password=password,
url=context.config.jdbcUrl) {
- qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */
* from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'"""
- qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */
* from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'"""
- qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */
hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 =
120939.11130 and k3 = 'a ddd'"""
+ qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where k1 = 1231 and k2 = 119291.11 and k3 = 'ddd'"""
+ qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where k1 = 1237 and k2 = 120939.11130 and k3 = 'a ddd'"""
+ qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */
hex(k3), hex(k4), k7 + 10.1 from ${tableName} where k1 = 1237 and k2 =
120939.11130 and k3 = 'a ddd'"""
// prepared text
// sql """ prepare stmt1 from select * from ${tableName}
where k1 = % and k2 = % and k3 = % """
// qt_sql """execute stmt1 using (1231, 119291.11, 'ddd')"""
@@ -253,7 +256,7 @@ suite("test_point_query", "nonConcurrent") {
"disable_auto_compaction" = "false"
);"""
sql """insert into ${tableName} values (0, "1", "2", "3")"""
- qt_sql """select /*+ SET_VAR(enable_nereids_planner=false) */
* from ${tableName} where customer_key = 0"""
+ qt_sql """select /*+ SET_VAR(enable_nereids_planner=true) */ *
from ${tableName} where customer_key = 0"""
}
}
sql "DROP TABLE IF EXISTS test_ODS_EBA_LLREPORT";
@@ -271,9 +274,11 @@ suite("test_point_query", "nonConcurrent") {
);
"""
sql "insert into test_ODS_EBA_LLREPORT(RPTNO) values('567890')"
- sql "select /*+ SET_VAR(enable_nereids_planner=false) */
substr(RPTNO,2,5) from test_ODS_EBA_LLREPORT where RPTNO = '567890'"
+ sql "select /*+ SET_VAR(enable_nereids_planner=true) */
substr(RPTNO,2,5) from test_ODS_EBA_LLREPORT where RPTNO = '567890'"
} finally {
set_be_config.call("disable_storage_row_cache", "true")
sql """set global enable_nereids_planner=true"""
+ sql "set global enable_fallback_to_original_planner = true"
+ sql "set global enable_server_side_prepared_statement = false"
}
}
\ No newline at end of file
diff --git
a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
index 7b5966db0c1..459911f25ce 100644
--- a/regression-test/suites/point_query_p0/test_point_query_partition.groovy
+++ b/regression-test/suites/point_query_p0/test_point_query_partition.groovy
@@ -17,13 +17,13 @@
import java.math.BigDecimal;
-suite("test_point_query_partition") {
+suite("test_point_query_partition", "nonConcurrent") {
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
def realDb = "regression_test_serving_p0"
def tableName = realDb + ".tbl_point_query_partition"
sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
-
+ sql "set global enable_server_side_prepared_statement = true"
// Parse url
String jdbcUrl = context.config.jdbcUrl
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
@@ -114,4 +114,41 @@ suite("test_point_query_partition") {
stmt.setInt(1, 1000)
qe_point_select stmt
}
+
+ sql "DROP TABLE IF EXISTS regression_test_serving_p0.customer";
+ sql """
+ CREATE TABLE regression_test_serving_p0.customer (
+ `customer_key` BIGINT NULL,
+ `customer_value_0` TEXT NULL,
+ `customer_value_1` TEXT NULL,
+ `customer_value_2` TEXT NULL,
+ `customer_value_3` TEXT NULL,
+ `customer_value_4` TEXT NULL,
+ `customer_value_5` TEXT NULL,
+ `customer_value_6` TEXT NULL,
+ `customer_value_7` TEXT NULL,
+ `customer_value_8` TEXT NULL,
+ `customer_value_10` TEXT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`customer_key`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`customer_key`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "store_row_column" = "true"
+ );
+ """
+ sql """insert into regression_test_serving_p0.customer(customer_key,
customer_value_0, customer_value_1) values(686612, "686612", "686612")"""
+ sql """insert into regression_test_serving_p0.customer(customer_key,
customer_value_0, customer_value_1) values(686613, "686613", "686613")"""
+ def result3 = connect(user=user, password=password, url=prepare_url) {
+ def stmt = prepareStatement "select /*+
SET_VAR(enable_nereids_planner=true) */ * from
regression_test_serving_p0.customer where customer_key = ?"
+ stmt.setInt(1, 686612)
+ qe_point_selectxxx stmt
+ qe_point_selectyyy stmt
+ qe_point_selectzzz stmt
+ stmt.setInt(1, 686613)
+ qe_point_selectmmm stmt
+ qe_point_selecteee stmt
+ }
+ sql "set global enable_server_side_prepared_statement = false"
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]