This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 161b6b30d9d [Enhancement](Nereids) Nereids supports group_commit with
insert (#32523)
161b6b30d9d is described below
commit 161b6b30d9dab3b2c4e6ef9ade6b552b946bf04b
Author: 赵硕 <[email protected]>
AuthorDate: Thu Jun 6 10:27:54 2024 +0800
[Enhancement](Nereids) Nereids supports group_commit with insert (#32523)
## Proposed changes
Nereids supports group_commit with insert
Co-authored-by: abmdocrt <[email protected]>
Co-authored-by: meiyi <[email protected]>
---
.../org/apache/doris/httpv2/rest/LoadAction.java | 5 +-
.../glue/translator/PhysicalPlanTranslator.java | 4 +-
.../commands/insert/GroupCommitInsertExecutor.java | 252 +++++++++++++++++++++
.../plans/commands/insert/GroupCommitInserter.java | 144 ------------
.../commands/insert/InsertIntoTableCommand.java | 10 +-
.../trees/plans/commands/insert/InsertUtils.java | 5 +-
.../plans/commands/insert/OlapInsertExecutor.java | 2 +-
.../apache/doris/planner/GroupCommitPlanner.java | 39 +---
.../java/org/apache/doris/qe/ConnectContext.java | 10 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 11 +-
.../insert_group_commit_into_max_filter_ratio.out | 2 +
.../insert_p0/insert_group_commit_into.groovy | 14 +-
...nsert_group_commit_into_max_filter_ratio.groovy | 41 +++-
.../insert_group_commit_into_unique.groovy | 6 +-
...nsert_group_commit_into_unique_sync_mode.groovy | 8 +-
.../insert_group_commit_with_exception.groovy | 39 +---
.../insert_group_commit_with_large_data.groovy | 2 +-
.../suites/insert_p0/insert_with_null.groovy | 2 +-
.../test_group_commit_data_bytes_property.groovy | 2 +-
.../test_group_commit_interval_ms_property.groovy | 2 +-
20 files changed, 338 insertions(+), 262 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index de15a2816f0..0cc18e7c73d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -33,6 +33,7 @@ import
org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.resource.Tag;
@@ -107,7 +108,7 @@ public class LoadAction extends RestBaseController {
groupCommit = true;
try {
if (isGroupCommitBlock(db, table)) {
- String msg = "insert table " + table + " is blocked on
schema change";
+ String msg = "insert table " + table +
GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}
} catch (Exception e) {
@@ -147,7 +148,7 @@ public class LoadAction extends RestBaseController {
try {
String[] pair = parseDbAndTb(sql);
if (isGroupCommitBlock(pair[0], pair[1])) {
- String msg = "insert table " + pair[1] + " is blocked on
schema change";
+ String msg = "insert table " + pair[1] +
GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 10b6f3c5637..73a76e4db43 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -425,8 +425,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
slotDesc.setAutoInc(column.isAutoInc());
}
OlapTableSink sink;
- // This statement is only used in the group_commit mode in the
http_stream
- if (context.getConnectContext().isGroupCommitStreamLoadSql()) {
+ // This statement is only used in the group_commit mode
+ if (context.getConnectContext().isGroupCommit()) {
sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(),
olapTuple,
olapTableSink.getTargetTable().getPartitionIds(),
olapTableSink.isSingleReplicaLoad(),
context.getSessionVariable().getGroupCommit(), 0);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
new file mode 100644
index 00000000000..c4c0fd9d5f8
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Handle group commit
+ */
+public class GroupCommitInsertExecutor extends AbstractInsertExecutor {
+ public static final Logger LOG =
LogManager.getLogger(GroupCommitInsertExecutor.class);
+ private static final long INVALID_TXN_ID = -1L;
+ protected final NereidsPlanner planner;
+ private long txnId = INVALID_TXN_ID;
+ private TransactionStatus txnStatus = TransactionStatus.ABORTED;
+
+ /**
+ * Creates an executor for a group commit insert.
+ * All arguments are forwarded unchanged to the AbstractInsertExecutor constructor; the
+ * planner is additionally stored in this class's own {@code planner} field.
+ */
+ public GroupCommitInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
+ Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ this.planner = planner;
+ }
+
+ /**
+ * Decides whether this insert can be executed as a group commit and records the
+ * decision on the connect context through {@code ctx.setGroupCommit}.
+ * The physical sink is cast to PhysicalOlapTableSink unconditionally, so callers must
+ * only pass an olap table sink (a different sink type fails with ClassCastException).
+ *
+ * @return true if group commit can be used for this insert
+ */
+ public static boolean canGroupCommit(ConnectContext ctx, DataSink sink,
+ PhysicalSink physicalSink,
NereidsPlanner planner) {
+ PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>)
physicalSink;
+ boolean can = analyzeGroupCommit(ctx, sink, olapSink, planner);
+ // side effect: the result is also stored on the connect context
+ ctx.setGroupCommit(can);
+ return can;
+ }
+
+ // Evaluates the preconditions for group commit. Returns true only when all hold:
+ // - the sink is an OlapTableSink
+ // - the session enables insert group commit and does not enable unique key partial update
+ // - the session sql mode is not equal to MODE_NO_BACKSLASH_ESCAPES
+ // - the connection is not in txn model
+ // - the plan passes isGroupCommitAvailablePlan
+ // - the sink has no partition ids
+ // - the target table uses light schema change
+ // - the target table's database is not the internal database
+ private static boolean analyzeGroupCommit(ConnectContext ctx, DataSink
sink,
+ PhysicalOlapTableSink<?> physicalOlapTableSink,
NereidsPlanner planner) {
+ // cheap session-level checks first
+ if (!(sink instanceof OlapTableSink) ||
!ctx.getSessionVariable().isEnableInsertGroupCommit()
+ || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
+ return false;
+ }
+ OlapTable targetTable = physicalOlapTableSink.getTargetTable();
+ // NOTE(review): the sql mode is compared with != against a single constant rather than
+ // bit-tested; if sql mode is a bit set, combined modes would pass this check - confirm.
+ return ctx.getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
+ && !ctx.isTxnModel() &&
isGroupCommitAvailablePlan(physicalOlapTableSink, planner)
+ && physicalOlapTableSink.getPartitionIds().isEmpty() &&
targetTable.getTableProperty()
+ .getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
+ .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
+ }
+
+    /**
+     * Checks that every value in the constant rows of the physical plan is a literal.
+     * Rows come from the constant expression list of a PhysicalUnion found in the plan, or,
+     * when a PhysicalOneRowRelation is found, from its projects (which take precedence).
+     * Alias and Cast wrappers are peeled off before each value is checked.
+     *
+     * @param planner planner holding the physical plan to inspect
+     * @return true if constant rows were found and all their values are literals; false
+     *         otherwise, including when the plan has neither a union nor a one-row relation
+     */
+    private static boolean literalExpr(NereidsPlanner planner) {
+        Optional<PhysicalUnion> union = planner.getPhysicalPlan()
+                .<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance)
+                .stream().findAny();
+        List<List<NamedExpression>> constantExprsList = null;
+        if (union.isPresent()) {
+            constantExprsList = union.get().getConstantExprsList();
+        }
+        Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan()
+                .<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance)
+                .stream().findAny();
+        if (oneRowRelation.isPresent()) {
+            constantExprsList = ImmutableList.of(oneRowRelation.get().getProjects());
+        }
+        if (constantExprsList == null) {
+            // no constant rows at all: not a plan group commit can handle
+            // (iterating the null list used to throw NullPointerException here)
+            return false;
+        }
+        for (List<NamedExpression> row : constantExprsList) {
+            for (Expression expr : row) {
+                // strip Alias / Cast wrappers to reach the underlying value
+                while (expr instanceof Alias || expr instanceof Cast) {
+                    expr = expr.child(0);
+                }
+                if (!(expr instanceof Literal)) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether the input of the sink has a shape group commit can handle: after skipping
+     * an optional PhysicalDistribute directly under the sink, the input must be a
+     * OneRowRelation or a PhysicalUnion with no children, and literalExpr must accept the plan.
+     */
+    private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink,
+            NereidsPlanner planner) {
+        Plan input = sink.child();
+        // look through a distribute node placed between the sink and its source
+        Plan source = input instanceof PhysicalDistribute ? input.child(0) : input;
+        boolean constantSource = source instanceof OneRowRelation
+                || (source instanceof PhysicalUnion && source.arity() == 0);
+        if (!constantSource) {
+            return false;
+        }
+        return literalExpr(planner);
+    }
+
+    /**
+     * Sends the constant rows of the insert to a backend as a group commit insert and records
+     * the outcome on the connect context (state, insert result and returned row count).
+     * Rows come from the constant expression list of a PhysicalUnion found in the plan, or,
+     * when a PhysicalOneRowRelation is found, from its projects (which take precedence).
+     *
+     * @param ctx connect context of the current statement
+     * @param sink data sink of the first fragment (not used by this method)
+     * @param physicalOlapTableSink physical sink describing the target database, table and columns
+     * @param planner planner holding the physical plan the rows are read from
+     * @throws Exception if the plan contains no constant rows, if the group commit request
+     *         fails, or (through ErrorReport.reportDdlException) if the backend status is not OK
+     */
+    private void handleGroupCommit(ConnectContext ctx, DataSink sink,
+            PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner planner) throws Exception {
+        // TODO we should refactor this to remove rely on UnionNode
+        List<InternalService.PDataRow> rows = new ArrayList<>();
+
+        Optional<PhysicalUnion> union = planner.getPhysicalPlan()
+                .<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance)
+                .stream().findAny();
+        List<List<NamedExpression>> constantExprsList = null;
+        if (union.isPresent()) {
+            constantExprsList = union.get().getConstantExprsList();
+        }
+        Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan()
+                .<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance)
+                .stream().findAny();
+        if (oneRowRelation.isPresent()) {
+            constantExprsList = ImmutableList.of(oneRowRelation.get().getProjects());
+        }
+        if (constantExprsList == null) {
+            // fail with a clear message instead of a NullPointerException in the loop below
+            throw new UserException("group commit insert requires constant rows, but none were found in the plan");
+        }
+
+        // should set columns of sink since we maybe generate some invisible columns
+        List<Column> fullSchema = physicalOlapTableSink.getTargetTable().getFullSchema();
+        List<Column> targetSchema;
+        if (fullSchema.size() != physicalOlapTableSink.getCols().size()) {
+            targetSchema = fullSchema;
+        } else {
+            targetSchema = physicalOlapTableSink.getCols().stream().collect(Collectors.toList());
+        }
+        // escape backticks inside column names by doubling them
+        List<String> columnNames = targetSchema.stream()
+                .map(Column::getName)
+                .map(n -> n.replace("`", "``"))
+                .collect(Collectors.toList());
+        for (List<NamedExpression> row : constantExprsList) {
+            rows.add(InsertUtils.getRowStringValue(row));
+        }
+        // use the ctx passed in (not the thread-local ConnectContext.get()), consistently with
+        // the rest of this method
+        GroupCommitPlanner groupCommitPlanner = EnvFactory.getInstance().createGroupCommitPlanner(
+                physicalOlapTableSink.getDatabase(),
+                physicalOlapTableSink.getTargetTable(), columnNames, ctx.queryId(),
+                ctx.getSessionVariable().getGroupCommit());
+        PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
+        TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
+        // TODO: in legacy, there is a retry, we need to implement
+        if (code != TStatusCode.OK) {
+            String errMsg = "group commit insert failed. backend id: "
+                    + groupCommitPlanner.getBackend().getId() + ", status: "
+                    + response.getStatus();
+            ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+        }
+        txnStatus = TransactionStatus.PREPARE;
+        // the txnId value is closed by the quote that opens the next separator; the previous
+        // version emitted an extra quote here and produced "'txnId':'5'', 'optimizer':..."
+        String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name()
+                + "', 'txnId':'" + response.getTxnId()
+                + "', 'optimizer':'" + "nereids" + "'"
+                + "}";
+        ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb);
+        ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
+                physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
+                txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
+        // update it, so that user can get loaded rows in fe.audit.log
+        ctx.updateReturnRows((int) response.getLoadedRows());
+    }
+
+ // No-op: this executor does nothing when asked to begin a transaction.
+ @Override
+ public void beginTransaction() {
+
+ }
+
+ // No-op: the sink is not modified by this executor.
+ @Override
+ protected void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
+
+ }
+
+ // No-op: nothing is prepared before execution.
+ @Override
+ protected void beforeExec() {
+ }
+
+ // No-op: nothing is done after execImpl succeeds.
+ @Override
+ protected void onComplete() throws UserException {
+
+ }
+
+    /**
+     * Records a failed insert: stores a non-null error message in {@code errMsg} and sets the
+     * same message as the error state of the connect context.
+     *
+     * @param t the failure raised while executing the insert
+     */
+    @Override
+    protected void onFail(Throwable t) {
+        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
+        // report the normalized message; the raw message may be null (e.g. NullPointerException)
+        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, errMsg);
+    }
+
+ // No-op: nothing is done after a successful execution.
+ @Override
+ protected void afterExec(StmtExecutor executor) {
+
+ }
+
+    /**
+     * Locates the olap table sink in the physical plan and runs the group commit insert
+     * against it, using the sink of the first plan fragment.
+     *
+     * @throws IllegalStateException if the physical plan contains no PhysicalOlapTableSink
+     * @throws Exception if the group commit insert itself fails
+     */
+    protected final void execImpl() throws Exception {
+        // match on the concrete sink type that is actually required, and fail with a clear
+        // message instead of NoSuchElementException / ClassCastException when it is absent
+        PhysicalOlapTableSink<?> olapSink = planner.getPhysicalPlan()
+                .<Set<PhysicalOlapTableSink<?>>>collect(PhysicalOlapTableSink.class::isInstance)
+                .stream()
+                .findAny()
+                .orElseThrow(() -> new IllegalStateException(
+                        "group commit insert requires an olap table sink in the physical plan"));
+        DataSink sink = planner.getFragments().get(0).getSink();
+        handleGroupCommit(ctx, sink, olapSink, planner);
+    }
+
+    /**
+     * Runs the group commit insert: beforeExec, execImpl, onComplete, then afterExec.
+     * Any failure is recorded through onFail. It is rethrown only when its message contains
+     * GroupCommitPlanner.SCHEMA_CHANGE; every other failure is swallowed after being recorded,
+     * and afterExec is skipped.
+     *
+     * @param executor statement executor passed on to afterExec
+     * @param jobId not used by this executor
+     * @throws Exception the original failure when the table is blocked on schema change
+     */
+    @Override
+    public void executeSingleInsert(StmtExecutor executor, long jobId) throws Exception {
+        beforeExec();
+        try {
+            execImpl();
+            onComplete();
+        } catch (Throwable t) {
+            onFail(t);
+            // Propagate "blocked on schema change" failures so the group_commit insert can be
+            // retried by the caller. The message may be null (e.g. NullPointerException), so it
+            // is checked first to avoid masking the real failure with a second exception.
+            String message = t.getMessage();
+            if (message != null && message.contains(GroupCommitPlanner.SCHEMA_CHANGE)) {
+                throw t;
+            }
+            return;
+        }
+        afterExec(executor);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
deleted file mode 100644
index 655a37c7554..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
+++ /dev/null
@@ -1,144 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.plans.commands.insert;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.UserException;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.GroupCommitPlanner;
-import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SqlModeHelper;
-import org.apache.doris.rpc.RpcException;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.transaction.TransactionStatus;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Handle group commit
- */
-public class GroupCommitInserter {
- public static final Logger LOG =
LogManager.getLogger(GroupCommitInserter.class);
-
- /**
- * Handle group commit
- */
- public static boolean groupCommit(ConnectContext ctx, DataSink sink,
PhysicalSink physicalSink) {
- PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>)
physicalSink;
- // TODO: implement group commit
- if (canGroupCommit(ctx, sink, olapSink)) {
- // handleGroupCommit(ctx, sink, physicalOlapTableSink);
- // return;
- throw new AnalysisException("group commit is not supported in
Nereids now");
- }
- return false;
- }
-
- private static boolean canGroupCommit(ConnectContext ctx, DataSink sink,
- PhysicalOlapTableSink<?> physicalOlapTableSink) {
- if (!(sink instanceof OlapTableSink) ||
!ctx.getSessionVariable().isEnableInsertGroupCommit()
- || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
- return false;
- }
- OlapTable targetTable = physicalOlapTableSink.getTargetTable();
- return ctx.getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
- && !ctx.isTxnModel() &&
isGroupCommitAvailablePlan(physicalOlapTableSink)
- && physicalOlapTableSink.getPartitionIds().isEmpty() &&
targetTable.getTableProperty()
- .getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
- .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
- }
-
- private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<?
extends Plan> sink) {
- Plan child = sink.child();
- if (child instanceof PhysicalDistribute) {
- child = child.child(0);
- }
- return child instanceof OneRowRelation || (child instanceof
PhysicalUnion && child.arity() == 0);
- }
-
- private void handleGroupCommit(ConnectContext ctx, DataSink sink,
- PhysicalOlapTableSink<?> physicalOlapTableSink)
- throws UserException, RpcException, TException,
ExecutionException, InterruptedException {
- // TODO we should refactor this to remove rely on UnionNode
- List<InternalService.PDataRow> rows = new ArrayList<>();
- List<List<Expr>> materializedConstExprLists = ((UnionNode)
sink.getFragment()
- .getPlanRoot()).getMaterializedConstExprLists();
- int filterSize = 0;
- for (Slot slot : physicalOlapTableSink.getOutput()) {
- if (slot.getName().contains(Column.DELETE_SIGN)
- || slot.getName().contains(Column.VERSION_COL)) {
- filterSize += 1;
- }
- }
- for (List<Expr> list : materializedConstExprLists) {
- rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
- }
- GroupCommitPlanner groupCommitPlanner =
EnvFactory.getInstance().createGroupCommitPlanner(
- physicalOlapTableSink.getDatabase(),
- physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
- ConnectContext.get().getSessionVariable().getGroupCommit());
- PGroupCommitInsertResponse response =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
- TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
- if (code == TStatusCode.DATA_QUALITY_ERROR) {
- LOG.info("group commit insert failed. query id: {}, backend id:
{}, status: {}, "
- + "schema version: {}", ctx.queryId(),
- groupCommitPlanner.getBackend(), response.getStatus(),
-
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
- } else if (code != TStatusCode.OK) {
- String errMsg = "group commit insert failed. backend id: "
- + groupCommitPlanner.getBackend().getId() + ", status: "
- + response.getStatus();
- ErrorReport.reportDdlException(errMsg,
ErrorCode.ERR_FAILED_WHEN_INSERT);
- }
- TransactionStatus txnStatus = TransactionStatus.PREPARE;
- String sb = "{'label':'" + response.getLabel() + "', 'status':'" +
txnStatus.name()
- + "', 'txnId':'" + response.getTxnId() + "'"
- + "', 'optimizer':'" + "nereids" + "'"
- + "}";
- ctx.getState().setOk(response.getLoadedRows(), (int)
response.getFilteredRows(), sb);
- ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
- physicalOlapTableSink.getDatabase().getFullName(),
physicalOlapTableSink.getTargetTable().getName(),
- txnStatus, response.getLoadedRows(), (int)
response.getFilteredRows());
- // update it, so that user can get loaded rows in fe.audit.log
- ctx.updateReturnRows((int) response.getLoadedRows());
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 4436d3e3f3c..abcda9e9bdc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -162,11 +162,13 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
ctx.isTxnModel() ? null : String.format("label_%x_%x",
ctx.queryId().hi, ctx.queryId().lo));
if (physicalSink instanceof PhysicalOlapTableSink) {
- if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
- // return;
- throw new AnalysisException("group commit is not supported
in Nereids now");
- }
boolean emptyInsert = childIsEmptyRelation(physicalSink);
+ if (GroupCommitInsertExecutor.canGroupCommit(ctx, sink,
physicalSink, planner)) {
+ insertExecutor = new GroupCommitInsertExecutor(ctx,
targetTableIf, label, planner, insertCtx,
+ emptyInsert);
+ targetTableIf.readUnlock();
+ return insertExecutor;
+ }
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = ctx.isTxnModel()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index b4e8c471800..32121b9833b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -140,7 +140,10 @@ public class InsertUtils {
ctx.updateReturnRows(effectRows);
}
- private static InternalService.PDataRow
getRowStringValue(List<NamedExpression> cols) {
+ /**
+ * literal expr in insert operation
+ */
+ public static InternalService.PDataRow
getRowStringValue(List<NamedExpression> cols) {
if (cols.isEmpty()) {
return null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 7c72a1e076e..2ed8bae8c3e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -299,6 +299,6 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
}
private boolean isGroupCommitHttpStream() {
- return ConnectContext.get() != null &&
ConnectContext.get().isGroupCommitStreamLoadSql();
+ return ConnectContext.get() != null &&
ConnectContext.get().isGroupCommit();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 7e24732d77f..1b3535f1a80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -17,13 +17,10 @@
package org.apache.doris.planner;
-import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.NativeInsertStmt;
-import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -71,6 +68,7 @@ import java.util.stream.Collectors;
// we only support OlapTable now.
public class GroupCommitPlanner {
private static final Logger LOG =
LogManager.getLogger(GroupCommitPlanner.class);
+ public static final String SCHEMA_CHANGE = " is blocked on schema change";
protected Database db;
protected OlapTable table;
@@ -85,7 +83,7 @@ public class GroupCommitPlanner {
this.db = db;
this.table = table;
if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
- String msg = "insert table " + this.table.getId() + " is blocked
on schema change";
+ String msg = "insert table " + this.table.getId() + SCHEMA_CHANGE;
LOG.info(msg);
throw new DdlException(msg);
}
@@ -172,39 +170,6 @@ public class GroupCommitPlanner {
throw new DdlException("No suitable backend");
}
- // only for nereids use
- public static InternalService.PDataRow getRowStringValue(List<Expr> cols,
int filterSize) throws UserException {
- if (cols.isEmpty()) {
- return null;
- }
- InternalService.PDataRow.Builder row =
InternalService.PDataRow.newBuilder();
- List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
- for (Expr expr : exprs) {
- if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
- if (expr.getChildren().get(0) instanceof NullLiteral) {
-
row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
- continue;
- }
- throw new UserException(
- "do not support non-literal expr in transactional
insert operation: " + expr.toSql());
- }
- processExprVal(expr, row);
- }
- return row.build();
- }
-
- private static void processExprVal(Expr expr,
InternalService.PDataRow.Builder row) {
- if (expr instanceof NullLiteral) {
- row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
- } else if (expr.getType() instanceof ArrayType) {
- row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValueForArray()));
- } else if (!expr.getChildren().isEmpty()) {
- expr.getChildren().forEach(child -> processExprVal(child, row));
- } else {
- row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValue()));
- }
- }
-
public Backend getBackend() {
return backend;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index b166de7691c..81be929bd93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -226,7 +226,7 @@ public class ConnectContext {
private String workloadGroupName = "";
private Map<Long, Backend> insertGroupCommitTableToBeMap = new HashMap<>();
- private boolean isGroupCommitStreamLoadSql;
+ private boolean isGroupCommit;
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
@@ -1349,12 +1349,12 @@ public class ConnectContext {
return this.sessionVariable.getNetWriteTimeout();
}
- public boolean isGroupCommitStreamLoadSql() {
- return isGroupCommitStreamLoadSql;
+ public boolean isGroupCommit() {
+ return isGroupCommit;
}
- public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) {
- isGroupCommitStreamLoadSql = groupCommitStreamLoadSql;
+ public void setGroupCommit(boolean groupCommit) {
+ isGroupCommit = groupCommit;
}
public Map<String, LiteralExpr> getUserVars() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3c1f5a35e8a..affb1beb545 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -593,16 +593,9 @@ public class StmtExecutor {
LOG.warn("Analyze failed. {}",
context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}
- // FIXME: Force fallback for group commit because nereids
does not support it
- boolean isInsertCommand = parsedStmt != null
- && parsedStmt instanceof LogicalPlanAdapter
- && ((LogicalPlanAdapter)
parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
- boolean isGroupCommit =
(Config.wait_internal_group_commit_finish
- ||
context.sessionVariable.isEnableInsertGroupCommit()) && isInsertCommand;
if (e instanceof NereidsException
&& !(((NereidsException) e).getException()
instanceof MustFallbackException)
- &&
!context.getSessionVariable().enableFallbackToOriginalPlanner
- && !isGroupCommit) {
+ &&
!context.getSessionVariable().enableFallbackToOriginalPlanner) {
LOG.warn("Analyze failed. {}",
context.getQueryIdentifier(), e);
context.getState().setError(e.getMessage());
return;
@@ -3396,7 +3389,7 @@ public class StmtExecutor {
if (!Config.wait_internal_group_commit_finish &&
insert.getLabelName().isPresent()) {
throw new AnalysisException("label and group_commit can't
be set at the same time");
}
- context.setGroupCommitStreamLoadSql(true);
+ context.setGroupCommit(true);
}
OlapInsertExecutor insertExecutor = (OlapInsertExecutor)
insert.initPlan(context, this);
httpStreamParams.setTxnId(insertExecutor.getTxnId());
diff --git
a/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
index 62743feeb6c..241c03af460 100644
---
a/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
+++
b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out
@@ -9,6 +9,8 @@
1 a 10
2 \N -1
3 a 10
+4 a 10
+5 a 10
6 a \N
7 a \N
9 a \N
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 98c1d982bbe..37d38e47b9e 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -136,7 +136,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -224,16 +224,16 @@ suite("insert_group_commit_into") {
order_qt_select7 """ select name, score from ${table} order by name asc; """
assertTrue(getAlterTableState(), "add rollup should success")
- /*if (item == "nereids") {
+ if (item == "nereids") {
group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1
group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1
group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1
getRowCount(23)
- } else {*/
+ } else {
none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1
none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1
none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1
- //}
+ }
def rowCount = sql "select count(*) from ${table}"
logger.info("row count: " + rowCount)
@@ -340,7 +340,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -407,7 +407,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -522,4 +522,4 @@ suite("insert_group_commit_into") {
} finally {
}
}
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
index 51264d3d863..96a517a0534 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -41,6 +41,19 @@ suite("insert_group_commit_into_max_filter_ratio") {
}
}
+ def normal_insert = { sql, expected_row_count ->
+ def stmt = prepareStatement """ ${sql} """
+ def result = stmt.executeUpdate()
+ logger.info("insert result: " + result)
+ def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+ logger.info("result server info: " + serverInfo)
+ if (result != expected_row_count) {
+ logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+ }
+ assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+ assertTrue(serverInfo.contains("'label':'label"))
+ }
+
def group_commit_insert = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
@@ -80,7 +93,7 @@ suite("insert_group_commit_into_max_filter_ratio") {
logger.warn("insert result: " + result + ",
expected_row_count: " + expected_row_count + ", sql: " + sql)
}
// assertEquals(result, expected_row_count)
- assertTrue(serverInfo.contains("'status':'ABORTED'"))
+ assertTrue(serverInfo.contains("'status':'ABORTED'") || serverInfo.contains("too many filtered rows"))
// assertFalse(serverInfo.contains("'label':'group_commit_"))
} catch (Exception e) {
logger.info("exception: " + e)
@@ -171,10 +184,12 @@ suite("insert_group_commit_into_max_filter_ratio") {
for (item in ["legacy", "nereids"]) {
sql """ truncate table ${tableName} """
connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ // TODO: pipeline need to be implemented
+ sql """ set experimental_enable_nereids_dml_with_pipeline = false; """
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -185,11 +200,18 @@ suite("insert_group_commit_into_max_filter_ratio") {
group_commit_insert """ insert into ${dbTableName}(id) select 2;
""", 1
sql """ set group_commit = off_mode; """
off_mode_group_commit_insert """ insert into ${dbTableName} values
(3, 'a', 10); """, 1
- sql """ set group_commit = async_mode; """
- fail_group_commit_insert """ insert into ${dbTableName} values (4,
'abc', 10); """, 0
- sql """ set enable_insert_strict = false; """
- group_commit_insert """ insert into ${dbTableName} values (5,
'abc', 10); """, 0
+ if (item == "nereids") {
+ sql """ set group_commit = async_mode; """
+ normal_insert """ insert into ${dbTableName} values (4, 'abc',
10); """, 0
+ sql """ set enable_insert_strict = false; """
+ normal_insert """ insert into ${dbTableName} values (5, 'abc',
10); """, 0
+ } else {
+ sql """ set group_commit = async_mode; """
+ fail_group_commit_insert """ insert into ${dbTableName} values
(4, 'abc', 10); """, 0
+ sql """ set enable_insert_strict = false; """
+ group_commit_insert """ insert into ${dbTableName} values (5,
'abc', 10); """, 0
+ }
// The row 6 and 7 is different between legacy and nereids
try {
sql """ set group_commit = off_mode; """
@@ -212,8 +234,11 @@ suite("insert_group_commit_into_max_filter_ratio") {
// TODO should throw exception?
sql """ set group_commit = async_mode; """
sql """ set enable_insert_strict = true; """
- fail_group_commit_insert """ insert into ${dbTableName} values (8,
'a', 'a'); """, 0
-
+ if (item == "nereids") {
+ // sql """ insert into ${dbTableName} values (8, 'a', 'a'); """, 1
+ } else {
+ fail_group_commit_insert """ insert into ${dbTableName} values
(8, 'a', 'a'); """, 0
+ }
sql """ set group_commit = async_mode; """
sql """ set enable_insert_strict = false; """
group_commit_insert """ insert into ${dbTableName} values (9, 'a',
'a'); """, 0
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
index 83247d25129..1bb978da1d1 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
@@ -96,7 +96,7 @@ suite("insert_group_commit_into_unique") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -182,7 +182,7 @@ suite("insert_group_commit_into_unique") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -269,7 +269,7 @@ suite("insert_group_commit_into_unique") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
index aae859c71de..339d477936a 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
@@ -134,7 +134,7 @@ suite("insert_group_commit_into_unique_sync_mode") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -218,11 +218,13 @@ suite("insert_group_commit_into_unique_sync_mode") {
// 1. insert into
connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ // TODO: pipeline need to be implemented
+ sql """ set experimental_enable_nereids_dml_with_pipeline = false; """
sql """ set group_commit = sync_mode; """
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -313,7 +315,7 @@ suite("insert_group_commit_into_unique_sync_mode") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- // sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index 31ed7680c9a..cbcd7fd1dc2 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -74,7 +74,7 @@ suite("insert_group_commit_with_exception") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
@@ -84,22 +84,14 @@ suite("insert_group_commit_with_exception") {
def result = sql """ insert into ${table} values(1, 'a', 10,
100) """
assertTrue(false)
} catch (Exception e) {
- /*if (item == "nereids") {
- assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
- } else {*/
- assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
- //}
+ assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
}
try {
def result = sql """ insert into ${table} values(2, 'b') """
assertTrue(false)
} catch (Exception e) {
- /*if (item == "nereids") {
- assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
- } else {*/
- assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
- //}
+ assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
}
result = sql """ insert into ${table} values(3, 'c', 30) """
@@ -115,33 +107,21 @@ suite("insert_group_commit_with_exception") {
result = sql """ insert into ${table}(id, name) values(5, 'd',
50) """
assertTrue(false)
} catch (Exception e) {
- /*if (item == "nereids") {
- assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
- } else {*/
- assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
- //}
+ assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
}
try {
result = sql """ insert into ${table}(id, name) values(6) """
assertTrue(false)
} catch (Exception e) {
- /*if (item == "nereids") {
- assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
- } else {*/
- assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
- //}
+ assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
}
try {
result = sql """ insert into ${table}(id, names) values(7,
'd') """
assertTrue(false)
} catch (Exception e) {
- /*if (item == "nereids") {
- assertTrue(e.getMessage().contains("column names is not
found in table"))
- } else {*/
- assertTrue(e.getMessage().contains("Unknown column
'names'"))
- //}
+ assertTrue(e.getMessage().contains("Unknown column 'names'"))
}
@@ -149,11 +129,6 @@ suite("insert_group_commit_with_exception") {
def db = context.config.defaultDb + "_insert_p0"
String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
- if (item == "nereids") {
- println("nereids does not support prepare insert");
- continue;
- };
-
try (Connection connection = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword)) {
Statement statement = connection.createStatement();
statement.execute("use ${db}");
@@ -161,7 +136,7 @@ suite("insert_group_commit_with_exception") {
if (item == "nereids") {
statement.execute("set enable_nereids_dml = true;");
statement.execute("set enable_nereids_planner=true;");
- //statement.execute("set
enable_fallback_to_original_planner=false;");
+ statement.execute("set
enable_fallback_to_original_planner=false;");
} else {
statement.execute("set enable_nereids_dml = false;");
}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 6a708eaf5e2..67482674d32 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -71,7 +71,7 @@ suite("insert_group_commit_with_large_data") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false;
"""
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
diff --git a/regression-test/suites/insert_p0/insert_with_null.groovy
b/regression-test/suites/insert_p0/insert_with_null.groovy
index e93fa8eb410..8d87aa655f9 100644
--- a/regression-test/suites/insert_p0/insert_with_null.groovy
+++ b/regression-test/suites/insert_p0/insert_with_null.groovy
@@ -73,7 +73,7 @@ suite("insert_with_null") {
sql """ set group_commit = async_mode; """
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
}
sql """ insert into ${table} values(1, '"b"', ["k1=v1, k2=v2"]); """
diff --git
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
index cba4349e16d..d4f98e13d02 100644
---
a/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
+++
b/regression-test/suites/insert_p0/test_group_commit_data_bytes_property.groovy
@@ -65,7 +65,7 @@ suite("test_group_commit_data_bytes_property") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
diff --git
a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy
b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy
index 5d8962d07ac..9cc1b3abc63 100644
---
a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy
+++
b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy
@@ -65,7 +65,7 @@ suite("test_group_commit_interval_ms_property") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
- //sql """ set enable_fallback_to_original_planner=false; """
+ sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]