This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1846591f7 [fix](insert overwrite) delay overwrite partition routing
until incremental open (#63209)
1d1846591f7 is described below
commit 1d1846591f7b4367f413fb890b82429cf80a2680
Author: hui lai <[email protected]>
AuthorDate: Thu May 14 23:38:09 2026 +0800
[fix](insert overwrite) delay overwrite partition routing until incremental
open (#63209)
### What problem does this PR solve?
Problem Summary:
In auto-detect insert overwrite, the BE sender could publish newly replaced
temporary partitions to its local row routing before incremental open had
finished on the target BEs.
The race was:
1. One sender calls FE `replacePartition` and receives new temporary
partition/tablet metadata.
2. The sender records the new partition id and replaces the local
`_vpartition` routing first, before it runs incremental open.
3. Another concurrent batch can then route rows to the new tablet.
4. The first sender has not finished incremental open yet, so the target
BE may not have created the delta writer for that tablet.
5. The target BE returns `unknown tablet to append data`.
This PR makes the sender finish `_create_partition_callback`, including
incremental open/open_wait, before publishing the new partition/tablet
to local routing and marking the new partition as handled.
---
be/src/exec/sink/vrow_distribution.cpp | 19 ++++++------
be/test/exec/sink/vrow_distribution_test.cpp | 45 ++++++++++++++++++++++++----
2 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/be/src/exec/sink/vrow_distribution.cpp
b/be/src/exec/sink/vrow_distribution.cpp
index 40a20c08d5a..891c4b394cb 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -145,13 +145,13 @@ Status VRowDistribution::automatic_create_partition() {
Status status(Status::create(result.status));
VLOG_NOTICE << "automatic partition rpc end response " << result;
if (result.status.status_code == TStatusCode::OK) {
+ RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
// add new created partitions
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
- RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
}
// Record this request's elapsed time
@@ -163,13 +163,13 @@ Status VRowDistribution::automatic_create_partition() {
}
// for reuse the same create callback of create-partition
-static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult&
arg) {
+static TCreatePartitionResult cast_as_create_result(const
TReplacePartitionResult& arg) {
TCreatePartitionResult result;
result.status = arg.status;
- result.nodes = std::move(arg.nodes);
- result.partitions = std::move(arg.partitions);
- result.tablets = std::move(arg.tablets);
- result.slave_tablets = std::move(arg.slave_tablets);
+ result.nodes = arg.nodes;
+ result.partitions = arg.partitions;
+ result.tablets = arg.tablets;
+ result.slave_tablets = arg.slave_tablets;
return result;
}
@@ -246,6 +246,10 @@ Status VRowDistribution::_replace_overwriting_partition() {
Status status(Status::create(result.status));
VLOG_NOTICE << "auto detect replace partition result: " << result;
if (result.status.status_code == TStatusCode::OK) {
+ // Reuse the function as the args' structure are same. It adds
nodes/locations
+ // and waits for incremental_open before the new tablets become
routable.
+ auto result_as_create = cast_as_create_result(result);
+ RETURN_IF_ERROR(_create_partition_callback(_caller,
&result_as_create));
// record new partitions
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
@@ -253,9 +257,6 @@ Status VRowDistribution::_replace_overwriting_partition() {
}
// replace data in _partitions
RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids,
result.partitions));
- // reuse the function as the args' structure are same. it add
nodes/locations and incremental_open
- auto result_as_create = cast_as_create_result(result);
- RETURN_IF_ERROR(_create_partition_callback(_caller,
&result_as_create));
}
return status;
diff --git a/be/test/exec/sink/vrow_distribution_test.cpp
b/be/test/exec/sink/vrow_distribution_test.cpp
index 9c8e8798779..a5d862f370b 100644
--- a/be/test/exec/sink/vrow_distribution_test.cpp
+++ b/be/test/exec/sink/vrow_distribution_test.cpp
@@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*,
TCreatePartitionResult*) {
return Status::OK();
}
+Status _delegated_create_partition_callback(void* caller,
TCreatePartitionResult* result) {
+ return
(*static_cast<std::function<Status(TCreatePartitionResult*)>*>(caller))(result);
+}
+
std::unique_ptr<VRowDistributionHarness> _build_vrow_distribution_harness(
OperatorContext& ctx, const TOlapTableSchemaParam& tschema,
const TOlapTablePartitionParam& tpartition, const
TOlapTableLocationParam& tlocation,
- TTupleId tablet_sink_tuple_id, int64_t txn_id) {
+ TTupleId tablet_sink_tuple_id, int64_t txn_id,
+ CreatePartitionCallback create_partition_callback =
&_noop_create_partition_callback,
+ void* caller = nullptr) {
auto h = std::make_unique<VRowDistributionHarness>();
h->schema = std::make_shared<OlapTableSchemaParam>();
@@ -91,9 +97,9 @@ std::unique_ptr<VRowDistributionHarness>
_build_vrow_distribution_harness(
rctx.location = h->location.get();
rctx.vec_output_expr_ctxs = &h->output_expr_ctxs;
rctx.schema = h->schema;
- rctx.caller = nullptr;
+ rctx.caller = caller;
rctx.write_single_replica = false;
- rctx.create_partition_callback = &_noop_create_partition_callback;
+ rctx.create_partition_callback = create_partition_callback;
h->row_distribution.init(rctx);
st = h->row_distribution.open(h->output_row_desc.get());
@@ -357,8 +363,36 @@ TEST(VRowDistributionTest,
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
tpartition.__set_overwrite_group_id(123);
auto tlocation = sink_test_utils::build_location_param();
- auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition,
tlocation,
- tablet_sink_tuple_id, txn_id);
+ VRowDistributionHarness* harness = nullptr;
+ bool create_callback_called = false;
+ std::function<Status(TCreatePartitionResult*)> create_callback =
+ [&](TCreatePartitionResult* result) {
+ create_callback_called = true;
+ EXPECT_EQ(result->partitions.size(), 2);
+
+ auto old_partition_block =
ColumnHelper::create_block<DataTypeInt32>({1});
+ VOlapTablePartition* old_part = nullptr;
+ harness->vpartition->find_partition(&old_partition_block, 0,
old_part);
+ if (old_part == nullptr) {
+ return Status::InternalError("old partition is not found");
+ }
+ EXPECT_EQ(old_part->id, 1);
+
+ auto another_old_partition_block =
ColumnHelper::create_block<DataTypeInt32>({25});
+ VOlapTablePartition* another_old_part = nullptr;
+
harness->vpartition->find_partition(&another_old_partition_block, 0,
+ another_old_part);
+ if (another_old_part == nullptr) {
+ return Status::InternalError("another old partition is not
found");
+ }
+ EXPECT_EQ(another_old_part->id, 2);
+ return Status::OK();
+ };
+
+ auto h = _build_vrow_distribution_harness(
+ ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id,
+ &_delegated_create_partition_callback, &create_callback);
+ harness = h.get();
doris::config::enable_debug_points = true;
doris::DebugPoints::instance()->clear();
@@ -423,6 +457,7 @@ TEST(VRowDistributionTest,
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
row_part_tablet_ids, rows_stat_val);
EXPECT_TRUE(st.ok()) << st.to_string();
EXPECT_EQ(injected_times, 1);
+ EXPECT_TRUE(create_callback_called);
ASSERT_EQ(row_part_tablet_ids.size(), 1);
ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]