This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 0b20a78ea8c branch-4.1: [fix](insert overwrite) delay overwrite
partition routing until incremental open #63209 (#63264)
0b20a78ea8c is described below
commit 0b20a78ea8c492b13716992a39e3a44c1c522c93
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 21 15:42:01 2026 +0800
branch-4.1: [fix](insert overwrite) delay overwrite partition routing until
incremental open #63209 (#63264)
Cherry-picked from #63209
Co-authored-by: hui lai <[email protected]>
---
be/src/exec/sink/vrow_distribution.cpp | 19 ++++++------
be/test/exec/sink/vrow_distribution_test.cpp | 45 ++++++++++++++++++++++++----
2 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/be/src/exec/sink/vrow_distribution.cpp
b/be/src/exec/sink/vrow_distribution.cpp
index f14f8052969..130f21ad364 100644
--- a/be/src/exec/sink/vrow_distribution.cpp
+++ b/be/src/exec/sink/vrow_distribution.cpp
@@ -141,13 +141,13 @@ Status VRowDistribution::automatic_create_partition() {
Status status(Status::create(result.status));
VLOG_NOTICE << "automatic partition rpc end response " << result;
if (result.status.status_code == TStatusCode::OK) {
+ RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
// add new created partitions
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
- RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
}
// Record this request's elapsed time
@@ -159,13 +159,13 @@ Status VRowDistribution::automatic_create_partition() {
}
// for reuse the same create callback of create-partition
-static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult&
arg) {
+static TCreatePartitionResult cast_as_create_result(const
TReplacePartitionResult& arg) {
TCreatePartitionResult result;
result.status = arg.status;
- result.nodes = std::move(arg.nodes);
- result.partitions = std::move(arg.partitions);
- result.tablets = std::move(arg.tablets);
- result.slave_tablets = std::move(arg.slave_tablets);
+ result.nodes = arg.nodes;
+ result.partitions = arg.partitions;
+ result.tablets = arg.tablets;
+ result.slave_tablets = arg.slave_tablets;
return result;
}
@@ -237,6 +237,10 @@ Status VRowDistribution::_replace_overwriting_partition() {
Status status(Status::create(result.status));
VLOG_NOTICE << "auto detect replace partition result: " << result;
if (result.status.status_code == TStatusCode::OK) {
+ // Reuse the function as the args' structure are same. It adds
nodes/locations
+ // and waits for incremental_open before the new tablets become
routable.
+ auto result_as_create = cast_as_create_result(result);
+ RETURN_IF_ERROR(_create_partition_callback(_caller,
&result_as_create));
// record new partitions
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
@@ -244,9 +248,6 @@ Status VRowDistribution::_replace_overwriting_partition() {
}
// replace data in _partitions
RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids,
result.partitions));
- // reuse the function as the args' structure are same. it add
nodes/locations and incremental_open
- auto result_as_create = cast_as_create_result(result);
- RETURN_IF_ERROR(_create_partition_callback(_caller,
&result_as_create));
}
return status;
diff --git a/be/test/exec/sink/vrow_distribution_test.cpp
b/be/test/exec/sink/vrow_distribution_test.cpp
index 36765eca44f..82c780a4e2c 100644
--- a/be/test/exec/sink/vrow_distribution_test.cpp
+++ b/be/test/exec/sink/vrow_distribution_test.cpp
@@ -58,10 +58,16 @@ Status _noop_create_partition_callback(void*,
TCreatePartitionResult*) {
return Status::OK();
}
+Status _delegated_create_partition_callback(void* caller,
TCreatePartitionResult* result) {
+ return
(*static_cast<std::function<Status(TCreatePartitionResult*)>*>(caller))(result);
+}
+
std::unique_ptr<VRowDistributionHarness> _build_vrow_distribution_harness(
OperatorContext& ctx, const TOlapTableSchemaParam& tschema,
const TOlapTablePartitionParam& tpartition, const
TOlapTableLocationParam& tlocation,
- TTupleId tablet_sink_tuple_id, int64_t txn_id) {
+ TTupleId tablet_sink_tuple_id, int64_t txn_id,
+ CreatePartitionCallback create_partition_callback =
&_noop_create_partition_callback,
+ void* caller = nullptr) {
auto h = std::make_unique<VRowDistributionHarness>();
h->schema = std::make_shared<OlapTableSchemaParam>();
@@ -92,9 +98,9 @@ std::unique_ptr<VRowDistributionHarness>
_build_vrow_distribution_harness(
rctx.location = h->location.get();
rctx.vec_output_expr_ctxs = &h->output_expr_ctxs;
rctx.schema = h->schema;
- rctx.caller = nullptr;
+ rctx.caller = caller;
rctx.write_single_replica = false;
- rctx.create_partition_callback = &_noop_create_partition_callback;
+ rctx.create_partition_callback = create_partition_callback;
h->row_distribution.init(rctx);
st = h->row_distribution.open(h->output_row_desc.get());
@@ -358,8 +364,36 @@ TEST(VRowDistributionTest,
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
tpartition.__set_overwrite_group_id(123);
auto tlocation = sink_test_utils::build_location_param();
- auto h = _build_vrow_distribution_harness(ctx, tschema, tpartition,
tlocation,
- tablet_sink_tuple_id, txn_id);
+ VRowDistributionHarness* harness = nullptr;
+ bool create_callback_called = false;
+ std::function<Status(TCreatePartitionResult*)> create_callback =
+ [&](TCreatePartitionResult* result) {
+ create_callback_called = true;
+ EXPECT_EQ(result->partitions.size(), 2);
+
+ auto old_partition_block =
ColumnHelper::create_block<DataTypeInt32>({1});
+ VOlapTablePartition* old_part = nullptr;
+ harness->vpartition->find_partition(&old_partition_block, 0,
old_part);
+ if (old_part == nullptr) {
+ return Status::InternalError("old partition is not found");
+ }
+ EXPECT_EQ(old_part->id, 1);
+
+ auto another_old_partition_block =
ColumnHelper::create_block<DataTypeInt32>({25});
+ VOlapTablePartition* another_old_part = nullptr;
+
harness->vpartition->find_partition(&another_old_partition_block, 0,
+ another_old_part);
+ if (another_old_part == nullptr) {
+ return Status::InternalError("another old partition is not
found");
+ }
+ EXPECT_EQ(another_old_part->id, 2);
+ return Status::OK();
+ };
+
+ auto h = _build_vrow_distribution_harness(
+ ctx, tschema, tpartition, tlocation, tablet_sink_tuple_id, txn_id,
+ &_delegated_create_partition_callback, &create_callback);
+ harness = h.get();
doris::config::enable_debug_points = true;
doris::DebugPoints::instance()->clear();
@@ -424,6 +458,7 @@ TEST(VRowDistributionTest,
ReplaceOverwritingPartitionInjectedRequestDedupAndRep
row_part_tablet_ids, rows_stat_val);
EXPECT_TRUE(st.ok()) << st.to_string();
EXPECT_EQ(injected_times, 1);
+ EXPECT_TRUE(create_callback_called);
ASSERT_EQ(row_part_tablet_ids.size(), 1);
ASSERT_EQ(row_part_tablet_ids[0].partition_ids.size(), 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]