This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 24969d5ad42 [fix](group commit) Fix some group commit problems (#37595)
24969d5ad42 is described below
commit 24969d5ad42a21a2fe90e6a69c66c45bbdb54e1d
Author: meiyi <[email protected]>
AuthorDate: Thu Jul 11 15:20:02 2024 +0800
[fix](group commit) Fix some group commit problems (#37595)
## Proposed changes
Fix some problems found by
https://github.com/apache/doris/pull/26856/files
1. Some inserts should not run in group commit mode, including insert
overwrite and inserts into an MTMV
2. If there is no partition for the data, an exception should be thrown
---
.../exec/group_commit_block_sink_operator.cpp | 24 +++++++++++++++-------
be/src/runtime/group_commit_mgr.cpp | 24 +++++++++++-----------
be/src/runtime/group_commit_mgr.h | 6 ++----
.../glue/translator/PhysicalPlanTranslator.java | 5 +++--
.../commands/insert/InsertIntoTableCommand.java | 4 +++-
.../insert/OlapGroupCommitInsertExecutor.java | 3 +++
.../insert_p0/insert_group_commit_into.groovy | 2 ++
regression-test/suites/insert_p0/txn_insert.groovy | 2 +-
.../test_group_commit_stream_load.groovy | 2 +-
9 files changed, 44 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 424ede07be5..9bac6d4ed29 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -61,21 +61,19 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState*
state) {
"CreateGroupCommitPlanDependency", true);
_put_block_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"GroupCommitPutBlockDependency", true);
- WARN_IF_ERROR(_initialize_load_queue(), "");
+ [[maybe_unused]] auto st = _initialize_load_queue();
return Status::OK();
}
Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
- std::string label;
- int64_t txn_id;
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._load_id,
_load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(),
_create_plan_dependency,
- _put_block_dependency, label, txn_id));
- _state->set_import_label(label);
- _state->set_wal_id(txn_id); // wal_id is txn_id
+ _put_block_dependency));
+ _state->set_import_label(_load_block_queue->label);
+ _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
return Status::OK();
} else {
return Status::InternalError("be is stopping");
@@ -339,13 +337,25 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState*
state, vectorized::Bloc
local_state._vpartition->find_partition(block.get(), index,
local_state._partitions[index]);
}
+ bool stop_processing = false;
for (int row_index = 0; row_index < rows; row_index++) {
if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
local_state._filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
<< block->dump_data(row_index, 1);
+ RETURN_IF_ERROR(state->append_error_msg_to_file(
+ []() -> std::string { return ""; },
+ [&]() -> std::string {
+ fmt::memory_buffer buf;
+ fmt::format_to(buf, "no partition for this tuple.
tuple=\n{}",
+ block->dump_data(row_index, 1));
+ return fmt::to_string(buf);
+ },
+ &stop_processing));
+ local_state._has_filtered_rows = true;
+ state->update_num_rows_load_filtered(1);
+ state->update_num_rows_load_total(-1);
}
- local_state._has_filtered_rows = true;
}
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index f0a57162458..5f989da023b 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -138,7 +138,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
auto last_print_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
- if (last_print_duration >= 5000) {
+ if (last_print_duration >= 10000) {
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id="
<< txn_id
<< ", label=" << label << ", instance_id=" <<
load_instance_id
@@ -258,15 +258,13 @@ Status GroupCommitTable::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
- std::shared_ptr<pipeline::Dependency> put_block_dep, std::string&
label, int64_t& txn_id) {
+ std::shared_ptr<pipeline::Dependency> put_block_dep) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->contain_load_id(load_id)) {
load_block_queue = inner_block_queue;
- label = inner_block_queue->label;
- txn_id = inner_block_queue->txn_id;
return Status::OK();
}
}
@@ -275,8 +273,6 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id,
put_block_dep).ok()) {
load_block_queue = inner_block_queue;
- label = inner_block_queue->label;
- txn_id = inner_block_queue->txn_id;
return Status::OK();
}
} else {
@@ -319,6 +315,10 @@ Status GroupCommitTable::get_first_block_load_queue(
void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(_lock);
+ if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) {
+ _create_plan_deps.erase(load_id);
+ return;
+ }
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->remove_load_id(load_id).ok()) {
return;
@@ -391,13 +391,13 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
instance_id, label, txn_id, schema_version,
_all_block_queues_bytes,
result.wait_internal_group_commit_finish,
result.group_commit_interval_ms,
result.group_commit_data_bytes);
- std::unique_lock l(_lock);
RETURN_IF_ERROR(load_block_queue->create_wal(
_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
- _load_block_queues.emplace(instance_id, load_block_queue);
+ std::unique_lock l(_lock);
+ _load_block_queues.emplace(instance_id, load_block_queue);
std::vector<UniqueId> success_load_ids;
for (const auto& [id, load_info] : _create_plan_deps) {
auto create_dep = std::get<0>(load_info);
@@ -409,8 +409,8 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
}
}
}
- for (const auto& id2 : success_load_ids) {
- _create_plan_deps.erase(id2);
+ for (const auto& id : success_load_ids) {
+ _create_plan_deps.erase(id);
}
}
}
@@ -610,7 +610,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
- std::shared_ptr<pipeline::Dependency> put_block_dep, std::string&
label, int64_t& txn_id) {
+ std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
@@ -623,7 +623,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue,
be_exe_version, mem_tracker,
- create_plan_dep, put_block_dep, label, txn_id));
+ create_plan_dep, put_block_dep));
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 16c7e0c24d3..c6cb34a022a 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -160,8 +160,7 @@ public:
int be_exe_version,
std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
- std::shared_ptr<pipeline::Dependency>
put_block_dep,
- std::string& label, int64_t& txn_id);
+ std::shared_ptr<pipeline::Dependency>
put_block_dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
std::shared_ptr<pipeline::Dependency>
get_block_dep);
@@ -211,8 +210,7 @@ public:
int be_exe_version,
std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
- std::shared_ptr<pipeline::Dependency>
put_block_dep,
- std::string& label, int64_t& txn_id);
+ std::shared_ptr<pipeline::Dependency>
put_block_dep);
void remove_load_id(int64_t table_id, const UniqueId& load_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 62c823fdec4..b4a957e72fc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -432,8 +432,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
// This statement is only used in the group_commit mode
if (context.getConnectContext().isGroupCommit()) {
sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(),
olapTuple,
- olapTableSink.getTargetTable().getPartitionIds(),
olapTableSink.isSingleReplicaLoad(),
- context.getSessionVariable().getGroupCommit(), 0);
+ olapTableSink.getTargetTable().getPartitionIds(),
olapTableSink.isSingleReplicaLoad(),
+ context.getSessionVariable().getGroupCommit(),
+
ConnectContext.get().getSessionVariable().getEnableInsertStrict() ? 0 : 1);
} else {
sink = new OlapTableSink(
olapTableSink.getTargetTable(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 7a1280092b0..df6fcd69fcb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -146,7 +146,9 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan)
cte.get().withChildren(logicalQuery));
}
- if (this.logicalQuery instanceof UnboundTableSink) {
+ boolean isOverwrite = insertCtx.isPresent() && insertCtx.get()
instanceof OlapInsertCommandContext
+ && ((OlapInsertCommandContext)
insertCtx.get()).isOverwrite();
+ if (this.logicalQuery instanceof UnboundTableSink && !isOverwrite)
{
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx,
targetTableIf,
(UnboundTableSink<?>) this.logicalQuery);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 5091bae17d1..984e8b0c8ca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -17,12 +17,14 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
@@ -60,6 +62,7 @@ public class OlapGroupCommitInsertExecutor extends
OlapInsertExecutor {
&& ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
&& !((OlapTable)
table).getQualifiedDbName().equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME)
&& tableSink.getPartitions().isEmpty()
+ && (!(table instanceof MTMV) ||
MTMVUtil.allowModifyMTMVData(ctx))
&& (tableSink.child() instanceof OneRowRelation ||
tableSink.child() instanceof LogicalUnion));
}
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 9105a8dc8db..e1cb943a9ab 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -219,7 +219,9 @@ suite("insert_group_commit_into") {
// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ sql "set enable_insert_strict=false"
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
+ sql "set enable_insert_strict=true"
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id)
values(6); """, 1
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy
b/regression-test/suites/insert_p0/txn_insert.groovy
index 088e1134900..4c42c3b12c4 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/txn_insert.groovy
@@ -495,7 +495,7 @@ suite("txn_insert") {
assertFalse(true, "should not reach here")
} catch (Exception e) {
logger.info("exception: " + e)
- assertTrue(e.getMessage().contains("The transaction is
already timeout"))
+ assertTrue(e.getMessage().contains("The transaction is
already timeout") || e.getMessage().contains("Execute timeout"))
} finally {
try {
sql "rollback"
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index e12a1f2f01b..0f823f1d4ef 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
- checkStreamLoadResult(exception, result, 6, 3, 2, 1)
+ checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]