This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 439c2fd1efa0054bb642c24ed6016029be625cee Author: lihangyu <[email protected]> AuthorDate: Wed Aug 21 10:43:37 2024 +0800 [Refactor](Prepared Statement) remove implementation in legacy planner (#39465) --- fe/fe-core/src/main/cup/sql_parser.cup | 22 -- .../java/org/apache/doris/analysis/Analyzer.java | 11 - .../org/apache/doris/analysis/ExecuteStmt.java | 62 ---- .../org/apache/doris/analysis/PrepareStmt.java | 292 ---------------- .../org/apache/doris/planner/OlapScanNode.java | 44 +-- .../org/apache/doris/planner/OriginalPlanner.java | 20 +- .../java/org/apache/doris/qe/ConnectContext.java | 15 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 12 +- .../main/java/org/apache/doris/qe/Coordinator.java | 6 - .../org/apache/doris/qe/MysqlConnectProcessor.java | 96 +---- .../java/org/apache/doris/qe/PointQueryExec.java | 386 --------------------- .../org/apache/doris/qe/PrepareStmtContext.java | 54 --- .../java/org/apache/doris/qe/StmtExecutor.java | 128 +------ .../data/variant_p0/variant_with_rowstore.out | 2 +- .../suites/point_query_p0/test_point_query.groovy | 5 +- .../suites/variant_p0/variant_with_rowstore.groovy | 6 +- 16 files changed, 44 insertions(+), 1117 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 70f5f5a46b2..42577629ba5 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -766,8 +766,6 @@ nonterminal BackupStmt backup_stmt; nonterminal AbstractBackupTableRefClause opt_backup_table_ref_list; nonterminal Boolean backup_exclude_or_not; nonterminal RestoreStmt restore_stmt; -nonterminal PrepareStmt prepare_stmt; -nonterminal ExecuteStmt execute_stmt; nonterminal SelectList select_clause, select_list, select_sublist; @@ -1275,10 +1273,6 @@ stmt ::= {: RESULT = stmt; :} | show_mtmv_stmt : stmt {: RESULT = stmt; :} - | prepare_stmt:stmt - {: RESULT = stmt; :} - | execute_stmt:stmt - {: RESULT = stmt; :} | warm_up_stmt:stmt {: RESULT = stmt; :} | /* empty: query only has comments */ @@ -5869,22 +5863,6 @@ expr_or_default ::= :} ; -prepare_stmt ::= - KW_PREPARE variable_name:name KW_FROM select_stmt:s - {: - RESULT = new PrepareStmt(s, name); - s.setPlaceHolders(parser.placeholder_expr_list); - parser.placeholder_expr_list.clear(); - :} - ; - -execute_stmt ::= - KW_EXECUTE variable_name:name args_list:s - {: - RESULT = new ExecuteStmt(name, s); - :} - ; - literal_values ::= literal:value {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 861dc517a04..a13a6731b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -508,9 +508,6 @@ public class Analyzer { private final GlobalState globalState; - // Attached PrepareStmt - public PrepareStmt prepareStmt; - private final InferPredicateState inferPredicateState; // An analyzer stores analysis state for a single select block. A select block can be @@ -618,14 +615,6 @@ public class Analyzer { return callDepth; } - public void setPrepareStmt(PrepareStmt stmt) { - prepareStmt = stmt; - } - - public PrepareStmt getPrepareStmt() { - return prepareStmt; - } - public void setInlineView(boolean inlineView) { isInlineView = inlineView; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExecuteStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExecuteStmt.java deleted file mode 100644 index 4805f8c97e1..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExecuteStmt.java +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -import java.util.List; - -public class ExecuteStmt extends StatementBase { - private String stmtName; - private List<LiteralExpr> args; - - public ExecuteStmt(String stmtName, List<LiteralExpr> args) { - this.stmtName = stmtName; - this.args = args; - } - - public String getName() { - return stmtName; - } - - public List<LiteralExpr> getArgs() { - return args; - } - - @Override - public RedirectStatus getRedirectStatus() { - return RedirectStatus.NO_FORWARD; - } - - @Override - public String toSql() { - String sql = "EXECUTE("; - int size = args.size(); - for (int i = 0; i < size; ++i) { - sql += args.get(i).toSql(); - if (i < size - 1) { - sql += ", "; - } - } - sql += ")"; - return sql; - } - - @Override - public StmtType stmtType() { - return StmtType.EXECUTE; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PrepareStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PrepareStmt.java deleted file mode 100644 index 1c7b5459979..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PrepareStmt.java +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -// import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.UserException; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.thrift.TDescriptorTable; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TExprList; -import org.apache.doris.thrift.TQueryOptions; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -public class PrepareStmt extends StatementBase { - // We provide bellow types of prepared statement: - // NONE, which is not prepared - // FULL_PREPARED, which is real prepared, which will cache analyzed statement and planner - // STATEMENT, which only cache statement it self, but need to analyze each time executed. - public enum PreparedType { - NONE, FULL_PREPARED, STATEMENT - } - - private static final Logger LOG = LogManager.getLogger(PrepareStmt.class); - private StatementBase inner; - private String stmtName; - // Cached for better CPU performance, since serialize DescriptorTable and - // outputExprs are heavy work - private ByteString serializedDescTable; - private ByteString serializedOutputExpr; - private ByteString serializedQueryOptions; - - - private UUID id; - private int schemaVersion = -1; - private OlapTable tbl; - private ConnectContext context; - private PreparedType preparedType = PreparedType.STATEMENT; - boolean isPointQueryShortCircuit = false; - - private TDescriptorTable descTable; - // Serialized mysql Field, this could avoid serialize mysql field each time sendFields. - // Since, serialize fields is too heavy when table is wide - Map<String, byte[]> serializedFields = Maps.newHashMap(); - - public PrepareStmt(StatementBase stmt, String name) { - this.inner = stmt; - this.stmtName = name; - this.id = UUID.randomUUID(); - } - - public void setContext(ConnectContext ctx) { - this.context = ctx; - } - - public boolean needReAnalyze() { - if (preparedType == PreparedType.FULL_PREPARED - && schemaVersion == tbl.getBaseSchemaVersion()) { - return false; - } - reset(); - return true; - } - - public TDescriptorTable getDescTable() { - return descTable; - } - - public UUID getID() { - return id; - } - - public byte[] getSerializedField(String colName) { - return serializedFields.getOrDefault(colName, null); - } - - public void setSerializedField(String colName, byte[] serializedField) { - serializedFields.put(colName, serializedField); - } - - public void cacheSerializedDescriptorTable(DescriptorTable desctbl) { - try { - descTable = desctbl.toThrift(); - serializedDescTable = ByteString.copyFrom( - new TSerializer().serialize(descTable)); - } catch (TException e) { - LOG.warn("failed to serilize DescriptorTable, {}", e.getMessage()); - Preconditions.checkState(false, e.getMessage()); - } - } - - public void cacheSerializedOutputExprs(List<Expr> outExprs) { - List<TExpr> exprs = new ArrayList<>(); - for (Expr expr : outExprs) { - exprs.add(expr.treeToThrift()); - } - TExprList exprList = new TExprList(exprs); - try { - serializedOutputExpr = ByteString.copyFrom( - new TSerializer().serialize(exprList)); - } catch (TException e) { - LOG.warn("failed to serilize TExprList, {}", e.getMessage()); - Preconditions.checkState(false, e.getMessage()); - } - } - - public void cacheSerializedQueryOptions(TQueryOptions queryOptions) { - try { - serializedQueryOptions = ByteString.copyFrom( - new TSerializer().serialize(queryOptions)); - } catch (TException e) { - LOG.warn("failed to serilize queryOptions , {}", e.getMessage()); - Preconditions.checkState(false, e.getMessage()); - } - } - - public ByteString getSerializedDescTable() { - return serializedDescTable; - } - - public ByteString getSerializedOutputExprs() { - return serializedOutputExpr; - } - - public ByteString getSerializedQueryOptions() { - return serializedQueryOptions; - } - - public boolean isPointQueryShortCircuit() { - return isPointQueryShortCircuit; - } - - @Override - public void analyze(Analyzer analyzer) throws UserException { - // TODO support more Statement - if (!(inner instanceof SelectStmt) && !(inner instanceof NativeInsertStmt)) { - throw new UserException("Only support prepare SelectStmt or NativeInsertStmt"); - } - analyzer.setPrepareStmt(this); - if (inner instanceof SelectStmt) { - // Try to use FULL_PREPARED to increase performance - SelectStmt selectStmt = (SelectStmt) inner; - try { - // Use tmpAnalyzer since selectStmt will be reAnalyzed - Analyzer tmpAnalyzer = new Analyzer(context.getEnv(), context); - inner.analyze(tmpAnalyzer); - // Case 1 short circuit point query - if (selectStmt.checkAndSetPointQuery()) { - tbl = (OlapTable) selectStmt.getTableRefs().get(0).getTable(); - schemaVersion = tbl.getBaseSchemaVersion(); - preparedType = PreparedType.FULL_PREPARED; - isPointQueryShortCircuit = true; - LOG.debug("using FULL_PREPARED prepared"); - return; - } - } catch (UserException e) { - LOG.debug("fallback to STATEMENT prepared, {}", e); - } finally { - // will be reanalyzed - selectStmt.reset(); - } - // use session var to decide whether to use full prepared or let user client handle to do fail over - if (preparedType != PreparedType.FULL_PREPARED - && !ConnectContext.get().getSessionVariable().enableServeSidePreparedStatement) { - throw new UserException("Failed to prepare statement" - + "try to set enable_server_side_prepared_statement = true"); - } - } else if (inner instanceof NativeInsertStmt) { - LabelName label = ((NativeInsertStmt) inner).getLoadLabel(); - if (label == null || Strings.isNullOrEmpty(label.getLabelName())) { - analyzer.setPrepareStmt(this); - preparedType = PreparedType.STATEMENT; - } else { - throw new UserException("Only support prepare InsertStmt without label now"); - } - } - preparedType = PreparedType.STATEMENT; - LOG.debug("using STATEMENT prepared"); - } - - public String getName() { - return stmtName; - } - - @Override - public RedirectStatus getRedirectStatus() { - return RedirectStatus.NO_FORWARD; - } - - public List<PlaceHolderExpr> placeholders() { - return inner.getPlaceHolders(); - } - - public int getParmCount() { - return inner.getPlaceHolders().size(); - } - - public PreparedType getPreparedType() { - return preparedType; - } - - public List<Expr> getPlaceHolderExprList() { - ArrayList<Expr> slots = new ArrayList<>(); - for (PlaceHolderExpr pexpr : inner.getPlaceHolders()) { - slots.add(pexpr); - } - return slots; - } - - public List<String> getColLabelsOfPlaceHolders() { - ArrayList<String> lables = new ArrayList<>(); - for (int i = 0; i < inner.getPlaceHolders().size(); ++i) { - lables.add("lable " + i); - } - return lables; - } - - public StatementBase getInnerStmt() { - if (preparedType == PreparedType.FULL_PREPARED) { - // For performance reason we could reuse the inner statement when FULL_PREPARED - return inner; - } - // Make a copy of Statement, since anlyze will modify the structure of Statement. - // But we should keep the original statement - if (inner instanceof SelectStmt) { - return new SelectStmt((SelectStmt) inner); - } - // Other statement could reuse the inner statement - return inner; - } - - public int argsSize() { - return inner.getPlaceHolders().size(); - } - - public void asignValues(List<LiteralExpr> values) throws UserException { - if (values.size() != inner.getPlaceHolders().size()) { - throw new UserException("Invalid arguments size " - + values.size() + ", expected " + inner.getPlaceHolders().size()); - } - for (int i = 0; i < values.size(); ++i) { - inner.getPlaceHolders().get(i).setLiteral(values.get(i)); - inner.getPlaceHolders().get(i).analysisDone(); - } - if (!values.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("assign values {}", values.get(0).toSql()); - } - } - } - - @Override - public void reset() { - serializedDescTable = null; - serializedOutputExpr = null; - descTable = null; - this.id = UUID.randomUUID(); - inner.reset(); - if (inner instanceof NativeInsertStmt) { - ((NativeInsertStmt) inner).resetPrepare(); - } - serializedFields.clear(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index dffbba37cfe..9baee8e9592 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -29,7 +29,6 @@ import org.apache.doris.analysis.InPredicate; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.PrepareStmt; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; @@ -193,7 +192,6 @@ public class OlapScanNode extends ScanNode { public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create(); public Map<Integer, Long> bucketSeq2Bytes = Maps.newLinkedHashMap(); - boolean isFromPrepareStmt = false; // For point query private Map<SlotRef, Expr> pointQueryEqualPredicats; private DescriptorTable descTable; @@ -208,7 +206,6 @@ public class OlapScanNode extends ScanNode { // only used in short circuit plan at present private final PartitionPruneV2ForShortCircuitPlan cachedPartitionPruner = new PartitionPruneV2ForShortCircuitPlan(); - PrepareStmt preparedStatment = null; // Constructs node to scan given data files of table 'tbl'. public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { @@ -539,16 +536,12 @@ public class OlapScanNode extends ScanNode { super.init(analyzer); filterDeletedRows(analyzer); - // point query could do lazy evaluation, since stmt is a prepared statment - preparedStatment = analyzer.getPrepareStmt(); - if (preparedStatment == null || !preparedStatment.isPointQueryShortCircuit()) { - if (olapTable.getPartitionInfo().enableAutomaticPartition()) { - partitionsInfo = olapTable.getPartitionInfo(); - analyzerPartitionExpr(analyzer, partitionsInfo); - } - computeColumnsFilter(); - computePartitionInfo(); + if (olapTable.getPartitionInfo().enableAutomaticPartition()) { + partitionsInfo = olapTable.getPartitionInfo(); + analyzerPartitionExpr(analyzer, partitionsInfo); } + computeColumnsFilter(); + computePartitionInfo(); computeTupleState(analyzer); /** @@ -606,13 +599,10 @@ public class OlapScanNode extends ScanNode { cardinality = 0; } - // prepare stmt evaluate lazily in Coordinator execute - if (preparedStatment == null || !preparedStatment.isPointQueryShortCircuit()) { - try { - createScanRangeLocations(); - } catch (AnalysisException e) { - throw new UserException(e.getMessage()); - } + try { + createScanRangeLocations(); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); } // Relatively accurate cardinality according to ScanRange in @@ -1143,22 +1133,8 @@ public class OlapScanNode extends ScanNode { } } - public boolean isFromPrepareStmt() { - return this.isFromPrepareStmt; - } - - public void setPointQueryEqualPredicates(Map<SlotRef, Expr> predicates) { - this.pointQueryEqualPredicats = predicates; - } - - public Map<SlotRef, Expr> getPointQueryEqualPredicates() { - return this.pointQueryEqualPredicats; - } - public boolean isPointQuery() { - return this.pointQueryEqualPredicats != null - || (preparedStatment != null && preparedStatment.isPointQueryShortCircuit()) - || ConnectContext.get().getStatementContext().isShortCircuitQuery(); + return ConnectContext.get().getStatementContext().isShortCircuitQuery(); } private void computeTabletInfo() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index acde5f8f37c..d951de9974b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -60,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -288,24 +287,7 @@ public class OriginalPlanner extends Planner { LOG.debug("this isn't block query"); } } - // Check SelectStatement if optimization condition satisfied - if (selectStmt.isPointQueryShortCircuit()) { - // Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2 - // such query will use direct RPC to do point query - if (LOG.isDebugEnabled()) { - LOG.debug("it's a point query"); - } - Map<SlotRef, Expr> eqConjuncts = ((SelectStmt) selectStmt).getPointQueryEQPredicates(); - OlapScanNode olapScanNode = (OlapScanNode) singleNodePlan; - olapScanNode.setDescTable(analyzer.getDescTbl()); - olapScanNode.setPointQueryEqualPredicates(eqConjuncts); - if (analyzer.getPrepareStmt() != null) { - // Cache them for later request better performance - analyzer.getPrepareStmt().cacheSerializedDescriptorTable(olapScanNode.getDescTable()); - analyzer.getPrepareStmt().cacheSerializedOutputExprs(rootFragment.getOutputExprs()); - analyzer.getPrepareStmt().cacheSerializedQueryOptions(queryOptions); - } - } else if (selectStmt.isTwoPhaseReadOptEnabled()) { + if (selectStmt.isTwoPhaseReadOptEnabled()) { // Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` if (singleNodePlan instanceof SortNode && singleNodePlan.getChildren().size() == 1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index c531b1167d2..1b70c5b318b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -263,9 +263,6 @@ public class ConnectContext { private StatementContext statementContext; - // legacy planner - private Map<String, PrepareStmtContext> preparedStmtCtxs = Maps.newHashMap(); - // new planner private Map<String, PreparedStatementContext> preparedStatementContextMap = Maps.newHashMap(); @@ -410,11 +407,10 @@ public class ConnectContext { return txnEntry != null && txnEntry.isInsertValuesTxnIniting(); } - public void addPreparedStmt(String stmtName, PrepareStmtContext ctx) { - this.preparedStmtCtxs.put(stmtName, ctx); - } - public void addPreparedStatementContext(String stmtName, PreparedStatementContext ctx) throws UserException { + if (!sessionVariable.enableServeSidePreparedStatement) { + throw new UserException("Failed to do prepared command, server side prepared statement is disabled"); + } if (this.preparedStatementContextMap.size() > sessionVariable.maxPreparedStmtCount) { throw new UserException("Failed to create a server prepared statement" + "possibly because there are too many active prepared statements on server already." @@ -424,14 +420,9 @@ public class ConnectContext { } public void removePrepareStmt(String stmtName) { - this.preparedStmtCtxs.remove(stmtName); this.preparedStatementContextMap.remove(stmtName); } - public PrepareStmtContext getPreparedStmt(String stmtName) { - return this.preparedStmtCtxs.get(stmtName); - } - public PreparedStatementContext getPreparedStementContext(String stmtName) { return this.preparedStatementContextMap.get(stmtName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index dac1b5785b8..91a3dbaad94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -260,12 +260,8 @@ public abstract class ConnectProcessor { Exception nereidsSyntaxException = null; long parseSqlStartTime = System.currentTimeMillis(); List<StatementBase> cachedStmts = null; - // Currently we add a config to decide whether using PREPARED/EXECUTE command for nereids - // TODO: after implemented full prepared, we could remove this flag - boolean nereidsUseServerPrep = sessionVariable.enableServeSidePreparedStatement - || mysqlCommand == MysqlCommand.COM_QUERY; CacheKeyType cacheKeyType = null; - if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) { + if (sessionVariable.isEnableNereidsPlanner()) { if (wantToParseSqlFromSqlCache) { cachedStmts = parseFromSqlCache(originStmt); Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get() @@ -308,6 +304,12 @@ public abstract class ConnectProcessor { // stmts == null when Nereids cannot planner this query or Nereids is disabled. if (stmts == null) { + if (mysqlCommand == MysqlCommand.COM_STMT_PREPARE) { + // avoid fall back to legacy planner + ctx.getState().setError(ErrorCode.ERR_UNSUPPORTED_PS, "Not supported such prepared statement"); + ctx.getState().setErrType(QueryState.ErrType.OTHER_ERR); + return; + } try { stmts = parse(convertedStmt); } catch (Throwable throwable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4ee46d3bec3..69311d3a029 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -282,8 +282,6 @@ public class Coordinator implements CoordInterface { public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap(); private ConnectContext context; - private PointQueryExec pointExec = null; - private StatsErrorEstimator statsErrorEstimator; // A countdown latch to mark the completion of each instance. @@ -1336,10 +1334,6 @@ public class Coordinator implements CoordInterface { for (ResultReceiver receiver : receivers) { receiver.cancel(cancelReason); } - if (null != pointExec) { - pointExec.cancel(); - return; - } cancelRemoteFragmentsAsync(cancelReason); cancelLatch(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index fa5be19c44d..0f3de945f85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -17,11 +17,6 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.ExecuteStmt; -import org.apache.doris.analysis.LiteralExpr; -import org.apache.doris.analysis.NullLiteral; -import org.apache.doris.analysis.PrepareStmt; -import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.StatementBase; import org.apache.doris.catalog.MysqlColType; import org.apache.doris.common.ConnectionException; @@ -91,73 +86,6 @@ public class MysqlConnectProcessor extends ConnectProcessor { } } - private void handleExecute(PrepareStmt prepareStmt, long stmtId) { - if (prepareStmt.getInnerStmt() instanceof QueryStmt) { - ctx.getState().setIsQuery(true); - } - prepareStmt.setIsPrepared(); - int paramCount = prepareStmt.getParmCount(); - LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount); - // null bitmap - String stmtStr = ""; - try { - List<LiteralExpr> realValueExprs = new ArrayList<>(); - if (paramCount > 0) { - byte[] nullbitmapData = new byte[(paramCount + 7) / 8]; - packetBuf.get(nullbitmapData); - // new_params_bind_flag - if ((int) packetBuf.get() != 0) { - // parse params's types - for (int i = 0; i < paramCount; ++i) { - int typeCode = packetBuf.getChar(); - LOG.debug("code {}", typeCode); - prepareStmt.placeholders().get(i).setTypeCode(typeCode); - } - } - // parse param data - for (int i = 0; i < paramCount; ++i) { - if (isNull(nullbitmapData, i)) { - realValueExprs.add(new NullLiteral()); - continue; - } - LiteralExpr l = prepareStmt.placeholders().get(i).createLiteralFromType(); - boolean isUnsigned = prepareStmt.placeholders().get(i).isUnsigned(); - l.setupParamFromBinary(packetBuf, isUnsigned); - realValueExprs.add(l); - } - } - ExecuteStmt executeStmt = new ExecuteStmt(String.valueOf(stmtId), realValueExprs); - // TODO set real origin statement - executeStmt.setOrigStmt(new OriginStatement("null", 0)); - executeStmt.setUserInfo(ctx.getCurrentUserIdentity()); - if (LOG.isDebugEnabled()) { - LOG.debug("executeStmt {}", executeStmt); - } - executor = new StmtExecutor(ctx, executeStmt); - ctx.setExecutor(executor); - executor.execute(); - //For the `insert into` statements during group commit load via JDBC. - //Printing audit logs can severely impact performance. - //Therefore, we have introduced a session variable to control whether to print audit logs. - //It is recommended to turn off audit logs only during group commit load via JDBC. - if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) { - PrepareStmtContext preparedStmtContext = ConnectContext.get().getPreparedStmt(String.valueOf(stmtId)); - if (preparedStmtContext != null) { - stmtStr = executeStmt.toSql(); - } - } - } catch (Throwable e) { - // Catch all throwable. - // If reach here, maybe doris bug. - LOG.warn("Process one query failed because unknown reason: ", e); - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, - e.getClass().getSimpleName() + ", msg: " + e.getMessage()); - } - if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) { - auditAfterExec(stmtStr, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); - } - } - private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx) { int paramCount = prepareCommand.placeholderCount(); LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount); @@ -239,24 +167,18 @@ public class MysqlConnectProcessor extends ConnectProcessor { LOG.debug("execute prepared statement {}", stmtId); } - PrepareStmtContext prepareCtx = ctx.getPreparedStmt(String.valueOf(stmtId)); ctx.setStartTime(); - if (prepareCtx != null) { - // get from lagacy planner context, to be removed - handleExecute((PrepareStmt) prepareCtx.stmt, stmtId); - } else { - // nererids - PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(String.valueOf(stmtId)); - if (preparedStatementContext == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("No such statement in context, stmtId:{}", stmtId); - } - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, - "msg: Not supported such prepared statement"); - return; + // nererids + PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(String.valueOf(stmtId)); + if (preparedStatementContext == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No such statement in context, stmtId:{}", stmtId); } - handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext); + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, + "msg: Not supported such prepared statement"); + return; } + handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext); } // Process COM_QUERY statement, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java deleted file mode 100644 index 9470af76423..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ /dev/null @@ -1,386 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.qe; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.DescriptorTable; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.LiteralExpr; -import org.apache.doris.analysis.PrepareStmt; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.cloud.catalog.CloudPartition; -import org.apache.doris.common.Config; -import org.apache.doris.common.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.planner.OlapScanNode; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.planner.Planner; -import org.apache.doris.proto.InternalService; -import org.apache.doris.proto.InternalService.KeyTuple; -import org.apache.doris.rpc.BackendServiceProxy; -import org.apache.doris.rpc.RpcException; -import org.apache.doris.rpc.TCustomProtocolFactory; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TExprList; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TQueryOptions; -import org.apache.doris.thrift.TResultBatch; -import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.doris.thrift.TStatusCode; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class PointQueryExec implements CoordInterface { - private static final Logger LOG = LogManager.getLogger(PointQueryExec.class); - // SlotRef sorted by column id - private Map<SlotRef, Expr> equalPredicats; - // ByteString serialized for prepared statement - private ByteString serializedDescTable; - private ByteString serializedOutputExpr; - private ByteString serializedQueryOptions; - private ArrayList<Expr> outputExprs; - private DescriptorTable descriptorTable; - private TQueryOptions queryOptions; - private long tabletID = 0; - private long timeoutMs = Config.point_query_timeout_ms; // default 10s - - private boolean isCancel = false; - private boolean isBinaryProtocol = false; - - private List<Backend> candidateBackends; - Planner planner; - - // For parepared statement cached structure, - // there are some pre caculated structure in Backend TabletFetch service - // using this ID to find for this prepared statement - private UUID cacheID; - - private final int maxMsgSizeOfResultReceiver; - - // used for snapshot read in cloud mode - private List<Long> versions; - - private OlapScanNode getPlanRoot() { - List<PlanFragment> fragments = planner.getFragments(); - PlanFragment fragment = fragments.get(0); - if (LOG.isDebugEnabled()) { - LOG.debug("execPointGet fragment {}", fragment); - } - OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot(); - Preconditions.checkNotNull(planRoot); - return planRoot; - } - - public PointQueryExec(Planner planner, Analyzer analyzer, int maxMessageSize) { - // init from planner - this.planner = planner; - List<PlanFragment> fragments = planner.getFragments(); - PlanFragment fragment = fragments.get(0); - OlapScanNode planRoot = getPlanRoot(); - this.equalPredicats = planRoot.getPointQueryEqualPredicates(); - this.descriptorTable = planRoot.getDescTable(); - this.outputExprs = fragment.getOutputExprs(); - this.queryOptions = planner.getQueryOptions(); - - PrepareStmt prepareStmt = analyzer == null ? null : analyzer.getPrepareStmt(); - if (prepareStmt != null && prepareStmt.getPreparedType() == PrepareStmt.PreparedType.FULL_PREPARED) { - // Used cached or better performance - this.cacheID = prepareStmt.getID(); - this.serializedDescTable = prepareStmt.getSerializedDescTable(); - this.serializedOutputExpr = prepareStmt.getSerializedOutputExprs(); - this.isBinaryProtocol = true; - this.serializedQueryOptions = prepareStmt.getSerializedQueryOptions(); - } else { - // TODO - // planner.getDescTable().toThrift(); - } - this.maxMsgSizeOfResultReceiver = maxMessageSize; - } - - private void updateCloudPartitionVersions() throws RpcException { - OlapScanNode planRoot = getPlanRoot(); - List<CloudPartition> partitions = new ArrayList<>(); - Set<Long> partitionSet = new HashSet<>(); - OlapTable table = planRoot.getOlapTable(); - for (Long id : planRoot.getSelectedPartitionIds()) { - if (!partitionSet.contains(id)) { - partitionSet.add(id); - partitions.add((CloudPartition) table.getPartition(id)); - } - } - versions = CloudPartition.getSnapshotVisibleVersion(partitions); - // Only support single partition at present - Preconditions.checkState(versions.size() == 1); - LOG.debug("set cloud version {}", versions.get(0)); - } - - void setScanRangeLocations() throws Exception { - OlapScanNode planRoot = getPlanRoot(); - // compute scan range - List<TScanRangeLocations> locations = planRoot.lazyEvaluateRangeLocations(); - if (planRoot.getScanTabletIds().isEmpty()) { - return; - } - Preconditions.checkState(planRoot.getScanTabletIds().size() == 1); - this.tabletID = planRoot.getScanTabletIds().get(0); - - // update partition version if cloud mode - if (Config.isCloudMode() - && ConnectContext.get().getSessionVariable().enableSnapshotPointQuery) { - // TODO: Optimize to reduce the frequency of version checks in the meta service. - updateCloudPartitionVersions(); - } - - Preconditions.checkNotNull(locations); - candidateBackends = new ArrayList<>(); - for (Long backendID : planRoot.getScanBackendIds()) { - Backend backend = Env.getCurrentSystemInfo().getBackend(backendID); - if (SimpleScheduler.isAvailable(backend)) { - candidateBackends.add(backend); - } - } - // Random read replicas - Collections.shuffle(this.candidateBackends); - if (LOG.isDebugEnabled()) { - LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID); - } - } - - public void setTimeout(long timeoutMs) { - this.timeoutMs = timeoutMs; - } - - void addKeyTuples( - InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { - // TODO handle IN predicates - KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); - for (Expr expr : equalPredicats.values()) { - LiteralExpr lexpr = (LiteralExpr) expr; - kBuilder.addKeyColumnRep(lexpr.getStringValue()); - } - requestBuilder.addKeyTuples(kBuilder); - } - - @Override - public void cancel(Status cancelReason) { - // Do nothing - } - - - @Override - public RowBatch getNext() throws Exception { - setScanRangeLocations(); - // No partition/tablet found return emtpy row batch - if (candidateBackends == null || candidateBackends.isEmpty()) { - return new RowBatch(); - } - Iterator<Backend> backendIter = candidateBackends.iterator(); - RowBatch rowBatch = null; - int tryCount = 0; - int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size()); - Status status = new Status(); - do { - Backend backend = backendIter.next(); - rowBatch = getNextInternal(status, backend); - ++tryCount; - if (rowBatch != null) { - break; - } - if (tryCount >= maxTry) { - break; - } - status.updateStatus(TStatusCode.OK, ""); - } while (true); - // handle status code - if (!status.ok()) { - if (Strings.isNullOrEmpty(status.getErrorMsg())) { - status.rewriteErrorMsg(); - } - if (status.isRpcError()) { - throw new RpcException(null, status.getErrorMsg()); - } else { - String errMsg = status.getErrorMsg(); - LOG.warn("query failed: {}", errMsg); - - // hide host info - int hostIndex = errMsg.indexOf("host"); - if (hostIndex != -1) { - errMsg = errMsg.substring(0, hostIndex); - } - throw new UserException(errMsg); - } - } - return rowBatch; - } - - @Override - public void exec() throws Exception { - // Do nothing - } - - private RowBatch getNextInternal(Status status, Backend backend) throws TException { - long timeoutTs = System.currentTimeMillis() + timeoutMs; - RowBatch rowBatch = new RowBatch(); - InternalService.PTabletKeyLookupResponse pResult = null; - try { - if (serializedDescTable == null) { - serializedDescTable = ByteString.copyFrom( - new TSerializer().serialize(descriptorTable.toThrift())); - } - if (serializedOutputExpr == null) { - List<TExpr> exprs = new ArrayList<>(); - for (Expr expr : outputExprs) { - exprs.add(expr.treeToThrift()); - } - TExprList exprList = new TExprList(exprs); - serializedOutputExpr = ByteString.copyFrom( - new TSerializer().serialize(exprList)); - } - if (serializedQueryOptions == null) { - serializedQueryOptions = ByteString.copyFrom( - new TSerializer().serialize(queryOptions)); - } - - InternalService.PTabletKeyLookupRequest.Builder requestBuilder - = InternalService.PTabletKeyLookupRequest.newBuilder() - .setTabletId(tabletID) - .setDescTbl(serializedDescTable) - .setOutputExpr(serializedOutputExpr) - .setQueryOptions(serializedQueryOptions) - .setIsBinaryRow(isBinaryProtocol); - if (versions != null && !versions.isEmpty()) { - requestBuilder.setVersion(versions.get(0)); - } - if (cacheID != null) { - InternalService.UUID.Builder uuidBuilder = InternalService.UUID.newBuilder(); - uuidBuilder.setUuidHigh(cacheID.getMostSignificantBits()); - uuidBuilder.setUuidLow(cacheID.getLeastSignificantBits()); - requestBuilder.setUuid(uuidBuilder); - } - addKeyTuples(requestBuilder); - - while (pResult == null) { - InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); - Future<InternalService.PTabletKeyLookupResponse> futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAddress(), request); - long currentTs = System.currentTimeMillis(); - if (currentTs >= timeoutTs) { - LOG.warn("fetch result timeout {}", backend.getBrpcAddress()); - status.updateStatus(TStatusCode.INTERNAL_ERROR, "query timeout"); - return null; - } - try { - pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // continue to get result - LOG.info("future get interrupted Exception"); - if (isCancel) { - status.updateStatus(TStatusCode.CANCELLED, "cancelled"); - return null; - } - } catch (TimeoutException e) { - futureResponse.cancel(true); - LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress()); - status.updateStatus(TStatusCode.INTERNAL_ERROR, "query timeout"); - return null; - } - } - } catch (RpcException e) { - LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAddress(), e); - status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); - SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); - return null; - } catch (ExecutionException e) { - LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAddress()); - if (e.getMessage().contains("time out")) { - // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.updateStatus(TStatusCode.TIMEOUT, e.getMessage()); - } else { - status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage()); - SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); - } - return null; - } - Status resultStatus = new Status(pResult.getStatus()); - if (resultStatus.getErrorCode() != TStatusCode.OK) { - status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg()); - return null; - } - - if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) { - LOG.info("get empty rowbatch"); - rowBatch.setEos(true); - return rowBatch; - } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) { - byte[] serialResult = pResult.getRowBatch().toByteArray(); - TResultBatch resultBatch = new TResultBatch(); - TDeserializer deserializer = new TDeserializer( - new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver)); - try { - deserializer.deserialize(resultBatch, serialResult); - } catch (TException e) { - if (e.getMessage().contains("MaxMessageSize reached")) { - throw new TException("MaxMessageSize reached, try increase max_msg_size_of_result_receiver"); - } else { - throw e; - } - } - rowBatch.setBatch(resultBatch); - rowBatch.setEos(true); - return rowBatch; - } - - if (isCancel) { - status.updateStatus(TStatusCode.CANCELLED, "cancelled"); - } - return rowBatch; - } - - public void cancel() { - isCancel = true; - } - - @Override - public List<TNetworkAddress> getInvolvedBackends() { - return Lists.newArrayList(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PrepareStmtContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PrepareStmtContext.java deleted file mode 100644 index 3c3707e8b66..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PrepareStmtContext.java +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.qe; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.StatementBase; -import org.apache.doris.planner.Planner; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class PrepareStmtContext { - private static final Logger LOG = LogManager.getLogger(PrepareStmtContext.class); - public StatementBase stmt; - public ConnectContext ctx; - public Planner planner; - public Analyzer analyzer; - public String stmtString; - - // Timestamp in millisecond last command starts at - protected volatile long startTime; - - public PrepareStmtContext(StatementBase stmt, ConnectContext ctx, Planner planner, - Analyzer analyzer, String stmtString) { - this.stmt = stmt; - this.ctx = ctx; - this.planner = planner; - this.analyzer = analyzer; - this.stmtString = stmtString; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime() { - startTime = System.currentTimeMillis(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 08517acdf0c..64616420474 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -32,7 +32,6 @@ import org.apache.doris.analysis.DdlStmt; import org.apache.doris.analysis.DeleteStmt; import org.apache.doris.analysis.DropPartitionClause; import org.apache.doris.analysis.DropTableStmt; -import org.apache.doris.analysis.ExecuteStmt; import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.Expr; @@ -48,8 +47,6 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.PlaceHolderExpr; -import org.apache.doris.analysis.PrepareStmt; -import org.apache.doris.analysis.PrepareStmt.PreparedType; import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.RedirectStatus; @@ -280,18 +277,12 @@ public class StmtExecutor { private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; private String stmtName; - private StatementBase prepareStmt = null; private String mysqlLoadId; - // Distinguish from prepare and execute command - private boolean isExecuteStmt = false; // Handle selects that fe can do without be private boolean isHandleQueryInFe = false; // The profile of this execution private final Profile profile; - private ExecuteStmt execStmt; - PrepareStmtContext preparedStmtCtx = null; - // The result schema if "dry_run_query" is true. // Only one column to indicate the real return row numbers. private static final CommonResultSetMetaData DRY_RUN_QUERY_METADATA = new CommonResultSetMetaData( @@ -1002,11 +993,6 @@ public class StmtExecutor { parsedStmt.analyze(analyzer); } parsedStmt.checkPriv(); - if (prepareStmt instanceof PrepareStmt && !isExecuteStmt) { - handlePrepareStmt(); - return; - } - // sql/sqlHash block checkBlockRules(); if (parsedStmt instanceof QueryStmt) { @@ -1230,33 +1216,6 @@ public class StmtExecutor { parseByLegacy(); - boolean preparedStmtReanalyzed = false; - if (parsedStmt instanceof ExecuteStmt) { - execStmt = (ExecuteStmt) parsedStmt; - preparedStmtCtx = context.getPreparedStmt(execStmt.getName()); - if (preparedStmtCtx == null) { - throw new UserException("Could not execute, since `" + execStmt.getName() + "` not exist"); - } - // parsedStmt may already by set when constructing this StmtExecutor(); - ((PrepareStmt) preparedStmtCtx.stmt).asignValues(execStmt.getArgs()); - parsedStmt = ((PrepareStmt) preparedStmtCtx.stmt).getInnerStmt(); - planner = preparedStmtCtx.planner; - analyzer = preparedStmtCtx.analyzer; - prepareStmt = preparedStmtCtx.stmt; - if (LOG.isDebugEnabled()) { - LOG.debug("already prepared stmt: {}", preparedStmtCtx.stmtString); - } - isExecuteStmt = true; - if (!((PrepareStmt) preparedStmtCtx.stmt).needReAnalyze()) { - // Return directly to bypass analyze and plan - return; - } - // continue analyze - preparedStmtReanalyzed = true; - preparedStmtCtx.stmt.reset(); - // preparedStmtCtx.stmt.analyze(analyzer); - } - // yiguolei: insert stmt's grammar analysis will write editlog, // so that we check if the stmt should be forward to master here // if the stmt should be forward to master, then just return here and the master will do analysis again @@ -1266,23 +1225,6 @@ public class StmtExecutor { analyzer = new Analyzer(context.getEnv(), context); - if (parsedStmt instanceof PrepareStmt || context.getCommand() == MysqlCommand.COM_STMT_PREPARE) { - if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) { - prepareStmt = new PrepareStmt(parsedStmt, - String.valueOf(String.valueOf(context.getStmtId()))); - } else { - prepareStmt = (PrepareStmt) parsedStmt; - } - ((PrepareStmt) prepareStmt).setContext(context); - prepareStmt.analyze(analyzer); - // Need analyze inner statement - parsedStmt = ((PrepareStmt) prepareStmt).getInnerStmt(); - if (((PrepareStmt) prepareStmt).getPreparedType() == PrepareStmt.PreparedType.STATEMENT) { - // Skip analyze, do it lazy - return; - } - } - // Convert show statement to select statement here if (parsedStmt instanceof ShowStmt) { SelectStmt selectStmt = ((ShowStmt) parsedStmt).toSelectStmt(analyzer); @@ -1394,17 +1336,6 @@ public class StmtExecutor { throw new AnalysisException("Unexpected exception: " + e.getMessage()); } } - if (preparedStmtReanalyzed - && ((PrepareStmt) preparedStmtCtx.stmt).getPreparedType() == PrepareStmt.PreparedType.FULL_PREPARED) { - ((PrepareStmt) prepareStmt).asignValues(execStmt.getArgs()); - if (LOG.isDebugEnabled()) { - LOG.debug("update planner and analyzer after prepared statement reanalyzed"); - } - preparedStmtCtx.planner = planner; - preparedStmtCtx.analyzer = analyzer; - Preconditions.checkNotNull(preparedStmtCtx.stmt); - preparedStmtCtx.analyzer.setPrepareStmt(((PrepareStmt) preparedStmtCtx.stmt)); - } } private void parseByLegacy() throws AnalysisException, DdlException { @@ -1460,12 +1391,6 @@ public class StmtExecutor { queryStmt.removeOrderByElements(); } } - if (prepareStmt != null) { - analyzer.setPrepareStmt(((PrepareStmt) prepareStmt)); - if (execStmt != null && ((PrepareStmt) prepareStmt).getPreparedType() != PreparedType.FULL_PREPARED) { - ((PrepareStmt) prepareStmt).asignValues(execStmt.getArgs()); - } - } parsedStmt.analyze(analyzer); if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).isGroupCommit()) { @@ -1530,13 +1455,7 @@ public class StmtExecutor { analyzer = new Analyzer(context.getEnv(), context); // query re-analyze parsedStmt.reset(); - if (prepareStmt != null) { - analyzer.setPrepareStmt(((PrepareStmt) prepareStmt)); - if (execStmt != null - && ((PrepareStmt) prepareStmt).getPreparedType() != PreparedType.FULL_PREPARED) { - ((PrepareStmt) prepareStmt).asignValues(execStmt.getArgs()); - } - } + analyzer.setReAnalyze(true); parsedStmt.analyze(analyzer); @@ -1919,10 +1838,6 @@ public class StmtExecutor { : new ShortCircuitQueryContext(planner, (Queriable) parsedStmt); coordBase = new PointQueryExecutor(shortCircuitQueryContext, context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); - } else if (queryStmt instanceof SelectStmt && ((SelectStmt) parsedStmt).isPointQueryShortCircuit()) { - // this branch is for legacy planner, to be removed - coordBase = new PointQueryExec(planner, analyzer, - context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); } else if (planner instanceof NereidsPlanner && ((NereidsPlanner) planner).getDistributedPlans() != null) { coord = new NereidsCoordinator(context, analyzer, planner, context.getStatsErrorEstimator(), @@ -2672,22 +2587,6 @@ public class StmtExecutor { context.getState().setOk(); } - private void handlePrepareStmt() throws Exception { - List<String> labels = ((PrepareStmt) prepareStmt).getColLabelsOfPlaceHolders(); - // register prepareStmt - if (LOG.isDebugEnabled()) { - LOG.debug("add prepared statement {}, isBinaryProtocol {}", - prepareStmt.toSql(), context.getCommand() == MysqlCommand.COM_STMT_PREPARE); - } - context.addPreparedStmt(String.valueOf(context.getStmtId()), - new PrepareStmtContext(prepareStmt, - context, planner, analyzer, String.valueOf(context.getStmtId()))); - if (context.getCommand() == MysqlCommand.COM_STMT_PREPARE) { - sendStmtPrepareOK((int) context.getStmtId(), labels); - } - } - - // Process use statement. private void handleUseStmt() throws AnalysisException { UseStmt useStmt = (UseStmt) parsedStmt; @@ -2855,29 +2754,12 @@ public class StmtExecutor { // send field one by one for (int i = 0; i < colNames.size(); ++i) { serializer.reset(); - if (prepareStmt != null && prepareStmt instanceof PrepareStmt - && context.getCommand() == MysqlCommand.COM_STMT_EXECUTE) { - // Using PreparedStatment pre serializedField to avoid serialize each time - // we send a field - byte[] serializedField = ((PrepareStmt) prepareStmt).getSerializedField(colNames.get(i)); - if (serializedField == null) { - if (fieldInfos != null) { - serializer.writeField(fieldInfos.get(i), types.get(i)); - } else { - serializer.writeField(colNames.get(i), types.get(i)); - } - serializedField = serializer.toArray(); - ((PrepareStmt) prepareStmt).setSerializedField(colNames.get(i), serializedField); - } - context.getMysqlChannel().sendOnePacket(ByteBuffer.wrap(serializedField)); + if (fieldInfos != null) { + serializer.writeField(fieldInfos.get(i), types.get(i)); } else { - if (fieldInfos != null) { - serializer.writeField(fieldInfos.get(i), types.get(i)); - } else { - serializer.writeField(colNames.get(i), types.get(i)); - } - context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer()); + serializer.writeField(colNames.get(i), types.get(i)); } + context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer()); } // send EOF serializer.reset(); diff --git a/regression-test/data/variant_p0/variant_with_rowstore.out b/regression-test/data/variant_p0/variant_with_rowstore.out index 763825b37a6..a2aa68f2270 100644 --- a/regression-test/data/variant_p0/variant_with_rowstore.out +++ b/regression-test/data/variant_p0/variant_with_rowstore.out @@ -32,6 +32,6 @@ -- !point_select -- -1 {"a":1123} {"a":1123} --- !sql -- +-- !point_select -- 1 1|[""] diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy b/regression-test/suites/point_query_p0/test_point_query.groovy index 0d4df448286..f27c366efbb 100644 --- a/regression-test/suites/point_query_p0/test_point_query.groovy +++ b/regression-test/suites/point_query_p0/test_point_query.groovy @@ -201,7 +201,7 @@ suite("test_point_query", "nonConcurrent") { qe_point_select stmt qe_point_select stmt // invalidate cache - sql "sync" + // "sync" nprep_sql """ INSERT INTO ${tableName} VALUES(1235, 120939.11130, "a ddd", "xxxxxx", "2030-01-02", "2020-01-01 12:36:38", 22.822, "7022-01-01 11:30:38", 0, 1929111.1111,[119291.19291], ["111", "222", "333"], 2) """ qe_point_select stmt qe_point_select stmt @@ -217,9 +217,10 @@ suite("test_point_query", "nonConcurrent") { qe_point_select stmt qe_point_select stmt - sql """ + nprep_sql """ ALTER table ${tableName} ADD COLUMN new_column1 INT default "0"; """ + sql "select 1" qe_point_select stmt } // disable useServerPrepStmts diff --git a/regression-test/suites/variant_p0/variant_with_rowstore.groovy b/regression-test/suites/variant_p0/variant_with_rowstore.groovy index d1946b8123c..f23a742249e 100644 --- a/regression-test/suites/variant_p0/variant_with_rowstore.groovy +++ b/regression-test/suites/variant_p0/variant_with_rowstore.groovy @@ -125,5 +125,9 @@ suite("regression_test_variant_rowstore", "variant_type"){ ); """ sql """insert into table_rs_invalid_json values (1, '1|[""]')""" - qt_sql "select * from table_rs_invalid_json where col0 = 1" + def result2 = connect(user=user, password=password, url=prepare_url) { + def stmt = prepareStatement "select * from table_rs_invalid_json where col0 = ?" + stmt.setInt(1, 1) + qe_point_select stmt + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
