This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ac21d76bc55 [fix](group commit) Fix group commit in nereids (#37002)
ac21d76bc55 is described below
commit ac21d76bc555ba465d65a067e8bc45be1ac496bc
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 4 18:08:52 2024 +0800
[fix](group commit) Fix group commit in nereids (#37002)
## Proposed changes
Before: the insert into values in group commit use the stream load way
to load data, this may cause complex data type load error.
After: use the original insert into values way(scanner node is union
node), sink is group_commit_block_sink.
---
.../exec/group_commit_block_sink_operator.cpp | 6 +-
be/src/runtime/fragment_mgr.cpp | 4 +
be/src/runtime/group_commit_mgr.cpp | 8 +-
be/src/runtime/group_commit_mgr.h | 6 +-
.../commands/insert/GroupCommitInsertExecutor.java | 256 ---------------------
.../commands/insert/InsertIntoTableCommand.java | 22 +-
.../insert/OlapGroupCommitInsertExecutor.java | 98 ++++++++
.../plans/commands/insert/OlapInsertExecutor.java | 9 +-
.../commands/insert/OlapTxnInsertExecutor.java | 6 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 17 ++
gensrc/thrift/FrontendService.thrift | 2 +
.../insert_p0/insert_group_commit_into.groovy | 27 +--
...nsert_group_commit_into_max_filter_ratio.groovy | 14 +-
.../insert_group_commit_into_unique.groovy | 6 +-
...nsert_group_commit_into_unique_sync_mode.groovy | 6 +-
.../insert_p0/test_group_commit_timeout.groovy | 2 +-
16 files changed, 182 insertions(+), 307 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 402354d6f24..17088b37c3e 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -63,10 +63,14 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState*
state) {
Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
+ std::string label;
+ int64_t txn_id;
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._load_id,
_load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(),
_create_plan_dependency,
- _put_block_dependency));
+ _put_block_dependency, label, txn_id));
+ _state->set_import_label(label);
+ _state->set_wal_id(txn_id); // wal_id is txn_id
return Status::OK();
} else {
return Status::InternalError("be is stopping");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index aa23f97f9fd..76048373286 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -338,6 +338,10 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
+ if (rs->wal_id() > 0) {
+ params.__set_txn_id(rs->wal_id());
+ params.__set_label(rs->import_label());
+ }
}
}
if (!req.runtime_state->export_output_files().empty()) {
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 8a81c942fd3..464f9f51221 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -257,7 +257,7 @@ Status GroupCommitTable::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
- std::shared_ptr<pipeline::Dependency> put_block_dep) {
+ std::shared_ptr<pipeline::Dependency> put_block_dep, std::string&
label, int64_t& txn_id) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
@@ -266,6 +266,8 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id,
put_block_dep).ok()) {
load_block_queue = inner_block_queue;
+ label = inner_block_queue->label;
+ txn_id = inner_block_queue->txn_id;
return Status::OK();
}
} else {
@@ -561,7 +563,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
- std::shared_ptr<pipeline::Dependency> put_block_dep) {
+ std::shared_ptr<pipeline::Dependency> put_block_dep, std::string&
label, int64_t& txn_id) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
@@ -574,7 +576,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue,
be_exe_version, mem_tracker,
- create_plan_dep, put_block_dep));
+ create_plan_dep, put_block_dep, label, txn_id));
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 36c51746ef4..e9ea152ea5c 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -159,7 +159,8 @@ public:
int be_exe_version,
std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
- std::shared_ptr<pipeline::Dependency>
put_block_dep);
+ std::shared_ptr<pipeline::Dependency>
put_block_dep,
+ std::string& label, int64_t& txn_id);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
std::shared_ptr<pipeline::Dependency>
get_block_dep);
@@ -205,7 +206,8 @@ public:
int be_exe_version,
std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
- std::shared_ptr<pipeline::Dependency>
put_block_dep);
+ std::shared_ptr<pipeline::Dependency>
put_block_dep,
+ std::string& label, int64_t& txn_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
deleted file mode 100644
index 08686561a41..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInsertExecutor.java
+++ /dev/null
@@ -1,256 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.plans.commands.insert;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.EnvFactory;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.trees.expressions.Alias;
-import org.apache.doris.nereids.trees.expressions.Cast;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.expressions.literal.Literal;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.GroupCommitPlanner;
-import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SqlModeHelper;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.transaction.TransactionStatus;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Handle group commit
- */
-public class GroupCommitInsertExecutor extends AbstractInsertExecutor {
- public static final Logger LOG =
LogManager.getLogger(GroupCommitInsertExecutor.class);
- private static final long INVALID_TXN_ID = -1L;
- protected final NereidsPlanner planner;
- private long txnId = INVALID_TXN_ID;
- private TransactionStatus txnStatus = TransactionStatus.ABORTED;
-
- public GroupCommitInsertExecutor(ConnectContext ctx, TableIf table, String
labelName, NereidsPlanner planner,
- Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
- super(ctx, table, labelName, planner, insertCtx, emptyInsert);
- this.planner = planner;
- }
-
- /**
- * Handle group commit
- */
- public static boolean canGroupCommit(ConnectContext ctx, DataSink sink,
- PhysicalSink physicalSink,
NereidsPlanner planner) {
- // The flag is set to false before execute sql, if it is true, this is
a http stream
- if (ctx.isGroupCommit()) {
- return false;
- }
- PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>)
physicalSink;
- boolean can = analyzeGroupCommit(ctx, sink, olapSink, planner);
- ctx.setGroupCommit(can);
- return can;
- }
-
- private static boolean analyzeGroupCommit(ConnectContext ctx, DataSink
sink,
- PhysicalOlapTableSink<?> physicalOlapTableSink,
NereidsPlanner planner) {
- if (!(sink instanceof OlapTableSink) ||
!ctx.getSessionVariable().isEnableInsertGroupCommit()
- || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
- return false;
- }
- OlapTable targetTable = physicalOlapTableSink.getTargetTable();
- return ctx.getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
- && !ctx.isTxnModel()
- && physicalOlapTableSink.getPartitionIds().isEmpty()
- && targetTable.getTableProperty().getUseSchemaLightChange()
- &&
!targetTable.getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
- && isGroupCommitAvailablePlan(physicalOlapTableSink, planner);
- }
-
- private static boolean literalExpr(NereidsPlanner planner) {
- Optional<PhysicalUnion> union = planner.getPhysicalPlan()
-
.<PhysicalUnion>collect(PhysicalUnion.class::isInstance).stream().findAny();
- List<List<NamedExpression>> constantExprsList = null;
- if (union.isPresent()) {
- constantExprsList = union.get().getConstantExprsList();
- }
- Optional<PhysicalOneRowRelation> oneRowRelation =
planner.getPhysicalPlan()
-
.<PhysicalOneRowRelation>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
- if (oneRowRelation.isPresent()) {
- constantExprsList =
ImmutableList.of(oneRowRelation.get().getProjects());
- }
- for (List<NamedExpression> row : constantExprsList) {
- for (Expression expr : row) {
- while (expr instanceof Alias || expr instanceof Cast) {
- expr = expr.child(0);
- }
- if (!(expr instanceof Literal)) {
- return false;
- }
- }
- }
- return true;
- }
-
- private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<?
extends Plan> sink,
- NereidsPlanner planner) {
- Plan child = sink.child();
- if (child instanceof PhysicalDistribute) {
- child = child.child(0);
- }
- return (child instanceof OneRowRelation || (child instanceof
PhysicalUnion && child.arity() == 0))
- && literalExpr(planner);
- }
-
- private void handleGroupCommit(ConnectContext ctx, DataSink sink,
- PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner
planner) throws Exception {
- // TODO we should refactor this to remove rely on UnionNode
- List<InternalService.PDataRow> rows = new ArrayList<>();
-
- Optional<PhysicalUnion> union = planner.getPhysicalPlan()
-
.<PhysicalUnion>collect(PhysicalUnion.class::isInstance).stream().findAny();
- List<List<NamedExpression>> constantExprsList = null;
- if (union.isPresent()) {
- constantExprsList = union.get().getConstantExprsList();
- }
- Optional<PhysicalOneRowRelation> oneRowRelation =
planner.getPhysicalPlan()
-
.<PhysicalOneRowRelation>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
- if (oneRowRelation.isPresent()) {
- constantExprsList =
ImmutableList.of(oneRowRelation.get().getProjects());
- }
-
- // should set columns of sink since we maybe generate some invisible
columns
- List<Column> fullSchema =
physicalOlapTableSink.getTargetTable().getFullSchema();
- List<Column> targetSchema;
- if (physicalOlapTableSink.getTargetTable().getFullSchema().size() !=
physicalOlapTableSink.getCols().size()) {
- targetSchema = fullSchema;
- } else {
- targetSchema = new ArrayList<>(physicalOlapTableSink.getCols());
- }
- List<String> columnNames = targetSchema.stream()
- .map(Column::getName)
- .map(n -> n.replace("`", "``"))
- .collect(Collectors.toList());
- for (List<NamedExpression> row : constantExprsList) {
- rows.add(InsertUtils.getRowStringValue(row));
- }
- GroupCommitPlanner groupCommitPlanner =
EnvFactory.getInstance().createGroupCommitPlanner(
- physicalOlapTableSink.getDatabase(),
- physicalOlapTableSink.getTargetTable(), columnNames,
ctx.queryId(),
- ConnectContext.get().getSessionVariable().getGroupCommit());
- PGroupCommitInsertResponse response =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
- TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
- // TODO: in legacy, there is a retry, we need to implement
- if (code != TStatusCode.OK) {
- String errMsg = "group commit insert failed. query_id: " +
DebugUtil.printId(ConnectContext.get().queryId())
- + ", backend id: " +
groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus();
- ErrorReport.reportDdlException(errMsg,
ErrorCode.ERR_FAILED_WHEN_INSERT);
- }
- txnStatus = TransactionStatus.PREPARE;
- String sb = "{'label':'" + response.getLabel() + "', 'status':'" +
txnStatus.name()
- + "', 'txnId':'" + response.getTxnId() + "'"
- + "', 'optimizer':'" + "nereids" + "'"
- + "}";
- ctx.getState().setOk(response.getLoadedRows(), (int)
response.getFilteredRows(), sb);
- ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
- physicalOlapTableSink.getDatabase().getFullName(),
physicalOlapTableSink.getTargetTable().getName(),
- txnStatus, response.getLoadedRows(), (int)
response.getFilteredRows());
- // update it, so that user can get loaded rows in fe.audit.log
- ctx.updateReturnRows((int) response.getLoadedRows());
- }
-
- @Override
- public void beginTransaction() {
-
- }
-
- @Override
- protected void finalizeSink(PlanFragment fragment, DataSink sink,
PhysicalSink physicalSink) {
-
- }
-
- @Override
- protected void beforeExec() {
- }
-
- @Override
- protected void onComplete() throws UserException {
-
- }
-
- @Override
- protected void onFail(Throwable t) {
- errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
- ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
- }
-
- @Override
- protected void afterExec(StmtExecutor executor) {
-
- }
-
- protected final void execImpl() throws Exception {
- Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
-
.<PhysicalOlapTableSink<?>>collect(PhysicalSink.class::isInstance)).stream()
- .findAny();
- PhysicalOlapTableSink<?> olapSink = plan.get();
- DataSink sink = planner.getFragments().get(0).getSink();
- handleGroupCommit(ctx, sink, olapSink, planner);
- }
-
- @Override
- public void executeSingleInsert(StmtExecutor executor, long jobId) throws
Exception {
- beforeExec();
- try {
- execImpl();
- onComplete();
- } catch (Throwable t) {
- onFail(t);
- // retry group_commit insert when meet
- if (t.getMessage().contains(GroupCommitPlanner.SCHEMA_CHANGE)) {
- throw t;
- }
- return;
- }
- afterExec(executor);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 39507d72685..7a1280092b0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -146,7 +146,10 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan)
cte.get().withChildren(logicalQuery));
}
-
+ if (this.logicalQuery instanceof UnboundTableSink) {
+ OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx,
targetTableIf,
+ (UnboundTableSink<?>) this.logicalQuery);
+ }
LogicalPlanAdapter logicalPlanAdapter = new
LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new
NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter,
ctx.getSessionVariable().toThrift());
@@ -167,17 +170,16 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
if (physicalSink instanceof PhysicalOlapTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
- if (GroupCommitInsertExecutor.canGroupCommit(ctx, sink,
physicalSink, planner)) {
- insertExecutor = new GroupCommitInsertExecutor(ctx,
targetTableIf, label, planner, insertCtx,
- emptyInsert);
- targetTableIf.readUnlock();
- return insertExecutor;
- }
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
- insertExecutor = ctx.isTxnModel()
- ? new OlapTxnInsertExecutor(ctx, olapTable, label,
planner, insertCtx, emptyInsert)
- : new OlapInsertExecutor(ctx, olapTable, label,
planner, insertCtx, emptyInsert);
+ if (ctx.isTxnModel()) {
+ insertExecutor = new OlapTxnInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert);
+ } else if (ctx.isGroupCommit()) {
+ insertExecutor = new OlapGroupCommitInsertExecutor(ctx,
olapTable, label, planner, insertCtx,
+ emptyInsert);
+ } else {
+ insertExecutor = new OlapInsertExecutor(ctx, olapTable,
label, planner, insertCtx, emptyInsert);
+ }
boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
new file mode 100644
index 00000000000..5091bae17d1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Insert executor for olap table with group commit
+ */
+public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
+ private static final Logger LOG =
LogManager.getLogger(OlapGroupCommitInsertExecutor.class);
+
+ public OlapGroupCommitInsertExecutor(ConnectContext ctx, Table table,
+ String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
+ boolean emptyInsert) {
+ super(ctx, table, labelName, planner, insertCtx, emptyInsert);
+ }
+
+ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf
table, UnboundTableSink<?> tableSink) {
+ // The flag is set to false before execute sql, if it is true, this is
a http stream
+ if (ctx.isGroupCommit()) {
+ return;
+ }
+
ctx.setGroupCommit(ctx.getSessionVariable().isEnableInsertGroupCommit() &&
!ctx.isTxnModel()
+ && !ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()
&& table instanceof OlapTable
+ && ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
+ && !((OlapTable)
table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
+ && tableSink.getPartitions().isEmpty()
+ && (tableSink.child() instanceof OneRowRelation ||
tableSink.child() instanceof LogicalUnion));
+ }
+
+ @Override
+ public void beginTransaction() {
+ }
+
+ @Override
+ protected void onComplete() {
+ if (ctx.getState().getStateType() == MysqlStateType.ERR) {
+ txnStatus = TransactionStatus.ABORTED;
+ } else {
+ txnStatus = TransactionStatus.PREPARE;
+ }
+ }
+
+ @Override
+ protected void onFail(Throwable t) {
+ errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
+ String queryId = DebugUtil.printId(ctx.queryId());
+ // if any throwable being thrown during insert operation, first we
should abort this txn
+ LOG.warn("insert [{}] with query id {} failed, url={}", labelName,
queryId, coordinator.getTrackingUrl(), t);
+ StringBuilder sb = new StringBuilder(t.getMessage());
+ if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+ sb.append(". url: ").append(coordinator.getTrackingUrl());
+ }
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
+ }
+
+ @Override
+ protected void afterExec(StmtExecutor executor) {
+ labelName = coordinator.getLabel();
+ txnId = coordinator.getTxnId();
+ setReturnInfo();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index b4f5503ed44..0153700863d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -74,7 +74,7 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG =
LogManager.getLogger(OlapInsertExecutor.class);
protected long txnId = INVALID_TXN_ID;
- private TransactionStatus txnStatus = TransactionStatus.ABORTED;
+ protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
/**
* constructor
@@ -274,11 +274,14 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
errMsg = "Record info of insert load with error " + e.getMessage();
}
+ setReturnInfo();
+ }
+
+ protected void setReturnInfo() {
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'
'err':'error messages'}
StringBuilder sb = new StringBuilder();
- sb.append("{'label':'").append(labelName).append("', 'status':'")
- .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() :
txnStatus.name());
+ sb.append("{'label':'").append(labelName).append("',
'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
index ebe0a318e19..1512eca16e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapTxnInsertExecutor.java
@@ -27,6 +27,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.transaction.SubTransactionState.SubTransactionType;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,10 +44,7 @@ public class OlapTxnInsertExecutor extends
OlapInsertExecutor {
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
- }
-
- public long getTxnId() {
- return txnId;
+ txnStatus = TransactionStatus.PREPARE;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9e7431e07c8..8c40d1a5938 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -240,6 +240,9 @@ public class Coordinator implements CoordInterface {
private List<String> deltaUrls;
private Map<String, String> loadCounters;
private String trackingUrl;
+ // related txnId and label of group commit
+ private long txnId;
+ private String label;
// for export
private List<String> exportFiles;
@@ -468,6 +471,14 @@ public class Coordinator implements CoordInterface {
return trackingUrl;
}
+ public long getTxnId() {
+ return txnId;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
public void setExecMemoryLimit(long execMemoryLimit) {
this.queryOptions.setMemLimit(execMemoryLimit);
}
@@ -2346,6 +2357,12 @@ public class Coordinator implements CoordInterface {
if (params.isSetTrackingUrl()) {
trackingUrl = params.getTrackingUrl();
}
+ if (params.isSetTxnId()) {
+ txnId = params.getTxnId();
+ }
+ if (params.isSetLabel()) {
+ label = params.getLabel();
+ }
if (params.isSetExportFiles()) {
updateExportFiles(params.getExportFiles());
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 6b11402f299..26d4961bf09 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -506,6 +506,8 @@ struct TReportExecStatusParams {
28: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas
+ 29: optional i64 txn_id
+ 30: optional string label
}
struct TFeResult {
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 37d38e47b9e..9105a8dc8db 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -146,7 +146,11 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ if (item == "nereids") {
+ none_group_commit_insert """ insert into ${table}(id)
select 6; """, 1
+ } else {
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ }
getRowCount(6)
order_qt_select1 """ select * from ${table} order by id, name,
score asc; """
@@ -160,7 +164,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id, name)
values(4, 'e1'); """, 1
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ group_commit_insert """ insert into ${table}(id) values(6);
""", 1
getRowCount(11)
order_qt_select2 """ select * from ${table} order by id, name,
score asc; """
@@ -171,7 +175,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
sql """ alter table ${table} ADD column age int after name; """
group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
- group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
values(6); """, 1
assertTrue(getAlterTableState(), "add column should success")
getRowCount(17)
@@ -183,7 +187,7 @@ suite("insert_group_commit_into") {
sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q',
6, 50); """*/
sql """ truncate table ${table}; """
group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ group_commit_insert """ insert into ${table}(id) values(6);
""", 1
getRowCount(2)
order_qt_select4 """ select * from ${table} order by id, name,
score asc; """
@@ -194,7 +198,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id, name, age,
score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
sql """ alter table ${table} order by (id, name, score, age);
"""
group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
- group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
values(6); """, 1
assertTrue(getAlterTableState(), "modify column order should
success")
getRowCount(8)
@@ -206,7 +210,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id, name, age,
score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
sql """ alter table ${table} DROP column age; """
group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
- group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
values(6); """, 1
assertTrue(getAlterTableState(), "drop column should success")
getRowCount(14)
@@ -218,7 +222,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
- group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
values(6); """, 1
getRowCount(20)
order_qt_select7 """ select name, score from ${table} order by
name asc; """
@@ -226,7 +230,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
group_commit_insert """ insert into ${table}(id, name,
score) values(10 + 1, 'h', 100); """, 1
- group_commit_insert """ insert into ${table}(id, name,
score) select 10 + 2, 'h', 100; """, 1
+ none_group_commit_insert """ insert into ${table}(id,
name, score) select 10 + 2, 'h', 100; """, 1
group_commit_insert """ insert into ${table} with label
test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13,
'h', 100); """, 1
getRowCount(23)
} else {
@@ -272,13 +276,10 @@ suite("insert_group_commit_into") {
logger.info("observer url: " + url)
connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = url) {
sql """ set group_commit = async_mode; """
- sql """ set enable_nereids_dml = false; """
- sql """ set enable_profile= true; """
- sql """ set enable_nereids_planner = false; """
// 1. insert into
def server_info = group_commit_insert """ insert into
${table}(name, id) values('c', 3); """, 1
- assertTrue(server_info.contains('query_id'))
+ /*assertTrue(server_info.contains('query_id'))
// get query_id, such as
43f87963586a482a-b0496bcf9e2b5555
def query_id_index =
server_info.indexOf("'query_id':'") + "'query_id':'".length()
def query_id = server_info.substring(query_id_index,
query_id_index + 33)
@@ -296,7 +297,7 @@ suite("insert_group_commit_into") {
logger.info("Get profile: code=" + code + ", out=" +
out + ", err=" + err)
assertEquals(code, 0)
def json = parseJson(out)
- assertEquals("success", json.msg.toLowerCase())
+ assertEquals("success", json.msg.toLowerCase())*/
}
}
} else {
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
index c0eb1431432..2c69824e7ab 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy
@@ -195,21 +195,19 @@ suite("insert_group_commit_into_max_filter_ratio") {
sql """ set group_commit = sync_mode; """
group_commit_insert """ insert into ${dbTableName} values (1, 'a',
10); """, 1
sql """ set group_commit = async_mode; """
- group_commit_insert """ insert into ${dbTableName}(id) select 2;
""", 1
+ group_commit_insert """ insert into ${dbTableName}(id) values(2);
""", 1
sql """ set group_commit = off_mode; """
off_mode_group_commit_insert """ insert into ${dbTableName} values
(3, 'a', 10); """, 1
+ sql """ set group_commit = async_mode; """
if (item == "nereids") {
- sql """ set group_commit = async_mode; """
- normal_insert """ insert into ${dbTableName} values (4, 'abc',
10); """, 0
- sql """ set enable_insert_strict = false; """
- normal_insert """ insert into ${dbTableName} values (5, 'abc',
10); """, 0
+ group_commit_insert """ insert into ${dbTableName} values (4,
'abc', 10); """, 0
} else {
- sql """ set group_commit = async_mode; """
fail_group_commit_insert """ insert into ${dbTableName} values
(4, 'abc', 10); """, 0
- sql """ set enable_insert_strict = false; """
- group_commit_insert """ insert into ${dbTableName} values (5,
'abc', 10); """, 0
}
+ sql """ set enable_insert_strict = false; """
+ group_commit_insert """ insert into ${dbTableName} values (5,
'abc', 10); """, 0
+
// The row 6 and 7 is different between legacy and nereids
try {
sql """ set group_commit = off_mode; """
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
index 1bb978da1d1..502ef2906d5 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy
@@ -102,7 +102,7 @@ suite("insert_group_commit_into_unique") {
}
group_commit_insert """ insert into ${dbTableName} values (1,
'a', 10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${dbTableName}(id) select
6; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id)
values(6); """, 1
group_commit_insert """ insert into ${dbTableName}(id)
values(4); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${dbTableName}(id, name)
values(2, 'b'); """, 1
@@ -188,7 +188,7 @@ suite("insert_group_commit_into_unique") {
}
group_commit_insert """ insert into ${dbTableName} values (1,
'a', 10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${dbTableName}(id, score)
select 6, 60; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id, score)
values(6, 60); """, 1
group_commit_insert """ insert into ${dbTableName}(id, score)
values(4, 70); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id,
score) values('c', 3, 30); """, 1
group_commit_insert """ insert into ${dbTableName}(score, id,
name) values(30, 2, 'b'); """, 1
@@ -275,7 +275,7 @@ suite("insert_group_commit_into_unique") {
}
group_commit_insert """ insert into ${dbTableName}(id, name,
score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """,
2
- group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) values(6, 60, 600); """, 1
group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id,
score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1
group_commit_insert """ insert into ${dbTableName}(score, id,
name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
index d9382dca3f4..93d39f303ad 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy
@@ -140,7 +140,7 @@ suite("insert_group_commit_into_unique_sync_mode") {
}
group_commit_insert """ insert into ${dbTableName} values (1,
'a', 10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${dbTableName}(id) select
6; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id)
values(6); """, 1
group_commit_insert """ insert into ${dbTableName}(id)
values(4); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${dbTableName}(id, name)
values(2, 'b'); """, 1
@@ -228,7 +228,7 @@ suite("insert_group_commit_into_unique_sync_mode") {
}
group_commit_insert """ insert into ${dbTableName} values (1,
'a', 10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${dbTableName}(id, score)
select 6, 60; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id, score)
values(6, 60); """, 1
group_commit_insert """ insert into ${dbTableName}(id, score)
values(4, 70); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id,
score) values('c', 3, 30); """, 1
sql """ set group_commit = OFF_MODE; """
@@ -319,7 +319,7 @@ suite("insert_group_commit_into_unique_sync_mode") {
}
group_commit_insert """ insert into ${dbTableName}(id, name,
score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """,
2
- group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1
+ group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) values(6, 60, 600); """, 1
group_commit_insert """ insert into ${dbTableName}(id, score,
__DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1
group_commit_insert """ insert into ${dbTableName}(name, id,
score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1
group_commit_insert """ insert into ${dbTableName}(score, id,
name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1
diff --git a/regression-test/suites/insert_p0/test_group_commit_timeout.groovy
b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy
index 7866a33df0e..3af6b9b11a0 100644
--- a/regression-test/suites/insert_p0/test_group_commit_timeout.groovy
+++ b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy
@@ -46,7 +46,7 @@ suite("test_group_commit_timeout", "nonConcurrent") {
} catch (Exception e) {
long end = System.currentTimeMillis()
logger.info("failed " + e.getMessage())
- assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to
cancel timeout instance"))
+ assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to
cancel timeout instance") || e.getMessage().contains("Execute timeout"))
assertTrue(end - start <= 60000)
} finally {
sql "SET global query_timeout = ${query_timeout[0][1]}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]